turbo_trace_server/reader/
turbopack.rs

1use std::{
2    borrow::Cow,
3    collections::hash_map::Entry,
4    mem::{take, transmute},
5    ops::{Deref, DerefMut},
6    sync::Arc,
7};
8
9use anyhow::Result;
10use rustc_hash::{FxHashMap, FxHashSet};
11use turbopack_trace_utils::tracing::{TraceRow, TraceValue};
12
13use super::TraceFormat;
14use crate::{
15    FxIndexMap,
16    span::SpanIndex,
17    store_container::{StoreContainer, StoreWriteGuard},
18    timestamp::Timestamp,
19};
20
21#[derive(Default)]
22struct AllocationInfo {
23    allocations: u64,
24    deallocations: u64,
25    allocation_count: u64,
26    deallocation_count: u64,
27}
28
29struct InternalRow<'a> {
30    id: Option<u64>,
31    ty: InternalRowType<'a>,
32}
33
34impl InternalRow<'_> {
35    fn into_static(self) -> InternalRow<'static> {
36        InternalRow {
37            id: self.id,
38            ty: self.ty.into_static(),
39        }
40    }
41}
42
43enum InternalRowType<'a> {
44    Start {
45        new_id: u64,
46        ts: Timestamp,
47        name: Cow<'a, str>,
48        target: Cow<'a, str>,
49        values: Vec<(Cow<'a, str>, TraceValue<'a>)>,
50    },
51    End {
52        ts: Timestamp,
53    },
54    SelfTime {
55        start: Timestamp,
56        end: Timestamp,
57    },
58    Event {
59        ts: Timestamp,
60        values: Vec<(Cow<'a, str>, TraceValue<'a>)>,
61    },
62    Record {
63        values: Vec<(Cow<'a, str>, TraceValue<'a>)>,
64    },
65    Allocation {
66        allocations: u64,
67        allocation_count: u64,
68    },
69    Deallocation {
70        deallocations: u64,
71        deallocation_count: u64,
72    },
73}
74
75impl InternalRowType<'_> {
76    fn into_static(self) -> InternalRowType<'static> {
77        match self {
78            InternalRowType::Start {
79                ts,
80                new_id,
81                name,
82                target,
83                values,
84            } => InternalRowType::Start {
85                ts,
86                new_id,
87                name: name.into_owned().into(),
88                target: target.into_owned().into(),
89                values: values
90                    .into_iter()
91                    .map(|(k, v)| (k.into_owned().into(), v.into_static()))
92                    .collect(),
93            },
94            InternalRowType::End { ts } => InternalRowType::End { ts },
95            InternalRowType::SelfTime { start, end } => InternalRowType::SelfTime { start, end },
96            InternalRowType::Event { ts, values } => InternalRowType::Event {
97                ts,
98                values: values
99                    .into_iter()
100                    .map(|(k, v)| (k.into_owned().into(), v.into_static()))
101                    .collect(),
102            },
103            InternalRowType::Record { values } => InternalRowType::Record {
104                values: values
105                    .into_iter()
106                    .map(|(k, v)| (k.into_owned().into(), v.into_static()))
107                    .collect(),
108            },
109            InternalRowType::Allocation {
110                allocations,
111                allocation_count,
112            } => InternalRowType::Allocation {
113                allocations,
114                allocation_count,
115            },
116            InternalRowType::Deallocation {
117                deallocations,
118                deallocation_count,
119            } => InternalRowType::Deallocation {
120                deallocations,
121                deallocation_count,
122            },
123        }
124    }
125}
126
127#[derive(Default)]
128struct QueuedRows {
129    rows: Vec<InternalRow<'static>>,
130}
131
132pub struct TurbopackFormat {
133    store: Arc<StoreContainer>,
134    id_mapping: FxHashMap<u64, SpanIndex>,
135    queued_rows: FxHashMap<u64, QueuedRows>,
136    outdated_spans: FxHashSet<SpanIndex>,
137    thread_stacks: FxHashMap<u64, Vec<u64>>,
138    thread_allocation_counters: FxHashMap<u64, AllocationInfo>,
139    self_time_started: FxHashMap<(u64, u64), Timestamp>,
140}
141
142impl TurbopackFormat {
143    pub fn new(store: Arc<StoreContainer>) -> Self {
144        Self {
145            store,
146            id_mapping: FxHashMap::default(),
147            queued_rows: FxHashMap::default(),
148            outdated_spans: FxHashSet::default(),
149            thread_stacks: FxHashMap::default(),
150            thread_allocation_counters: FxHashMap::default(),
151            self_time_started: FxHashMap::default(),
152        }
153    }
154
155    fn process(&mut self, store: &mut StoreWriteGuard, row: TraceRow<'_>) {
156        match row {
157            TraceRow::Start {
158                ts,
159                id,
160                parent,
161                name,
162                target,
163                values,
164            } => {
165                let ts = Timestamp::from_micros(ts);
166                self.process_internal_row(
167                    store,
168                    InternalRow {
169                        id: parent,
170                        ty: InternalRowType::Start {
171                            ts,
172                            new_id: id,
173                            name,
174                            target,
175                            values,
176                        },
177                    },
178                );
179            }
180            TraceRow::Record { id, values } => {
181                self.process_internal_row(
182                    store,
183                    InternalRow {
184                        id: Some(id),
185                        ty: InternalRowType::Record { values },
186                    },
187                );
188            }
189            TraceRow::End { ts, id } => {
190                let ts = Timestamp::from_micros(ts);
191                self.process_internal_row(
192                    store,
193                    InternalRow {
194                        id: Some(id),
195                        ty: InternalRowType::End { ts },
196                    },
197                );
198            }
199            TraceRow::Enter { ts, id, thread_id } => {
200                let ts = Timestamp::from_micros(ts);
201                let stack = self.thread_stacks.entry(thread_id).or_default();
202                if let Some(&parent) = stack.last() {
203                    if let Some(parent_start) = self.self_time_started.remove(&(parent, thread_id))
204                    {
205                        stack.push(id);
206                        self.process_internal_row(
207                            store,
208                            InternalRow {
209                                id: Some(parent),
210                                ty: InternalRowType::SelfTime {
211                                    start: parent_start,
212                                    end: ts,
213                                },
214                            },
215                        );
216                    } else {
217                        stack.push(id);
218                    }
219                } else {
220                    stack.push(id);
221                }
222                self.self_time_started.insert((id, thread_id), ts);
223            }
224            TraceRow::Exit { ts, id, thread_id } => {
225                let ts = Timestamp::from_micros(ts);
226                let stack = self.thread_stacks.entry(thread_id).or_default();
227                if let Some(pos) = stack.iter().rev().position(|&x| x == id) {
228                    let stack_index = stack.len() - pos - 1;
229                    stack.remove(stack_index);
230                    if stack_index > 0 {
231                        let parent = stack[stack_index - 1];
232                        self.self_time_started.insert((parent, thread_id), ts);
233                    }
234                }
235                if let Some(start) = self.self_time_started.remove(&(id, thread_id)) {
236                    self.process_internal_row(
237                        store,
238                        InternalRow {
239                            id: Some(id),
240                            ty: InternalRowType::SelfTime { start, end: ts },
241                        },
242                    );
243                }
244            }
245            TraceRow::Event { ts, parent, values } => {
246                let ts = Timestamp::from_micros(ts);
247                self.process_internal_row(
248                    store,
249                    InternalRow {
250                        id: parent,
251                        ty: InternalRowType::Event { ts, values },
252                    },
253                );
254            }
255            TraceRow::Allocation {
256                ts: _,
257                thread_id,
258                allocations,
259                allocation_count,
260                deallocations,
261                deallocation_count,
262            } => {
263                let stack = self.thread_stacks.entry(thread_id).or_default();
264                if let Some(&id) = stack.last() {
265                    if allocations > 0 {
266                        self.process_internal_row(
267                            store,
268                            InternalRow {
269                                id: Some(id),
270                                ty: InternalRowType::Allocation {
271                                    allocations,
272                                    allocation_count,
273                                },
274                            },
275                        );
276                    }
277                    if deallocations > 0 {
278                        self.process_internal_row(
279                            store,
280                            InternalRow {
281                                id: Some(id),
282                                ty: InternalRowType::Deallocation {
283                                    deallocations,
284                                    deallocation_count,
285                                },
286                            },
287                        );
288                    }
289                }
290            }
291            TraceRow::AllocationCounters {
292                ts: _,
293                thread_id,
294                allocations,
295                allocation_count,
296                deallocations,
297                deallocation_count,
298            } => {
299                let info = AllocationInfo {
300                    allocations,
301                    deallocations,
302                    allocation_count,
303                    deallocation_count,
304                };
305                let mut diff = AllocationInfo::default();
306                match self.thread_allocation_counters.entry(thread_id) {
307                    Entry::Occupied(mut entry) => {
308                        let counter = entry.get_mut();
309                        diff.allocations = info.allocations - counter.allocations;
310                        diff.deallocations = info.deallocations - counter.deallocations;
311                        diff.allocation_count = info.allocation_count - counter.allocation_count;
312                        diff.deallocation_count =
313                            info.deallocation_count - counter.deallocation_count;
314                        counter.allocations = info.allocations;
315                        counter.deallocations = info.deallocations;
316                        counter.allocation_count = info.allocation_count;
317                        counter.deallocation_count = info.deallocation_count;
318                    }
319                    Entry::Vacant(entry) => {
320                        entry.insert(info);
321                    }
322                }
323                let stack = self.thread_stacks.entry(thread_id).or_default();
324                if let Some(&id) = stack.last() {
325                    if diff.allocations > 0 {
326                        self.process_internal_row(
327                            store,
328                            InternalRow {
329                                id: Some(id),
330                                ty: InternalRowType::Allocation {
331                                    allocations: diff.allocations,
332                                    allocation_count: diff.allocation_count,
333                                },
334                            },
335                        );
336                    }
337                    if diff.deallocations > 0 {
338                        self.process_internal_row(
339                            store,
340                            InternalRow {
341                                id: Some(id),
342                                ty: InternalRowType::Deallocation {
343                                    deallocations: diff.deallocations,
344                                    deallocation_count: diff.deallocation_count,
345                                },
346                            },
347                        );
348                    }
349                }
350            }
351        }
352    }
353
354    fn process_internal_row(&mut self, store: &mut StoreWriteGuard, row: InternalRow<'_>) {
355        let mut queue = Vec::new();
356        queue.push(row);
357        while !queue.is_empty() {
358            let q = take(&mut queue);
359            for row in q {
360                self.process_internal_row_queue(store, row, &mut queue);
361            }
362        }
363    }
364
365    fn process_internal_row_queue(
366        &mut self,
367        store: &mut StoreWriteGuard,
368        row: InternalRow<'_>,
369        queue: &mut Vec<InternalRow<'_>>,
370    ) {
371        let id = if let Some(id) = row.id {
372            if let Some(id) = self.id_mapping.get(&id) {
373                Some(*id)
374            } else {
375                self.queued_rows
376                    .entry(id)
377                    .or_default()
378                    .rows
379                    .push(row.into_static());
380                return;
381            }
382        } else {
383            None
384        };
385        match row.ty {
386            InternalRowType::Start {
387                ts,
388                new_id,
389                name,
390                target,
391                values,
392            } => {
393                let span_id = store.add_span(
394                    id,
395                    ts,
396                    target.into_owned(),
397                    name.into_owned(),
398                    values
399                        .iter()
400                        .map(|(k, v)| (k.to_string(), v.to_string()))
401                        .collect(),
402                    &mut self.outdated_spans,
403                );
404                self.id_mapping.insert(new_id, span_id);
405                if let Some(QueuedRows { rows }) = self.queued_rows.remove(&new_id) {
406                    for row in rows {
407                        queue.push(row);
408                    }
409                }
410            }
411            InternalRowType::Record { ref values } => {
412                store.add_args(
413                    id.unwrap(),
414                    values
415                        .iter()
416                        .map(|(k, v)| (k.to_string(), v.to_string()))
417                        .collect(),
418                    &mut self.outdated_spans,
419                );
420            }
421            InternalRowType::End { ts: _ } => {
422                store.complete_span(id.unwrap());
423            }
424            InternalRowType::SelfTime { start, end } => {
425                store.add_self_time(id.unwrap(), start, end, &mut self.outdated_spans);
426            }
427            InternalRowType::Event { ts, values } => {
428                let mut values = values.into_iter().collect::<FxIndexMap<_, _>>();
429                let duration = Timestamp::from_micros(
430                    values
431                        .swap_remove("duration")
432                        .and_then(|v| v.as_u64())
433                        .unwrap_or(0),
434                );
435                let name = values
436                    .swap_remove("name")
437                    .and_then(|v| v.as_str().map(|s| s.to_string()))
438                    .unwrap_or("event".into());
439
440                let id = store.add_span(
441                    id,
442                    ts.saturating_sub(duration),
443                    "event".into(),
444                    name,
445                    values
446                        .iter()
447                        .map(|(k, v)| (k.to_string(), v.to_string()))
448                        .collect(),
449                    &mut self.outdated_spans,
450                );
451                store.add_self_time(
452                    id,
453                    ts.saturating_sub(duration),
454                    ts,
455                    &mut self.outdated_spans,
456                );
457                store.complete_span(id);
458            }
459            InternalRowType::Allocation {
460                allocations,
461                allocation_count,
462            } => {
463                store.add_allocation(
464                    id.unwrap(),
465                    allocations,
466                    allocation_count,
467                    &mut self.outdated_spans,
468                );
469            }
470            InternalRowType::Deallocation {
471                deallocations,
472                deallocation_count,
473            } => {
474                store.add_deallocation(
475                    id.unwrap(),
476                    deallocations,
477                    deallocation_count,
478                    &mut self.outdated_spans,
479                );
480            }
481        }
482    }
483}
484
485impl TraceFormat for TurbopackFormat {
486    type Reused = Vec<TraceRow<'static>>;
487
488    fn read(&mut self, mut buffer: &[u8], reuse: &mut Self::Reused) -> Result<usize> {
489        reuse.clear();
490        let mut reuse = ClearOnDrop(reuse);
491        // Safety: The Vec is empty and is cleared on leaving this scope, so it's safe to cast the
492        // lifetime of data, since there is no data and data can't leave this function.
493        let rows =
494            unsafe { transmute::<&mut Vec<TraceRow<'_>>, &mut Vec<TraceRow<'_>>>(&mut *reuse) };
495        let mut bytes_read = 0;
496        loop {
497            match postcard::take_from_bytes(buffer) {
498                Ok((row, remaining)) => {
499                    bytes_read += buffer.len() - remaining.len();
500                    buffer = remaining;
501                    rows.push(row);
502                }
503                Err(err) => {
504                    if matches!(err, postcard::Error::DeserializeUnexpectedEnd) {
505                        break;
506                    }
507                    return Err(err.into());
508                }
509            }
510        }
511        if !rows.is_empty() {
512            let store = self.store.clone();
513            let mut iter = rows.drain(..);
514            {
515                let mut store = store.write();
516                for row in iter.by_ref() {
517                    self.process(&mut store, row);
518                }
519                store.invalidate_outdated_spans(&self.outdated_spans);
520                self.outdated_spans.clear();
521            }
522        }
523        Ok(bytes_read)
524    }
525}
526
527struct ClearOnDrop<'l, T>(&'l mut Vec<T>);
528
529impl<T> Drop for ClearOnDrop<'_, T> {
530    fn drop(&mut self) {
531        self.0.clear();
532    }
533}
534
535impl<T> Deref for ClearOnDrop<'_, T> {
536    type Target = Vec<T>;
537
538    fn deref(&self) -> &Self::Target {
539        self.0
540    }
541}
542
543impl<T> DerefMut for ClearOnDrop<'_, T> {
544    fn deref_mut(&mut self) -> &mut Self::Target {
545        self.0
546    }
547}