1use std::{
2 borrow::Cow,
3 collections::hash_map::Entry,
4 mem::{take, transmute},
5 ops::{Deref, DerefMut},
6 sync::Arc,
7};
8
9use anyhow::Result;
10use rustc_hash::{FxHashMap, FxHashSet};
11use turbopack_trace_utils::tracing::{TraceRow, TraceValue};
12
13use super::TraceFormat;
14use crate::{
15 FxIndexMap,
16 span::SpanIndex,
17 store_container::{StoreContainer, StoreWriteGuard},
18 timestamp::Timestamp,
19};
20
21#[derive(Default)]
22struct AllocationInfo {
23 allocations: u64,
24 deallocations: u64,
25 allocation_count: u64,
26 deallocation_count: u64,
27}
28
29struct InternalRow<'a> {
30 id: Option<u64>,
31 ty: InternalRowType<'a>,
32}
33
34impl InternalRow<'_> {
35 fn into_static(self) -> InternalRow<'static> {
36 InternalRow {
37 id: self.id,
38 ty: self.ty.into_static(),
39 }
40 }
41}
42
43enum InternalRowType<'a> {
44 Start {
45 new_id: u64,
46 ts: Timestamp,
47 name: Cow<'a, str>,
48 target: Cow<'a, str>,
49 values: Vec<(Cow<'a, str>, TraceValue<'a>)>,
50 },
51 End {
52 ts: Timestamp,
53 },
54 SelfTime {
55 start: Timestamp,
56 end: Timestamp,
57 },
58 Event {
59 ts: Timestamp,
60 values: Vec<(Cow<'a, str>, TraceValue<'a>)>,
61 },
62 Record {
63 values: Vec<(Cow<'a, str>, TraceValue<'a>)>,
64 },
65 Allocation {
66 allocations: u64,
67 allocation_count: u64,
68 },
69 Deallocation {
70 deallocations: u64,
71 deallocation_count: u64,
72 },
73}
74
75impl InternalRowType<'_> {
76 fn into_static(self) -> InternalRowType<'static> {
77 match self {
78 InternalRowType::Start {
79 ts,
80 new_id,
81 name,
82 target,
83 values,
84 } => InternalRowType::Start {
85 ts,
86 new_id,
87 name: name.into_owned().into(),
88 target: target.into_owned().into(),
89 values: values
90 .into_iter()
91 .map(|(k, v)| (k.into_owned().into(), v.into_static()))
92 .collect(),
93 },
94 InternalRowType::End { ts } => InternalRowType::End { ts },
95 InternalRowType::SelfTime { start, end } => InternalRowType::SelfTime { start, end },
96 InternalRowType::Event { ts, values } => InternalRowType::Event {
97 ts,
98 values: values
99 .into_iter()
100 .map(|(k, v)| (k.into_owned().into(), v.into_static()))
101 .collect(),
102 },
103 InternalRowType::Record { values } => InternalRowType::Record {
104 values: values
105 .into_iter()
106 .map(|(k, v)| (k.into_owned().into(), v.into_static()))
107 .collect(),
108 },
109 InternalRowType::Allocation {
110 allocations,
111 allocation_count,
112 } => InternalRowType::Allocation {
113 allocations,
114 allocation_count,
115 },
116 InternalRowType::Deallocation {
117 deallocations,
118 deallocation_count,
119 } => InternalRowType::Deallocation {
120 deallocations,
121 deallocation_count,
122 },
123 }
124 }
125}
126
127#[derive(Default)]
128struct QueuedRows {
129 rows: Vec<InternalRow<'static>>,
130}
131
132pub struct TurbopackFormat {
133 store: Arc<StoreContainer>,
134 id_mapping: FxHashMap<u64, SpanIndex>,
135 queued_rows: FxHashMap<u64, QueuedRows>,
136 outdated_spans: FxHashSet<SpanIndex>,
137 thread_stacks: FxHashMap<u64, Vec<u64>>,
138 thread_allocation_counters: FxHashMap<u64, AllocationInfo>,
139 self_time_started: FxHashMap<(u64, u64), Timestamp>,
140}
141
142impl TurbopackFormat {
143 pub fn new(store: Arc<StoreContainer>) -> Self {
144 Self {
145 store,
146 id_mapping: FxHashMap::default(),
147 queued_rows: FxHashMap::default(),
148 outdated_spans: FxHashSet::default(),
149 thread_stacks: FxHashMap::default(),
150 thread_allocation_counters: FxHashMap::default(),
151 self_time_started: FxHashMap::default(),
152 }
153 }
154
155 fn process(&mut self, store: &mut StoreWriteGuard, row: TraceRow<'_>) {
156 match row {
157 TraceRow::Start {
158 ts,
159 id,
160 parent,
161 name,
162 target,
163 values,
164 } => {
165 let ts = Timestamp::from_micros(ts);
166 self.process_internal_row(
167 store,
168 InternalRow {
169 id: parent,
170 ty: InternalRowType::Start {
171 ts,
172 new_id: id,
173 name,
174 target,
175 values,
176 },
177 },
178 );
179 }
180 TraceRow::Record { id, values } => {
181 self.process_internal_row(
182 store,
183 InternalRow {
184 id: Some(id),
185 ty: InternalRowType::Record { values },
186 },
187 );
188 }
189 TraceRow::End { ts, id } => {
190 let ts = Timestamp::from_micros(ts);
191 self.process_internal_row(
192 store,
193 InternalRow {
194 id: Some(id),
195 ty: InternalRowType::End { ts },
196 },
197 );
198 }
199 TraceRow::Enter { ts, id, thread_id } => {
200 let ts = Timestamp::from_micros(ts);
201 let stack = self.thread_stacks.entry(thread_id).or_default();
202 if let Some(&parent) = stack.last() {
203 if let Some(parent_start) = self.self_time_started.remove(&(parent, thread_id))
204 {
205 stack.push(id);
206 self.process_internal_row(
207 store,
208 InternalRow {
209 id: Some(parent),
210 ty: InternalRowType::SelfTime {
211 start: parent_start,
212 end: ts,
213 },
214 },
215 );
216 } else {
217 stack.push(id);
218 }
219 } else {
220 stack.push(id);
221 }
222 self.self_time_started.insert((id, thread_id), ts);
223 }
224 TraceRow::Exit { ts, id, thread_id } => {
225 let ts = Timestamp::from_micros(ts);
226 let stack = self.thread_stacks.entry(thread_id).or_default();
227 if let Some(pos) = stack.iter().rev().position(|&x| x == id) {
228 let stack_index = stack.len() - pos - 1;
229 stack.remove(stack_index);
230 if stack_index > 0 {
231 let parent = stack[stack_index - 1];
232 self.self_time_started.insert((parent, thread_id), ts);
233 }
234 }
235 if let Some(start) = self.self_time_started.remove(&(id, thread_id)) {
236 self.process_internal_row(
237 store,
238 InternalRow {
239 id: Some(id),
240 ty: InternalRowType::SelfTime { start, end: ts },
241 },
242 );
243 }
244 }
245 TraceRow::Event { ts, parent, values } => {
246 let ts = Timestamp::from_micros(ts);
247 self.process_internal_row(
248 store,
249 InternalRow {
250 id: parent,
251 ty: InternalRowType::Event { ts, values },
252 },
253 );
254 }
255 TraceRow::Allocation {
256 ts: _,
257 thread_id,
258 allocations,
259 allocation_count,
260 deallocations,
261 deallocation_count,
262 } => {
263 let stack = self.thread_stacks.entry(thread_id).or_default();
264 if let Some(&id) = stack.last() {
265 if allocations > 0 {
266 self.process_internal_row(
267 store,
268 InternalRow {
269 id: Some(id),
270 ty: InternalRowType::Allocation {
271 allocations,
272 allocation_count,
273 },
274 },
275 );
276 }
277 if deallocations > 0 {
278 self.process_internal_row(
279 store,
280 InternalRow {
281 id: Some(id),
282 ty: InternalRowType::Deallocation {
283 deallocations,
284 deallocation_count,
285 },
286 },
287 );
288 }
289 }
290 }
291 TraceRow::AllocationCounters {
292 ts: _,
293 thread_id,
294 allocations,
295 allocation_count,
296 deallocations,
297 deallocation_count,
298 } => {
299 let info = AllocationInfo {
300 allocations,
301 deallocations,
302 allocation_count,
303 deallocation_count,
304 };
305 let mut diff = AllocationInfo::default();
306 match self.thread_allocation_counters.entry(thread_id) {
307 Entry::Occupied(mut entry) => {
308 let counter = entry.get_mut();
309 diff.allocations = info.allocations - counter.allocations;
310 diff.deallocations = info.deallocations - counter.deallocations;
311 diff.allocation_count = info.allocation_count - counter.allocation_count;
312 diff.deallocation_count =
313 info.deallocation_count - counter.deallocation_count;
314 counter.allocations = info.allocations;
315 counter.deallocations = info.deallocations;
316 counter.allocation_count = info.allocation_count;
317 counter.deallocation_count = info.deallocation_count;
318 }
319 Entry::Vacant(entry) => {
320 entry.insert(info);
321 }
322 }
323 let stack = self.thread_stacks.entry(thread_id).or_default();
324 if let Some(&id) = stack.last() {
325 if diff.allocations > 0 {
326 self.process_internal_row(
327 store,
328 InternalRow {
329 id: Some(id),
330 ty: InternalRowType::Allocation {
331 allocations: diff.allocations,
332 allocation_count: diff.allocation_count,
333 },
334 },
335 );
336 }
337 if diff.deallocations > 0 {
338 self.process_internal_row(
339 store,
340 InternalRow {
341 id: Some(id),
342 ty: InternalRowType::Deallocation {
343 deallocations: diff.deallocations,
344 deallocation_count: diff.deallocation_count,
345 },
346 },
347 );
348 }
349 }
350 }
351 }
352 }
353
354 fn process_internal_row(&mut self, store: &mut StoreWriteGuard, row: InternalRow<'_>) {
355 let mut queue = Vec::new();
356 queue.push(row);
357 while !queue.is_empty() {
358 let q = take(&mut queue);
359 for row in q {
360 self.process_internal_row_queue(store, row, &mut queue);
361 }
362 }
363 }
364
365 fn process_internal_row_queue(
366 &mut self,
367 store: &mut StoreWriteGuard,
368 row: InternalRow<'_>,
369 queue: &mut Vec<InternalRow<'_>>,
370 ) {
371 let id = if let Some(id) = row.id {
372 if let Some(id) = self.id_mapping.get(&id) {
373 Some(*id)
374 } else {
375 self.queued_rows
376 .entry(id)
377 .or_default()
378 .rows
379 .push(row.into_static());
380 return;
381 }
382 } else {
383 None
384 };
385 match row.ty {
386 InternalRowType::Start {
387 ts,
388 new_id,
389 name,
390 target,
391 values,
392 } => {
393 let span_id = store.add_span(
394 id,
395 ts,
396 target.into_owned(),
397 name.into_owned(),
398 values
399 .iter()
400 .map(|(k, v)| (k.to_string(), v.to_string()))
401 .collect(),
402 &mut self.outdated_spans,
403 );
404 self.id_mapping.insert(new_id, span_id);
405 if let Some(QueuedRows { rows }) = self.queued_rows.remove(&new_id) {
406 for row in rows {
407 queue.push(row);
408 }
409 }
410 }
411 InternalRowType::Record { ref values } => {
412 store.add_args(
413 id.unwrap(),
414 values
415 .iter()
416 .map(|(k, v)| (k.to_string(), v.to_string()))
417 .collect(),
418 &mut self.outdated_spans,
419 );
420 }
421 InternalRowType::End { ts: _ } => {
422 store.complete_span(id.unwrap());
423 }
424 InternalRowType::SelfTime { start, end } => {
425 store.add_self_time(id.unwrap(), start, end, &mut self.outdated_spans);
426 }
427 InternalRowType::Event { ts, values } => {
428 let mut values = values.into_iter().collect::<FxIndexMap<_, _>>();
429 let duration = Timestamp::from_micros(
430 values
431 .swap_remove("duration")
432 .and_then(|v| v.as_u64())
433 .unwrap_or(0),
434 );
435 let name = values
436 .swap_remove("name")
437 .and_then(|v| v.as_str().map(|s| s.to_string()))
438 .unwrap_or("event".into());
439
440 let id = store.add_span(
441 id,
442 ts.saturating_sub(duration),
443 "event".into(),
444 name,
445 values
446 .iter()
447 .map(|(k, v)| (k.to_string(), v.to_string()))
448 .collect(),
449 &mut self.outdated_spans,
450 );
451 store.add_self_time(
452 id,
453 ts.saturating_sub(duration),
454 ts,
455 &mut self.outdated_spans,
456 );
457 store.complete_span(id);
458 }
459 InternalRowType::Allocation {
460 allocations,
461 allocation_count,
462 } => {
463 store.add_allocation(
464 id.unwrap(),
465 allocations,
466 allocation_count,
467 &mut self.outdated_spans,
468 );
469 }
470 InternalRowType::Deallocation {
471 deallocations,
472 deallocation_count,
473 } => {
474 store.add_deallocation(
475 id.unwrap(),
476 deallocations,
477 deallocation_count,
478 &mut self.outdated_spans,
479 );
480 }
481 }
482 }
483}
484
485impl TraceFormat for TurbopackFormat {
486 type Reused = Vec<TraceRow<'static>>;
487
488 fn read(&mut self, mut buffer: &[u8], reuse: &mut Self::Reused) -> Result<usize> {
489 reuse.clear();
490 let mut reuse = ClearOnDrop(reuse);
491 let rows =
494 unsafe { transmute::<&mut Vec<TraceRow<'_>>, &mut Vec<TraceRow<'_>>>(&mut *reuse) };
495 let mut bytes_read = 0;
496 loop {
497 match postcard::take_from_bytes(buffer) {
498 Ok((row, remaining)) => {
499 bytes_read += buffer.len() - remaining.len();
500 buffer = remaining;
501 rows.push(row);
502 }
503 Err(err) => {
504 if matches!(err, postcard::Error::DeserializeUnexpectedEnd) {
505 break;
506 }
507 return Err(err.into());
508 }
509 }
510 }
511 if !rows.is_empty() {
512 let store = self.store.clone();
513 let mut iter = rows.drain(..);
514 {
515 let mut store = store.write();
516 for row in iter.by_ref() {
517 self.process(&mut store, row);
518 }
519 store.invalidate_outdated_spans(&self.outdated_spans);
520 self.outdated_spans.clear();
521 }
522 }
523 Ok(bytes_read)
524 }
525}
526
527struct ClearOnDrop<'l, T>(&'l mut Vec<T>);
528
529impl<T> Drop for ClearOnDrop<'_, T> {
530 fn drop(&mut self) {
531 self.0.clear();
532 }
533}
534
535impl<T> Deref for ClearOnDrop<'_, T> {
536 type Target = Vec<T>;
537
538 fn deref(&self) -> &Self::Target {
539 self.0
540 }
541}
542
543impl<T> DerefMut for ClearOnDrop<'_, T> {
544 fn deref_mut(&mut self) -> &mut Self::Target {
545 self.0
546 }
547}