// turbopack_trace_utils/raw_trace.rs
use std::{
    borrow::Cow,
    cell::Cell,
    fmt::Write,
    marker::PhantomData,
    sync::atomic::{AtomicU64, Ordering},
    thread,
    time::Instant,
};

use tracing::{
    Subscriber,
    field::{Visit, display},
    span,
};
use tracing_subscriber::{Layer, registry::LookupSpan};
use turbo_tasks_malloc::TurboMalloc;

use crate::{
    flavor::WriteGuardFlavor,
    trace_writer::TraceWriter,
    tracing::{TraceRow, TraceValue},
};
24
/// 10ms in microseconds
const MEMORY_SAMPLE_INTERVAL_US: u64 = 10_000;

// Timestamp (µs since trace start) of the most recent memory sample written
// by any thread; rate-limits `TraceRow::MemorySample` rows globally.
static GLOBAL_LAST_MEMORY_SAMPLE: AtomicU64 = AtomicU64::new(0);

thread_local! {
    // Per-thread cache of the last observed sample timestamp, so the hot
    // path can usually skip loading the shared atomic.
    static THREAD_LOCAL_LAST_MEMORY_SAMPLE: Cell<u64> = const { Cell::new(0) };
}
33
/// Options for constructing a [`RawTraceLayer`]. Currently has no fields.
pub struct RawTraceLayerOptions {}

