Skip to main content

turbo_trace_server/reader/
turbopack.rs

1use std::{
2    borrow::Cow,
3    collections::hash_map::Entry,
4    mem::{take, transmute},
5    ops::{Deref, DerefMut},
6    sync::Arc,
7};
8
9use anyhow::Result;
10use rustc_hash::{FxHashMap, FxHashSet};
11use turbopack_trace_utils::tracing::{TraceRow, TraceValue};
12
13use super::TraceFormat;
14use crate::{
15    FxIndexMap,
16    span::SpanIndex,
17    store_container::{StoreContainer, StoreWriteGuard},
18    timestamp::Timestamp,
19};
20
/// Allocation counters for a single thread.
///
/// The same shape is used for the last absolute counter snapshot seen for a thread and for
/// the difference between two consecutive snapshots.
#[derive(Default)]
struct AllocationInfo {
    /// Total allocated amount (presumably bytes; confirm against the trace writer).
    allocations: u64,
    /// Count that accompanies `allocations` (presumably the number of allocation calls).
    allocation_count: u64,
    /// Total deallocated amount (presumably bytes; confirm against the trace writer).
    deallocations: u64,
    /// Count that accompanies `deallocations` (presumably the number of deallocation calls).
    deallocation_count: u64,
}
28
29struct InternalRow<'a> {
30    id: Option<u64>,
31    ty: InternalRowType<'a>,
32}
33
34impl InternalRow<'_> {
35    fn into_static(self) -> InternalRow<'static> {
36        InternalRow {
37            id: self.id,
38            ty: self.ty.into_static(),
39        }
40    }
41}
42
43enum InternalRowType<'a> {
44    Start {
45        new_id: u64,
46        ts: Timestamp,
47        name: Cow<'a, str>,
48        target: Cow<'a, str>,
49        values: Vec<(Cow<'a, str>, TraceValue<'a>)>,
50    },
51    End {
52        ts: Timestamp,
53    },
54    SelfTime {
55        start: Timestamp,
56        end: Timestamp,
57    },
58    Event {
59        ts: Timestamp,
60        values: Vec<(Cow<'a, str>, TraceValue<'a>)>,
61    },
62    Record {
63        values: Vec<(Cow<'a, str>, TraceValue<'a>)>,
64    },
65    Allocation {
66        allocations: u64,
67        allocation_count: u64,
68    },
69    Deallocation {
70        deallocations: u64,
71        deallocation_count: u64,
72    },
73}
74
75impl InternalRowType<'_> {
76    fn into_static(self) -> InternalRowType<'static> {
77        match self {
78            InternalRowType::Start {
79                ts,
80                new_id,
81                name,
82                target,
83                values,
84            } => InternalRowType::Start {
85                ts,
86                new_id,
87                name: name.into_owned().into(),
88                target: target.into_owned().into(),
89                values: values
90                    .into_iter()
91                    .map(|(k, v)| (k.into_owned().into(), v.into_static()))
92                    .collect(),
93            },
94            InternalRowType::End { ts } => InternalRowType::End { ts },
95            InternalRowType::SelfTime { start, end } => InternalRowType::SelfTime { start, end },
96            InternalRowType::Event { ts, values } => InternalRowType::Event {
97                ts,
98                values: values
99                    .into_iter()
100                    .map(|(k, v)| (k.into_owned().into(), v.into_static()))
101                    .collect(),
102            },
103            InternalRowType::Record { values } => InternalRowType::Record {
104                values: values
105                    .into_iter()
106                    .map(|(k, v)| (k.into_owned().into(), v.into_static()))
107                    .collect(),
108            },
109            InternalRowType::Allocation {
110                allocations,
111                allocation_count,
112            } => InternalRowType::Allocation {
113                allocations,
114                allocation_count,
115            },
116            InternalRowType::Deallocation {
117                deallocations,
118                deallocation_count,
119            } => InternalRowType::Deallocation {
120                deallocations,
121                deallocation_count,
122            },
123        }
124    }
125}
126
127#[derive(Default)]
128struct QueuedRows {
129    rows: Vec<InternalRow<'static>>,
130}
131
132pub struct TurbopackFormat {
133    store: Arc<StoreContainer>,
134    id_mapping: FxHashMap<u64, SpanIndex>,
135    queued_rows: FxHashMap<u64, QueuedRows>,
136    outdated_spans: FxHashSet<SpanIndex>,
137    thread_stacks: FxHashMap<u64, Vec<u64>>,
138    thread_allocation_counters: FxHashMap<u64, AllocationInfo>,
139    self_time_started: FxHashMap<(u64, u64), Timestamp>,
140}
141
142impl TurbopackFormat {
143    pub fn new(store: Arc<StoreContainer>) -> Self {
144        Self {
145            store,
146            id_mapping: FxHashMap::default(),
147            queued_rows: FxHashMap::default(),
148            outdated_spans: FxHashSet::default(),
149            thread_stacks: FxHashMap::default(),
150            thread_allocation_counters: FxHashMap::default(),
151            self_time_started: FxHashMap::default(),
152        }
153    }
154
155    fn process(&mut self, store: &mut StoreWriteGuard, row: TraceRow<'_>) {
156        match row {
157            TraceRow::Start {
158                ts,
159                id,
160                parent,
161                name,
162                target,
163                values,
164            } => {
165                let ts = Timestamp::from_micros(ts);
166                self.process_internal_row(
167                    store,
168                    InternalRow {
169                        id: parent,
170                        ty: InternalRowType::Start {
171                            ts,
172                            new_id: id,
173                            name,
174                            target,
175                            values,
176                        },
177                    },
178                );
179            }
180            TraceRow::Record { id, values } => {
181                self.process_internal_row(
182                    store,
183                    InternalRow {
184                        id: Some(id),
185                        ty: InternalRowType::Record { values },
186                    },
187                );
188            }
189            TraceRow::End { ts, id } => {
190                let ts = Timestamp::from_micros(ts);
191                self.process_internal_row(
192                    store,
193                    InternalRow {
194                        id: Some(id),
195                        ty: InternalRowType::End { ts },
196                    },
197                );
198            }
199            TraceRow::Enter { ts, id, thread_id } => {
200                let ts = Timestamp::from_micros(ts);
201                let stack = self.thread_stacks.entry(thread_id).or_default();
202                if let Some(&parent) = stack.last() {
203                    if let Some(parent_start) = self.self_time_started.remove(&(parent, thread_id))
204                    {
205                        stack.push(id);
206                        self.process_internal_row(
207                            store,
208                            InternalRow {
209                                id: Some(parent),
210                                ty: InternalRowType::SelfTime {
211                                    start: parent_start,
212                                    end: ts,
213                                },
214                            },
215                        );
216                    } else {
217                        stack.push(id);
218                    }
219                } else {
220                    stack.push(id);
221                }
222                self.self_time_started.insert((id, thread_id), ts);
223            }
224            TraceRow::Exit { ts, id, thread_id } => {
225                let ts = Timestamp::from_micros(ts);
226                let stack = self.thread_stacks.entry(thread_id).or_default();
227                if let Some(pos) = stack.iter().rev().position(|&x| x == id) {
228                    let stack_index = stack.len() - pos - 1;
229                    stack.remove(stack_index);
230                    if stack_index > 0 {
231                        let parent = stack[stack_index - 1];
232                        self.self_time_started.insert((parent, thread_id), ts);
233                    }
234                }
235                if let Some(start) = self.self_time_started.remove(&(id, thread_id)) {
236                    self.process_internal_row(
237                        store,
238                        InternalRow {
239                            id: Some(id),
240                            ty: InternalRowType::SelfTime { start, end: ts },
241                        },
242                    );
243                }
244            }
245            TraceRow::Event { ts, parent, values } => {
246                let ts = Timestamp::from_micros(ts);
247                self.process_internal_row(
248                    store,
249                    InternalRow {
250                        id: parent,
251                        ty: InternalRowType::Event { ts, values },
252                    },
253                );
254            }
255            TraceRow::Allocation {
256                ts: _,
257                thread_id,
258                allocations,
259                allocation_count,
260                deallocations,
261                deallocation_count,
262            } => {
263                let stack = self.thread_stacks.entry(thread_id).or_default();
264                if let Some(&id) = stack.last() {
265                    if allocations > 0 {
266                        self.process_internal_row(
267                            store,
268                            InternalRow {
269                                id: Some(id),
270                                ty: InternalRowType::Allocation {
271                                    allocations,
272                                    allocation_count,
273                                },
274                            },
275                        );
276                    }
277                    if deallocations > 0 {
278                        self.process_internal_row(
279                            store,
280                            InternalRow {
281                                id: Some(id),
282                                ty: InternalRowType::Deallocation {
283                                    deallocations,
284                                    deallocation_count,
285                                },
286                            },
287                        );
288                    }
289                }
290            }
291            TraceRow::MemorySample { ts, memory } => {
292                let ts = Timestamp::from_micros(ts);
293                store.add_memory_sample(ts, memory);
294            }
295            TraceRow::AllocationCounters {
296                ts: _,
297                thread_id,
298                allocations,
299                allocation_count,
300                deallocations,
301                deallocation_count,
302            } => {
303                let info = AllocationInfo {
304                    allocations,
305                    deallocations,
306                    allocation_count,
307                    deallocation_count,
308                };
309                let mut diff = AllocationInfo::default();
310                match self.thread_allocation_counters.entry(thread_id) {
311                    Entry::Occupied(mut entry) => {
312                        let counter = entry.get_mut();
313                        diff.allocations = info.allocations - counter.allocations;
314                        diff.deallocations = info.deallocations - counter.deallocations;
315                        diff.allocation_count = info.allocation_count - counter.allocation_count;
316                        diff.deallocation_count =
317                            info.deallocation_count - counter.deallocation_count;
318                        counter.allocations = info.allocations;
319                        counter.deallocations = info.deallocations;
320                        counter.allocation_count = info.allocation_count;
321                        counter.deallocation_count = info.deallocation_count;
322                    }
323                    Entry::Vacant(entry) => {
324                        entry.insert(info);
325                    }
326                }
327                let stack = self.thread_stacks.entry(thread_id).or_default();
328                if let Some(&id) = stack.last() {
329                    if diff.allocations > 0 {
330                        self.process_internal_row(
331                            store,
332                            InternalRow {
333                                id: Some(id),
334                                ty: InternalRowType::Allocation {
335                                    allocations: diff.allocations,
336                                    allocation_count: diff.allocation_count,
337                                },
338                            },
339                        );
340                    }
341                    if diff.deallocations > 0 {
342                        self.process_internal_row(
343                            store,
344                            InternalRow {
345                                id: Some(id),
346                                ty: InternalRowType::Deallocation {
347                                    deallocations: diff.deallocations,
348                                    deallocation_count: diff.deallocation_count,
349                                },
350                            },
351                        );
352                    }
353                }
354            }
355        }
356    }
357
358    fn process_internal_row(&mut self, store: &mut StoreWriteGuard, row: InternalRow<'_>) {
359        let mut queue = Vec::new();
360        queue.push(row);
361        while !queue.is_empty() {
362            let q = take(&mut queue);
363            for row in q {
364                self.process_internal_row_queue(store, row, &mut queue);
365            }
366        }
367    }
368
369    fn process_internal_row_queue(
370        &mut self,
371        store: &mut StoreWriteGuard,
372        row: InternalRow<'_>,
373        queue: &mut Vec<InternalRow<'_>>,
374    ) {
375        let id = if let Some(id) = row.id {
376            if let Some(id) = self.id_mapping.get(&id) {
377                Some(*id)
378            } else {
379                self.queued_rows
380                    .entry(id)
381                    .or_default()
382                    .rows
383                    .push(row.into_static());
384                return;
385            }
386        } else {
387            None
388        };
389        match row.ty {
390            InternalRowType::Start {
391                ts,
392                new_id,
393                name,
394                target,
395                values,
396            } => {
397                let span_id = store.add_span(
398                    id,
399                    ts,
400                    target.into_owned(),
401                    name.into_owned(),
402                    values
403                        .iter()
404                        .map(|(k, v)| (k.to_string(), v.to_string()))
405                        .collect(),
406                    &mut self.outdated_spans,
407                );
408                self.id_mapping.insert(new_id, span_id);
409                if let Some(QueuedRows { rows }) = self.queued_rows.remove(&new_id) {
410                    for row in rows {
411                        queue.push(row);
412                    }
413                }
414            }
415            InternalRowType::Record { ref values } => {
416                store.add_args(
417                    id.unwrap(),
418                    values
419                        .iter()
420                        .map(|(k, v)| (k.to_string(), v.to_string()))
421                        .collect(),
422                    &mut self.outdated_spans,
423                );
424            }
425            InternalRowType::End { ts: _ } => {
426                store.complete_span(id.unwrap());
427            }
428            InternalRowType::SelfTime { start, end } => {
429                store.add_self_time(id.unwrap(), start, end, &mut self.outdated_spans);
430            }
431            InternalRowType::Event { ts, values } => {
432                let mut values = values.into_iter().collect::<FxIndexMap<_, _>>();
433                let duration = Timestamp::from_micros(
434                    values
435                        .swap_remove("duration")
436                        .and_then(|v| v.as_u64())
437                        .unwrap_or(0),
438                );
439                let name = values
440                    .swap_remove("name")
441                    .and_then(|v| v.as_str().map(|s| s.to_string()))
442                    .unwrap_or("event".into());
443
444                let id = store.add_span(
445                    id,
446                    ts.saturating_sub(duration),
447                    "event".into(),
448                    name,
449                    values
450                        .iter()
451                        .map(|(k, v)| (k.to_string(), v.to_string()))
452                        .collect(),
453                    &mut self.outdated_spans,
454                );
455                store.add_self_time(
456                    id,
457                    ts.saturating_sub(duration),
458                    ts,
459                    &mut self.outdated_spans,
460                );
461                store.complete_span(id);
462            }
463            InternalRowType::Allocation {
464                allocations,
465                allocation_count,
466            } => {
467                store.add_allocation(
468                    id.unwrap(),
469                    allocations,
470                    allocation_count,
471                    &mut self.outdated_spans,
472                );
473            }
474            InternalRowType::Deallocation {
475                deallocations,
476                deallocation_count,
477            } => {
478                store.add_deallocation(
479                    id.unwrap(),
480                    deallocations,
481                    deallocation_count,
482                    &mut self.outdated_spans,
483                );
484            }
485        }
486    }
487}
488
489impl TraceFormat for TurbopackFormat {
490    type Reused = Vec<TraceRow<'static>>;
491
492    fn read(&mut self, mut buffer: &[u8], reuse: &mut Self::Reused) -> Result<usize> {
493        reuse.clear();
494        let mut reuse = ClearOnDrop(reuse);
495        // Safety: The Vec is empty and is cleared on leaving this scope, so it's safe to cast the
496        // lifetime of data, since there is no data and data can't leave this function.
497        let rows =
498            unsafe { transmute::<&mut Vec<TraceRow<'_>>, &mut Vec<TraceRow<'_>>>(&mut *reuse) };
499        let mut bytes_read = 0;
500        loop {
501            match postcard::take_from_bytes(buffer) {
502                Ok((row, remaining)) => {
503                    bytes_read += buffer.len() - remaining.len();
504                    buffer = remaining;
505                    rows.push(row);
506                }
507                Err(err) => {
508                    if matches!(err, postcard::Error::DeserializeUnexpectedEnd) {
509                        break;
510                    }
511                    return Err(err.into());
512                }
513            }
514        }
515        if !rows.is_empty() {
516            let store = self.store.clone();
517            let mut iter = rows.drain(..);
518            {
519                let mut store = store.write();
520                for row in iter.by_ref() {
521                    self.process(&mut store, row);
522                }
523                store.invalidate_outdated_spans(&self.outdated_spans);
524                self.outdated_spans.clear();
525            }
526        }
527        Ok(bytes_read)
528    }
529}
530
/// Guard around a borrowed `Vec` that empties the vector when the guard is dropped.
///
/// The guard dereferences to the wrapped vector, so it can be used like the vector itself.
struct ClearOnDrop<'v, T>(&'v mut Vec<T>);

impl<T> Deref for ClearOnDrop<'_, T> {
    type Target = Vec<T>;

    fn deref(&self) -> &Vec<T> {
        &*self.0
    }
}

impl<T> DerefMut for ClearOnDrop<'_, T> {
    fn deref_mut(&mut self) -> &mut Vec<T> {
        &mut *self.0
    }
}

impl<T> Drop for ClearOnDrop<'_, T> {
    /// Removes all elements from the wrapped vector.
    fn drop(&mut self) {
        Vec::clear(self.0);
    }
}
551}