1use std::{
2 borrow::Cow,
3 collections::hash_map::Entry,
4 mem::{take, transmute},
5 ops::{Deref, DerefMut},
6 sync::Arc,
7};
8
9use anyhow::Result;
10use rustc_hash::{FxHashMap, FxHashSet};
11use turbopack_trace_utils::tracing::{TraceRow, TraceValue};
12
13use super::TraceFormat;
14use crate::{
15 FxIndexMap,
16 span::SpanIndex,
17 store_container::{StoreContainer, StoreWriteGuard},
18 timestamp::Timestamp,
19};
20
/// Allocation statistics for one thread.
///
/// Used both as the last absolute counter snapshot seen for a thread and as
/// the delta between two consecutive snapshots. `Default` yields all zeros.
#[derive(Default)]
struct AllocationInfo {
    /// Allocated amount as reported by the trace (presumably bytes — confirm
    /// against the trace writer).
    allocations: u64,
    /// Deallocated amount as reported by the trace (presumably bytes —
    /// confirm against the trace writer).
    deallocations: u64,
    /// Number of allocation operations.
    allocation_count: u64,
    /// Number of deallocation operations.
    deallocation_count: u64,
}
28
29struct InternalRow<'a> {
30 id: Option<u64>,
31 ty: InternalRowType<'a>,
32}
33
34impl InternalRow<'_> {
35 fn into_static(self) -> InternalRow<'static> {
36 InternalRow {
37 id: self.id,
38 ty: self.ty.into_static(),
39 }
40 }
41}
42
43enum InternalRowType<'a> {
44 Start {
45 new_id: u64,
46 ts: Timestamp,
47 name: Cow<'a, str>,
48 target: Cow<'a, str>,
49 values: Vec<(Cow<'a, str>, TraceValue<'a>)>,
50 },
51 End {
52 ts: Timestamp,
53 },
54 SelfTime {
55 start: Timestamp,
56 end: Timestamp,
57 },
58 Event {
59 ts: Timestamp,
60 values: Vec<(Cow<'a, str>, TraceValue<'a>)>,
61 },
62 Record {
63 values: Vec<(Cow<'a, str>, TraceValue<'a>)>,
64 },
65 Allocation {
66 allocations: u64,
67 allocation_count: u64,
68 },
69 Deallocation {
70 deallocations: u64,
71 deallocation_count: u64,
72 },
73}
74
75impl InternalRowType<'_> {
76 fn into_static(self) -> InternalRowType<'static> {
77 match self {
78 InternalRowType::Start {
79 ts,
80 new_id,
81 name,
82 target,
83 values,
84 } => InternalRowType::Start {
85 ts,
86 new_id,
87 name: name.into_owned().into(),
88 target: target.into_owned().into(),
89 values: values
90 .into_iter()
91 .map(|(k, v)| (k.into_owned().into(), v.into_static()))
92 .collect(),
93 },
94 InternalRowType::End { ts } => InternalRowType::End { ts },
95 InternalRowType::SelfTime { start, end } => InternalRowType::SelfTime { start, end },
96 InternalRowType::Event { ts, values } => InternalRowType::Event {
97 ts,
98 values: values
99 .into_iter()
100 .map(|(k, v)| (k.into_owned().into(), v.into_static()))
101 .collect(),
102 },
103 InternalRowType::Record { values } => InternalRowType::Record {
104 values: values
105 .into_iter()
106 .map(|(k, v)| (k.into_owned().into(), v.into_static()))
107 .collect(),
108 },
109 InternalRowType::Allocation {
110 allocations,
111 allocation_count,
112 } => InternalRowType::Allocation {
113 allocations,
114 allocation_count,
115 },
116 InternalRowType::Deallocation {
117 deallocations,
118 deallocation_count,
119 } => InternalRowType::Deallocation {
120 deallocations,
121 deallocation_count,
122 },
123 }
124 }
125}
126
127#[derive(Default)]
128struct QueuedRows {
129 rows: Vec<InternalRow<'static>>,
130}
131
132pub struct TurbopackFormat {
133 store: Arc<StoreContainer>,
134 id_mapping: FxHashMap<u64, SpanIndex>,
135 queued_rows: FxHashMap<u64, QueuedRows>,
136 outdated_spans: FxHashSet<SpanIndex>,
137 thread_stacks: FxHashMap<u64, Vec<u64>>,
138 thread_allocation_counters: FxHashMap<u64, AllocationInfo>,
139 self_time_started: FxHashMap<(u64, u64), Timestamp>,
140}
141
142impl TurbopackFormat {
143 pub fn new(store: Arc<StoreContainer>) -> Self {
144 Self {
145 store,
146 id_mapping: FxHashMap::default(),
147 queued_rows: FxHashMap::default(),
148 outdated_spans: FxHashSet::default(),
149 thread_stacks: FxHashMap::default(),
150 thread_allocation_counters: FxHashMap::default(),
151 self_time_started: FxHashMap::default(),
152 }
153 }
154
155 fn process(&mut self, store: &mut StoreWriteGuard, row: TraceRow<'_>) {
156 match row {
157 TraceRow::Start {
158 ts,
159 id,
160 parent,
161 name,
162 target,
163 values,
164 } => {
165 let ts = Timestamp::from_micros(ts);
166 self.process_internal_row(
167 store,
168 InternalRow {
169 id: parent,
170 ty: InternalRowType::Start {
171 ts,
172 new_id: id,
173 name,
174 target,
175 values,
176 },
177 },
178 );
179 }
180 TraceRow::Record { id, values } => {
181 self.process_internal_row(
182 store,
183 InternalRow {
184 id: Some(id),
185 ty: InternalRowType::Record { values },
186 },
187 );
188 }
189 TraceRow::End { ts, id } => {
190 let ts = Timestamp::from_micros(ts);
191 self.process_internal_row(
192 store,
193 InternalRow {
194 id: Some(id),
195 ty: InternalRowType::End { ts },
196 },
197 );
198 }
199 TraceRow::Enter { ts, id, thread_id } => {
200 let ts = Timestamp::from_micros(ts);
201 let stack = self.thread_stacks.entry(thread_id).or_default();
202 if let Some(&parent) = stack.last() {
203 if let Some(parent_start) = self.self_time_started.remove(&(parent, thread_id))
204 {
205 stack.push(id);
206 self.process_internal_row(
207 store,
208 InternalRow {
209 id: Some(parent),
210 ty: InternalRowType::SelfTime {
211 start: parent_start,
212 end: ts,
213 },
214 },
215 );
216 } else {
217 stack.push(id);
218 }
219 } else {
220 stack.push(id);
221 }
222 self.self_time_started.insert((id, thread_id), ts);
223 }
224 TraceRow::Exit { ts, id, thread_id } => {
225 let ts = Timestamp::from_micros(ts);
226 let stack = self.thread_stacks.entry(thread_id).or_default();
227 if let Some(pos) = stack.iter().rev().position(|&x| x == id) {
228 let stack_index = stack.len() - pos - 1;
229 stack.remove(stack_index);
230 if stack_index > 0 {
231 let parent = stack[stack_index - 1];
232 self.self_time_started.insert((parent, thread_id), ts);
233 }
234 }
235 if let Some(start) = self.self_time_started.remove(&(id, thread_id)) {
236 self.process_internal_row(
237 store,
238 InternalRow {
239 id: Some(id),
240 ty: InternalRowType::SelfTime { start, end: ts },
241 },
242 );
243 }
244 }
245 TraceRow::Event { ts, parent, values } => {
246 let ts = Timestamp::from_micros(ts);
247 self.process_internal_row(
248 store,
249 InternalRow {
250 id: parent,
251 ty: InternalRowType::Event { ts, values },
252 },
253 );
254 }
255 TraceRow::Allocation {
256 ts: _,
257 thread_id,
258 allocations,
259 allocation_count,
260 deallocations,
261 deallocation_count,
262 } => {
263 let stack = self.thread_stacks.entry(thread_id).or_default();
264 if let Some(&id) = stack.last() {
265 if allocations > 0 {
266 self.process_internal_row(
267 store,
268 InternalRow {
269 id: Some(id),
270 ty: InternalRowType::Allocation {
271 allocations,
272 allocation_count,
273 },
274 },
275 );
276 }
277 if deallocations > 0 {
278 self.process_internal_row(
279 store,
280 InternalRow {
281 id: Some(id),
282 ty: InternalRowType::Deallocation {
283 deallocations,
284 deallocation_count,
285 },
286 },
287 );
288 }
289 }
290 }
291 TraceRow::MemorySample { ts, memory } => {
292 let ts = Timestamp::from_micros(ts);
293 store.add_memory_sample(ts, memory);
294 }
295 TraceRow::AllocationCounters {
296 ts: _,
297 thread_id,
298 allocations,
299 allocation_count,
300 deallocations,
301 deallocation_count,
302 } => {
303 let info = AllocationInfo {
304 allocations,
305 deallocations,
306 allocation_count,
307 deallocation_count,
308 };
309 let mut diff = AllocationInfo::default();
310 match self.thread_allocation_counters.entry(thread_id) {
311 Entry::Occupied(mut entry) => {
312 let counter = entry.get_mut();
313 diff.allocations = info.allocations - counter.allocations;
314 diff.deallocations = info.deallocations - counter.deallocations;
315 diff.allocation_count = info.allocation_count - counter.allocation_count;
316 diff.deallocation_count =
317 info.deallocation_count - counter.deallocation_count;
318 counter.allocations = info.allocations;
319 counter.deallocations = info.deallocations;
320 counter.allocation_count = info.allocation_count;
321 counter.deallocation_count = info.deallocation_count;
322 }
323 Entry::Vacant(entry) => {
324 entry.insert(info);
325 }
326 }
327 let stack = self.thread_stacks.entry(thread_id).or_default();
328 if let Some(&id) = stack.last() {
329 if diff.allocations > 0 {
330 self.process_internal_row(
331 store,
332 InternalRow {
333 id: Some(id),
334 ty: InternalRowType::Allocation {
335 allocations: diff.allocations,
336 allocation_count: diff.allocation_count,
337 },
338 },
339 );
340 }
341 if diff.deallocations > 0 {
342 self.process_internal_row(
343 store,
344 InternalRow {
345 id: Some(id),
346 ty: InternalRowType::Deallocation {
347 deallocations: diff.deallocations,
348 deallocation_count: diff.deallocation_count,
349 },
350 },
351 );
352 }
353 }
354 }
355 }
356 }
357
358 fn process_internal_row(&mut self, store: &mut StoreWriteGuard, row: InternalRow<'_>) {
359 let mut queue = Vec::new();
360 queue.push(row);
361 while !queue.is_empty() {
362 let q = take(&mut queue);
363 for row in q {
364 self.process_internal_row_queue(store, row, &mut queue);
365 }
366 }
367 }
368
369 fn process_internal_row_queue(
370 &mut self,
371 store: &mut StoreWriteGuard,
372 row: InternalRow<'_>,
373 queue: &mut Vec<InternalRow<'_>>,
374 ) {
375 let id = if let Some(id) = row.id {
376 if let Some(id) = self.id_mapping.get(&id) {
377 Some(*id)
378 } else {
379 self.queued_rows
380 .entry(id)
381 .or_default()
382 .rows
383 .push(row.into_static());
384 return;
385 }
386 } else {
387 None
388 };
389 match row.ty {
390 InternalRowType::Start {
391 ts,
392 new_id,
393 name,
394 target,
395 values,
396 } => {
397 let span_id = store.add_span(
398 id,
399 ts,
400 target.into_owned(),
401 name.into_owned(),
402 values
403 .iter()
404 .map(|(k, v)| (k.to_string(), v.to_string()))
405 .collect(),
406 &mut self.outdated_spans,
407 );
408 self.id_mapping.insert(new_id, span_id);
409 if let Some(QueuedRows { rows }) = self.queued_rows.remove(&new_id) {
410 for row in rows {
411 queue.push(row);
412 }
413 }
414 }
415 InternalRowType::Record { ref values } => {
416 store.add_args(
417 id.unwrap(),
418 values
419 .iter()
420 .map(|(k, v)| (k.to_string(), v.to_string()))
421 .collect(),
422 &mut self.outdated_spans,
423 );
424 }
425 InternalRowType::End { ts: _ } => {
426 store.complete_span(id.unwrap());
427 }
428 InternalRowType::SelfTime { start, end } => {
429 store.add_self_time(id.unwrap(), start, end, &mut self.outdated_spans);
430 }
431 InternalRowType::Event { ts, values } => {
432 let mut values = values.into_iter().collect::<FxIndexMap<_, _>>();
433 let duration = Timestamp::from_micros(
434 values
435 .swap_remove("duration")
436 .and_then(|v| v.as_u64())
437 .unwrap_or(0),
438 );
439 let name = values
440 .swap_remove("name")
441 .and_then(|v| v.as_str().map(|s| s.to_string()))
442 .unwrap_or("event".into());
443
444 let id = store.add_span(
445 id,
446 ts.saturating_sub(duration),
447 "event".into(),
448 name,
449 values
450 .iter()
451 .map(|(k, v)| (k.to_string(), v.to_string()))
452 .collect(),
453 &mut self.outdated_spans,
454 );
455 store.add_self_time(
456 id,
457 ts.saturating_sub(duration),
458 ts,
459 &mut self.outdated_spans,
460 );
461 store.complete_span(id);
462 }
463 InternalRowType::Allocation {
464 allocations,
465 allocation_count,
466 } => {
467 store.add_allocation(
468 id.unwrap(),
469 allocations,
470 allocation_count,
471 &mut self.outdated_spans,
472 );
473 }
474 InternalRowType::Deallocation {
475 deallocations,
476 deallocation_count,
477 } => {
478 store.add_deallocation(
479 id.unwrap(),
480 deallocations,
481 deallocation_count,
482 &mut self.outdated_spans,
483 );
484 }
485 }
486 }
487}
488
489impl TraceFormat for TurbopackFormat {
490 type Reused = Vec<TraceRow<'static>>;
491
492 fn read(&mut self, mut buffer: &[u8], reuse: &mut Self::Reused) -> Result<usize> {
493 reuse.clear();
494 let mut reuse = ClearOnDrop(reuse);
495 let rows =
498 unsafe { transmute::<&mut Vec<TraceRow<'_>>, &mut Vec<TraceRow<'_>>>(&mut *reuse) };
499 let mut bytes_read = 0;
500 loop {
501 match postcard::take_from_bytes(buffer) {
502 Ok((row, remaining)) => {
503 bytes_read += buffer.len() - remaining.len();
504 buffer = remaining;
505 rows.push(row);
506 }
507 Err(err) => {
508 if matches!(err, postcard::Error::DeserializeUnexpectedEnd) {
509 break;
510 }
511 return Err(err.into());
512 }
513 }
514 }
515 if !rows.is_empty() {
516 let store = self.store.clone();
517 let mut iter = rows.drain(..);
518 {
519 let mut store = store.write();
520 for row in iter.by_ref() {
521 self.process(&mut store, row);
522 }
523 store.invalidate_outdated_spans(&self.outdated_spans);
524 self.outdated_spans.clear();
525 }
526 }
527 Ok(bytes_read)
528 }
529}
530
/// Guard around a borrowed vector that empties the vector when the guard is
/// dropped.
///
/// The guard dereferences to the wrapped `Vec<T>`, so it can be used like the
/// vector itself while it is alive.
struct ClearOnDrop<'l, T>(&'l mut Vec<T>);

impl<T> Deref for ClearOnDrop<'_, T> {
    type Target = Vec<T>;

    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}

impl<T> DerefMut for ClearOnDrop<'_, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.0
    }
}

impl<T> Drop for ClearOnDrop<'_, T> {
    /// Removes all elements from the wrapped vector; its capacity is kept.
    fn drop(&mut self) {
        Vec::clear(self.0);
    }
}