/// Span-extension record holding the externally-visible id assigned in
/// `on_new_span`, so later callbacks can map a `span::Id` back to it.
struct RawTraceLayerExtension {
    // Unique id taken from `RawTraceLayer::next_id`.
    id: u64,
}
39
40fn get_id<S: Subscriber + for<'a> LookupSpan<'a>>(
41    ctx: tracing_subscriber::layer::Context<'_, S>,
42    id: &span::Id,
43) -> u64 {
44    ctx.span(id)
45        .unwrap()
46        .extensions()
47        .get::<RawTraceLayerExtension>()
48        .unwrap()
49        .id
50}
51
/// A tracing layer that writes raw trace data to a writer. We store data using the [`TraceRow`],
/// serialized with [`postcard`].
pub struct RawTraceLayer<S: Subscriber + for<'a> LookupSpan<'a>> {
    // Sink that receives the postcard-serialized `TraceRow`s.
    trace_writer: TraceWriter,
    // Trace epoch; all emitted timestamps are µs elapsed since this instant.
    start: Instant,
    // Source of externally-visible span ids; starts at 1.
    next_id: AtomicU64,
    // Ties the layer to the subscriber type `S` without storing one;
    // `fn(S)` carries no data and imposes no ownership of `S`.
    _phantom: PhantomData<fn(S)>,
}
60
61impl<S: Subscriber + for<'a> LookupSpan<'a>> RawTraceLayer<S> {
62    pub fn new(trace_writer: TraceWriter) -> Self {
63        Self {
64            trace_writer,
65            start: Instant::now(),
66            next_id: AtomicU64::new(1),
67            _phantom: PhantomData,
68        }
69    }
70
71    fn write(&self, data: TraceRow<'_>) {
72        let start = TurboMalloc::allocation_counters();
73        let guard = self.trace_writer.start_write();
74        postcard::serialize_with_flavor(&data, WriteGuardFlavor { guard }).unwrap();
75        TurboMalloc::reset_allocation_counters(start);
76    }
77
78    fn maybe_report_memory_sample(&self, ts: u64) {
79        // Fast thread-local check
80        let skip = THREAD_LOCAL_LAST_MEMORY_SAMPLE
81            .with(|tl| ts.saturating_sub(tl.get()) < MEMORY_SAMPLE_INTERVAL_US);
82        if skip {
83            return;
84        }
85
86        // Check global atomic
87        let global_last = GLOBAL_LAST_MEMORY_SAMPLE.load(Ordering::Relaxed);
88        if ts.saturating_sub(global_last) < MEMORY_SAMPLE_INTERVAL_US {
89            // Another thread sampled recently; update thread-local cache
90            THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(global_last));
91            return;
92        }
93
94        // Try to atomically claim the sample
95        match GLOBAL_LAST_MEMORY_SAMPLE.compare_exchange(
96            global_last,
97            ts,
98            Ordering::Relaxed,
99            Ordering::Relaxed,
100        ) {
101            Ok(_) => {
102                // We won the race — write the sample
103                THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(ts));
104                let memory = TurboMalloc::memory_usage() as u64;
105                self.write(TraceRow::MemorySample { ts, memory });
106            }
107            Err(actual) => {
108                // Lost the race; update thread-local with the winner's timestamp
109                THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(actual));
110            }
111        }
112    }
113
114    fn report_allocations(&self, ts: u64, thread_id: u64) {
115        let allocation_counters = turbo_tasks_malloc::TurboMalloc::allocation_counters();
116        self.write(TraceRow::AllocationCounters {
117            ts,
118            thread_id,
119            allocations: allocation_counters.allocations as u64,
120            deallocations: allocation_counters.deallocations as u64,
121            allocation_count: allocation_counters.allocation_count as u64,
122            deallocation_count: allocation_counters.deallocation_count as u64,
123        });
124    }
125}
126
127impl<S: Subscriber + for<'a> LookupSpan<'a>> Layer<S> for RawTraceLayer<S> {
128    fn on_new_span(
129        &self,
130        attrs: &span::Attributes<'_>,
131        id: &span::Id,
132        ctx: tracing_subscriber::layer::Context<'_, S>,
133    ) {
134        let ts = self.start.elapsed().as_micros() as u64;
135        let mut values = ValuesVisitor::new();
136        attrs.values().record(&mut values);
137        let external_id = self
138            .next_id
139            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
140        ctx.span(id)
141            .unwrap()
142            .extensions_mut()
143            .insert(RawTraceLayerExtension { id: external_id });
144        self.write(TraceRow::Start {
145            ts,
146            id: external_id,
147            parent: if attrs.is_contextual() {
148                ctx.current_span().id().map(|p| get_id(ctx, p))
149            } else {
150                attrs.parent().map(|p| get_id(ctx, p))
151            },
152            name: attrs.metadata().name().into(),
153            target: attrs.metadata().target().into(),
154            values: values.values,
155        });
156    }
157
158    fn on_close(&self, id: span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
159        let ts = self.start.elapsed().as_micros() as u64;
160        self.write(TraceRow::End {
161            ts,
162            id: get_id(ctx, &id),
163        });
164    }
165
166    fn on_enter(&self, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
167        let ts = self.start.elapsed().as_micros() as u64;
168        let thread_id = thread::current().id().as_u64().into();
169        self.maybe_report_memory_sample(ts);
170        self.report_allocations(ts, thread_id);
171        self.write(TraceRow::Enter {
172            ts,
173            id: get_id(ctx, id),
174            thread_id,
175        });
176    }
177
178    fn on_exit(&self, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
179        let ts = self.start.elapsed().as_micros() as u64;
180        let thread_id = thread::current().id().as_u64().into();
181        self.report_allocations(ts, thread_id);
182        self.write(TraceRow::Exit {
183            ts,
184            id: get_id(ctx, id),
185            thread_id,
186        });
187    }
188
189    fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
190        let ts = self.start.elapsed().as_micros() as u64;
191        let mut values = ValuesVisitor::new();
192        event.record(&mut values);
193        self.write(TraceRow::Event {
194            ts,
195            parent: if event.is_contextual() {
196                ctx.current_span().id().map(|p| get_id(ctx, p))
197            } else {
198                event.parent().map(|p| get_id(ctx, p))
199            },
200            values: values.values,
201        });
202    }
203
204    fn on_record(
205        &self,
206        id: &span::Id,
207        record: &span::Record<'_>,
208        ctx: tracing_subscriber::layer::Context<'_, S>,
209    ) {
210        let mut values = ValuesVisitor::new();
211        record.record(&mut values);
212        self.write(TraceRow::Record {
213            id: get_id(ctx, id),
214            values: values.values,
215        });
216    }
217}
218
/// Field visitor that collects span/event field values as owned
/// `(name, TraceValue)` pairs for serialization into a [`TraceRow`].
struct ValuesVisitor {
    // Collected fields; names are `'static` strs from tracing metadata.
    values: Vec<(Cow<'static, str>, TraceValue<'static>)>,
}
222
223impl ValuesVisitor {
224    fn new() -> Self {
225        Self { values: Vec::new() }
226    }
227}
228
229impl Visit for ValuesVisitor {
230    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
231        let mut str = String::new();
232        let _ = write!(str, "{value:?}");
233        self.values
234            .push((field.name().into(), TraceValue::String(str.into())));
235    }
236
237    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
238        self.values
239            .push((field.name().into(), TraceValue::Float(value)));
240    }
241
242    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
243        self.values
244            .push((field.name().into(), TraceValue::Int(value)));
245    }
246
247    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
248        self.values
249            .push((field.name().into(), TraceValue::UInt(value)));
250    }
251
252    fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
253        self.record_debug(field, &value)
254    }
255
256    fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
257        self.record_debug(field, &value)
258    }
259
260    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
261        self.values
262            .push((field.name().into(), TraceValue::Bool(value)));
263    }
264
265    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
266        self.values.push((
267            field.name().into(),
268            TraceValue::String(value.to_string().into()),
269        ));
270    }
271
272    fn record_error(
273        &mut self,
274        field: &tracing::field::Field,
275        value: &(dyn std::error::Error + 'static),
276    ) {
277        self.record_debug(field, &display(value))
278    }
279}