Skip to main content

turbopack_trace_utils/
raw_trace.rs

1use std::{
2    borrow::Cow,
3    cell::Cell,
4    fmt::Write,
5    marker::PhantomData,
6    sync::atomic::{AtomicU64, Ordering},
7    thread,
8    time::Instant,
9};
10
11use tracing::{
12    Subscriber,
13    field::{Visit, display},
14    span,
15};
16use tracing_subscriber::{Layer, registry::LookupSpan};
17use turbo_tasks_malloc::TurboMalloc;
18
19use crate::{
20    flavor::WriteGuardFlavor,
21    trace_writer::TraceWriter,
22    tracing::{TraceRow, TraceValue},
23};
24
25/// 10ms in microseconds
26const MEMORY_SAMPLE_INTERVAL_US: u64 = 10_000;
27
28static GLOBAL_LAST_MEMORY_SAMPLE: AtomicU64 = AtomicU64::new(0);
29
30thread_local! {
31    static THREAD_LOCAL_LAST_MEMORY_SAMPLE: Cell<u64> = const { Cell::new(0) };
32}
33
34pub struct RawTraceLayerOptions {}
35
36struct RawTraceLayerExtension {
37    id: u64,
38}
39
40fn get_id<S: Subscriber + for<'a> LookupSpan<'a>>(
41    ctx: tracing_subscriber::layer::Context<'_, S>,
42    id: &span::Id,
43) -> u64 {
44    ctx.span(id)
45        .unwrap()
46        .extensions()
47        .get::<RawTraceLayerExtension>()
48        .unwrap()
49        .id
50}
51
52/// A tracing layer that writes raw trace data to a writer. We store data using the [`TraceRow`],
53/// serialized with [`postcard`].
54pub struct RawTraceLayer<S: Subscriber + for<'a> LookupSpan<'a>> {
55    trace_writer: TraceWriter,
56    start: Instant,
57    next_id: AtomicU64,
58    _phantom: PhantomData<fn(S)>,
59}
60
61impl<S: Subscriber + for<'a> LookupSpan<'a>> RawTraceLayer<S> {
62    pub fn new(trace_writer: TraceWriter) -> Self {
63        Self {
64            trace_writer,
65            start: Instant::now(),
66            next_id: AtomicU64::new(1),
67            _phantom: PhantomData,
68        }
69    }
70
71    fn write(&self, data: TraceRow<'_>) {
72        let start = TurboMalloc::allocation_counters();
73        let guard = self.trace_writer.start_write();
74        postcard::serialize_with_flavor(&data, WriteGuardFlavor { guard }).unwrap();
75        TurboMalloc::reset_allocation_counters(start);
76    }
77
78    fn maybe_report_memory_sample(&self, ts: u64) {
79        // Fast thread-local check
80        let skip = THREAD_LOCAL_LAST_MEMORY_SAMPLE
81            .with(|tl| ts.saturating_sub(tl.get()) < MEMORY_SAMPLE_INTERVAL_US);
82        if skip {
83            return;
84        }
85
86        // Check global atomic
87        let global_last = GLOBAL_LAST_MEMORY_SAMPLE.load(Ordering::Relaxed);
88        if ts.saturating_sub(global_last) < MEMORY_SAMPLE_INTERVAL_US {
89            // Another thread sampled recently; update thread-local cache
90            THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(global_last));
91            return;
92        }
93
94        // Try to atomically claim the sample
95        match GLOBAL_LAST_MEMORY_SAMPLE.compare_exchange(
96            global_last,
97            ts,
98            Ordering::Relaxed,
99            Ordering::Relaxed,
100        ) {
101            Ok(_) => {
102                // We won the race — write the sample
103                THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(ts));
104                let memory = TurboMalloc::memory_usage() as u64;
105                let memory_pressure = TurboMalloc::memory_pressure().unwrap_or(0);
106                self.write(TraceRow::MemorySample {
107                    ts,
108                    memory,
109                    memory_pressure,
110                });
111            }
112            Err(actual) => {
113                // Lost the race; update thread-local with the winner's timestamp
114                THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(actual));
115            }
116        }
117    }
118
119    fn report_allocations(&self, ts: u64, thread_id: u64) {
120        let allocation_counters = turbo_tasks_malloc::TurboMalloc::allocation_counters();
121        self.write(TraceRow::AllocationCounters {
122            ts,
123            thread_id,
124            allocations: allocation_counters.allocations as u64,
125            deallocations: allocation_counters.deallocations as u64,
126            allocation_count: allocation_counters.allocation_count as u64,
127            deallocation_count: allocation_counters.deallocation_count as u64,
128        });
129    }
130}
131
132impl<S: Subscriber + for<'a> LookupSpan<'a>> Layer<S> for RawTraceLayer<S> {
133    fn on_new_span(
134        &self,
135        attrs: &span::Attributes<'_>,
136        id: &span::Id,
137        ctx: tracing_subscriber::layer::Context<'_, S>,
138    ) {
139        let ts = self.start.elapsed().as_micros() as u64;
140        let mut values = ValuesVisitor::new();
141        attrs.values().record(&mut values);
142        let external_id = self
143            .next_id
144            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
145        ctx.span(id)
146            .unwrap()
147            .extensions_mut()
148            .insert(RawTraceLayerExtension { id: external_id });
149        self.write(TraceRow::Start {
150            ts,
151            id: external_id,
152            parent: if attrs.is_contextual() {
153                ctx.current_span().id().map(|p| get_id(ctx, p))
154            } else {
155                attrs.parent().map(|p| get_id(ctx, p))
156            },
157            name: attrs.metadata().name().into(),
158            target: attrs.metadata().target().into(),
159            values: values.values,
160        });
161    }
162
163    fn on_close(&self, id: span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
164        let ts = self.start.elapsed().as_micros() as u64;
165        self.write(TraceRow::End {
166            ts,
167            id: get_id(ctx, &id),
168        });
169    }
170
171    fn on_enter(&self, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
172        let ts = self.start.elapsed().as_micros() as u64;
173        let thread_id = thread::current().id().as_u64().into();
174        self.maybe_report_memory_sample(ts);
175        self.report_allocations(ts, thread_id);
176        self.write(TraceRow::Enter {
177            ts,
178            id: get_id(ctx, id),
179            thread_id,
180        });
181    }
182
183    fn on_exit(&self, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
184        let ts = self.start.elapsed().as_micros() as u64;
185        let thread_id = thread::current().id().as_u64().into();
186        self.report_allocations(ts, thread_id);
187        self.write(TraceRow::Exit {
188            ts,
189            id: get_id(ctx, id),
190            thread_id,
191        });
192    }
193
194    fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
195        let ts = self.start.elapsed().as_micros() as u64;
196        let mut values = ValuesVisitor::new();
197        event.record(&mut values);
198        self.write(TraceRow::Event {
199            ts,
200            parent: if event.is_contextual() {
201                ctx.current_span().id().map(|p| get_id(ctx, p))
202            } else {
203                event.parent().map(|p| get_id(ctx, p))
204            },
205            values: values.values,
206        });
207    }
208
209    fn on_record(
210        &self,
211        id: &span::Id,
212        record: &span::Record<'_>,
213        ctx: tracing_subscriber::layer::Context<'_, S>,
214    ) {
215        let mut values = ValuesVisitor::new();
216        record.record(&mut values);
217        self.write(TraceRow::Record {
218            id: get_id(ctx, id),
219            values: values.values,
220        });
221    }
222}
223
224struct ValuesVisitor {
225    values: Vec<(Cow<'static, str>, TraceValue<'static>)>,
226}
227
228impl ValuesVisitor {
229    fn new() -> Self {
230        Self { values: Vec::new() }
231    }
232}
233
234impl Visit for ValuesVisitor {
235    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
236        let mut str = String::new();
237        let _ = write!(str, "{value:?}");
238        self.values
239            .push((field.name().into(), TraceValue::String(str.into())));
240    }
241
242    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
243        self.values
244            .push((field.name().into(), TraceValue::Float(value)));
245    }
246
247    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
248        self.values
249            .push((field.name().into(), TraceValue::Int(value)));
250    }
251
252    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
253        self.values
254            .push((field.name().into(), TraceValue::UInt(value)));
255    }
256
257    fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
258        self.record_debug(field, &value)
259    }
260
261    fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
262        self.record_debug(field, &value)
263    }
264
265    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
266        self.values
267            .push((field.name().into(), TraceValue::Bool(value)));
268    }
269
270    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
271        self.values.push((
272            field.name().into(),
273            TraceValue::String(value.to_string().into()),
274        ));
275    }
276
277    fn record_error(
278        &mut self,
279        field: &tracing::field::Field,
280        value: &(dyn std::error::Error + 'static),
281    ) {
282        self.record_debug(field, &display(value))
283    }
284}