1use std::{
2 borrow::Cow,
3 cell::Cell,
4 fmt::Write,
5 marker::PhantomData,
6 sync::atomic::{AtomicU64, Ordering},
7 thread,
8 time::Instant,
9};
10
11use tracing::{
12 Subscriber,
13 field::{Visit, display},
14 span,
15};
16use tracing_subscriber::{Layer, registry::LookupSpan};
17use turbo_tasks_malloc::TurboMalloc;
18
19use crate::{
20 flavor::WriteGuardFlavor,
21 trace_writer::TraceWriter,
22 tracing::{TraceRow, TraceValue},
23};
24
/// Minimum gap, in microseconds, between two memory samples (10_000 µs = 10 ms).
///
/// `maybe_report_memory_sample` compares this against the difference between the current
/// timestamp and the last recorded sample timestamp.
const MEMORY_SAMPLE_INTERVAL_US: u64 = 10_000;

/// Timestamp of the most recent memory sample, shared by all threads.
///
/// Starts at 0 and is advanced with a compare-and-exchange in `maybe_report_memory_sample`, so
/// when several threads race for the same interval only the winner writes a sample.
static GLOBAL_LAST_MEMORY_SAMPLE: AtomicU64 = AtomicU64::new(0);

// Per-thread copy of the last sample timestamp this thread has observed. It is checked before
// the global atomic, so a thread that recently saw a fresh sample returns without touching
// `GLOBAL_LAST_MEMORY_SAMPLE`.
thread_local! {
    static THREAD_LOCAL_LAST_MEMORY_SAMPLE: Cell<u64> = const { Cell::new(0) };
}
33
/// Options for the raw trace layer.
///
/// The struct currently has no fields, and nothing visible in this file reads it; presumably it
/// is a placeholder for future [`RawTraceLayer`] configuration — TODO confirm.
///
/// It derives `Debug`, `Clone` and `Default` so callers can log it, copy it, and build it with
/// `RawTraceLayerOptions::default()` (which keeps working if fields with defaults are added).
#[derive(Debug, Clone, Default)]
pub struct RawTraceLayerOptions {}
35
/// Data that `RawTraceLayer` stores in a span's extensions when the span is created.
struct RawTraceLayerExtension {
    /// Id assigned to the span from `RawTraceLayer::next_id` in `on_new_span`. `get_id` reads it
    /// back, and it is the span id written into the emitted trace rows.
    id: u64,
}
39
40fn get_id<S: Subscriber + for<'a> LookupSpan<'a>>(
41 ctx: tracing_subscriber::layer::Context<'_, S>,
42 id: &span::Id,
43) -> u64 {
44 ctx.span(id)
45 .unwrap()
46 .extensions()
47 .get::<RawTraceLayerExtension>()
48 .unwrap()
49 .id
50}
51
/// A `tracing_subscriber` layer that serializes span and event activity as [`TraceRow`]s into a
/// [`TraceWriter`].
pub struct RawTraceLayer<S: Subscriber + for<'a> LookupSpan<'a>> {
    /// Destination for the serialized rows.
    trace_writer: TraceWriter,
    /// Instant the layer was created; row timestamps are microseconds elapsed since this point.
    start: Instant,
    /// Next span id to hand out. Starts at 1 and is incremented for every new span.
    next_id: AtomicU64,
    /// Binds the layer to the subscriber type `S` without storing a value of that type.
    _phantom: PhantomData<fn(S)>,
}
60
61impl<S: Subscriber + for<'a> LookupSpan<'a>> RawTraceLayer<S> {
62 pub fn new(trace_writer: TraceWriter) -> Self {
63 Self {
64 trace_writer,
65 start: Instant::now(),
66 next_id: AtomicU64::new(1),
67 _phantom: PhantomData,
68 }
69 }
70
71 fn write(&self, data: TraceRow<'_>) {
72 let start = TurboMalloc::allocation_counters();
73 let guard = self.trace_writer.start_write();
74 postcard::serialize_with_flavor(&data, WriteGuardFlavor { guard }).unwrap();
75 TurboMalloc::reset_allocation_counters(start);
76 }
77
78 fn maybe_report_memory_sample(&self, ts: u64) {
79 let skip = THREAD_LOCAL_LAST_MEMORY_SAMPLE
81 .with(|tl| ts.saturating_sub(tl.get()) < MEMORY_SAMPLE_INTERVAL_US);
82 if skip {
83 return;
84 }
85
86 let global_last = GLOBAL_LAST_MEMORY_SAMPLE.load(Ordering::Relaxed);
88 if ts.saturating_sub(global_last) < MEMORY_SAMPLE_INTERVAL_US {
89 THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(global_last));
91 return;
92 }
93
94 match GLOBAL_LAST_MEMORY_SAMPLE.compare_exchange(
96 global_last,
97 ts,
98 Ordering::Relaxed,
99 Ordering::Relaxed,
100 ) {
101 Ok(_) => {
102 THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(ts));
104 let memory = TurboMalloc::memory_usage() as u64;
105 let memory_pressure = TurboMalloc::memory_pressure().unwrap_or(0);
106 self.write(TraceRow::MemorySample {
107 ts,
108 memory,
109 memory_pressure,
110 });
111 }
112 Err(actual) => {
113 THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(actual));
115 }
116 }
117 }
118
119 fn report_allocations(&self, ts: u64, thread_id: u64) {
120 let allocation_counters = turbo_tasks_malloc::TurboMalloc::allocation_counters();
121 self.write(TraceRow::AllocationCounters {
122 ts,
123 thread_id,
124 allocations: allocation_counters.allocations as u64,
125 deallocations: allocation_counters.deallocations as u64,
126 allocation_count: allocation_counters.allocation_count as u64,
127 deallocation_count: allocation_counters.deallocation_count as u64,
128 });
129 }
130}
131
132impl<S: Subscriber + for<'a> LookupSpan<'a>> Layer<S> for RawTraceLayer<S> {
133 fn on_new_span(
134 &self,
135 attrs: &span::Attributes<'_>,
136 id: &span::Id,
137 ctx: tracing_subscriber::layer::Context<'_, S>,
138 ) {
139 let ts = self.start.elapsed().as_micros() as u64;
140 let mut values = ValuesVisitor::new();
141 attrs.values().record(&mut values);
142 let external_id = self
143 .next_id
144 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
145 ctx.span(id)
146 .unwrap()
147 .extensions_mut()
148 .insert(RawTraceLayerExtension { id: external_id });
149 self.write(TraceRow::Start {
150 ts,
151 id: external_id,
152 parent: if attrs.is_contextual() {
153 ctx.current_span().id().map(|p| get_id(ctx, p))
154 } else {
155 attrs.parent().map(|p| get_id(ctx, p))
156 },
157 name: attrs.metadata().name().into(),
158 target: attrs.metadata().target().into(),
159 values: values.values,
160 });
161 }
162
163 fn on_close(&self, id: span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
164 let ts = self.start.elapsed().as_micros() as u64;
165 self.write(TraceRow::End {
166 ts,
167 id: get_id(ctx, &id),
168 });
169 }
170
171 fn on_enter(&self, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
172 let ts = self.start.elapsed().as_micros() as u64;
173 let thread_id = thread::current().id().as_u64().into();
174 self.maybe_report_memory_sample(ts);
175 self.report_allocations(ts, thread_id);
176 self.write(TraceRow::Enter {
177 ts,
178 id: get_id(ctx, id),
179 thread_id,
180 });
181 }
182
183 fn on_exit(&self, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
184 let ts = self.start.elapsed().as_micros() as u64;
185 let thread_id = thread::current().id().as_u64().into();
186 self.report_allocations(ts, thread_id);
187 self.write(TraceRow::Exit {
188 ts,
189 id: get_id(ctx, id),
190 thread_id,
191 });
192 }
193
194 fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
195 let ts = self.start.elapsed().as_micros() as u64;
196 let mut values = ValuesVisitor::new();
197 event.record(&mut values);
198 self.write(TraceRow::Event {
199 ts,
200 parent: if event.is_contextual() {
201 ctx.current_span().id().map(|p| get_id(ctx, p))
202 } else {
203 event.parent().map(|p| get_id(ctx, p))
204 },
205 values: values.values,
206 });
207 }
208
209 fn on_record(
210 &self,
211 id: &span::Id,
212 record: &span::Record<'_>,
213 ctx: tracing_subscriber::layer::Context<'_, S>,
214 ) {
215 let mut values = ValuesVisitor::new();
216 record.record(&mut values);
217 self.write(TraceRow::Record {
218 id: get_id(ctx, id),
219 values: values.values,
220 });
221 }
222}
223
/// Field visitor that collects every recorded field as a `(name, value)` pair.
struct ValuesVisitor {
    /// Collected pairs, in the order the fields were visited.
    values: Vec<(Cow<'static, str>, TraceValue<'static>)>,
}
227
228impl ValuesVisitor {
229 fn new() -> Self {
230 Self { values: Vec::new() }
231 }
232}
233
234impl Visit for ValuesVisitor {
235 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
236 let mut str = String::new();
237 let _ = write!(str, "{value:?}");
238 self.values
239 .push((field.name().into(), TraceValue::String(str.into())));
240 }
241
242 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
243 self.values
244 .push((field.name().into(), TraceValue::Float(value)));
245 }
246
247 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
248 self.values
249 .push((field.name().into(), TraceValue::Int(value)));
250 }
251
252 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
253 self.values
254 .push((field.name().into(), TraceValue::UInt(value)));
255 }
256
257 fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
258 self.record_debug(field, &value)
259 }
260
261 fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
262 self.record_debug(field, &value)
263 }
264
265 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
266 self.values
267 .push((field.name().into(), TraceValue::Bool(value)));
268 }
269
270 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
271 self.values.push((
272 field.name().into(),
273 TraceValue::String(value.to_string().into()),
274 ));
275 }
276
277 fn record_error(
278 &mut self,
279 field: &tracing::field::Field,
280 value: &(dyn std::error::Error + 'static),
281 ) {
282 self.record_debug(field, &display(value))
283 }
284}