1use std::{
2 borrow::Cow,
3 cell::Cell,
4 fmt::Write,
5 marker::PhantomData,
6 sync::atomic::{AtomicU64, Ordering},
7 thread,
8 time::Instant,
9};
10
11use tracing::{
12 Subscriber,
13 field::{Visit, display},
14 span,
15};
16use tracing_subscriber::{Layer, registry::LookupSpan};
17use turbo_tasks_malloc::TurboMalloc;
18
19use crate::{
20 flavor::WriteGuardFlavor,
21 trace_writer::TraceWriter,
22 tracing::{TraceRow, TraceValue},
23};
24
/// Minimum distance, in microseconds, between two memory samples (10 000 µs = 10 ms).
///
/// `maybe_report_memory_sample` compares timestamp differences against this value; the
/// timestamps it receives are produced with `Instant::elapsed().as_micros()`.
const MEMORY_SAMPLE_INTERVAL_US: u64 = 10_000;
27
/// Timestamp of the most recent memory sample claimed by any thread.
///
/// Starts at 0 and is only advanced through a `compare_exchange` in
/// `maybe_report_memory_sample`, so at most one thread wins each update.
static GLOBAL_LAST_MEMORY_SAMPLE: AtomicU64 = AtomicU64::new(0);
29
thread_local! {
    /// Per-thread copy of the last sample timestamp this thread has seen.
    ///
    /// `maybe_report_memory_sample` consults this cell first and only reads the shared
    /// `GLOBAL_LAST_MEMORY_SAMPLE` atomic when the cached value is older than the interval.
    static THREAD_LOCAL_LAST_MEMORY_SAMPLE: Cell<u64> = const { Cell::new(0) };
}
33
/// Options for the raw trace layer.
///
/// Currently has no fields and is not referenced by the code visible in this file.
pub struct RawTraceLayerOptions {}
35
/// Per-span data that `RawTraceLayer` stores in the span's extensions.
struct RawTraceLayerExtension {
    /// Id assigned to the span by the layer's `next_id` counter; this is the id written
    /// into the trace rows.
    id: u64,
}
39
40fn get_id<S: Subscriber + for<'a> LookupSpan<'a>>(
41 ctx: tracing_subscriber::layer::Context<'_, S>,
42 id: &span::Id,
43) -> u64 {
44 ctx.span(id)
45 .unwrap()
46 .extensions()
47 .get::<RawTraceLayerExtension>()
48 .unwrap()
49 .id
50}
51
52pub struct RawTraceLayer<S: Subscriber + for<'a> LookupSpan<'a>> {
55 trace_writer: TraceWriter,
56 start: Instant,
57 next_id: AtomicU64,
58 _phantom: PhantomData<fn(S)>,
59}
60
61impl<S: Subscriber + for<'a> LookupSpan<'a>> RawTraceLayer<S> {
62 pub fn new(trace_writer: TraceWriter) -> Self {
63 Self {
64 trace_writer,
65 start: Instant::now(),
66 next_id: AtomicU64::new(1),
67 _phantom: PhantomData,
68 }
69 }
70
71 fn write(&self, data: TraceRow<'_>) {
72 let start = TurboMalloc::allocation_counters();
73 let guard = self.trace_writer.start_write();
74 postcard::serialize_with_flavor(&data, WriteGuardFlavor { guard }).unwrap();
75 TurboMalloc::reset_allocation_counters(start);
76 }
77
78 fn maybe_report_memory_sample(&self, ts: u64) {
79 let skip = THREAD_LOCAL_LAST_MEMORY_SAMPLE
81 .with(|tl| ts.saturating_sub(tl.get()) < MEMORY_SAMPLE_INTERVAL_US);
82 if skip {
83 return;
84 }
85
86 let global_last = GLOBAL_LAST_MEMORY_SAMPLE.load(Ordering::Relaxed);
88 if ts.saturating_sub(global_last) < MEMORY_SAMPLE_INTERVAL_US {
89 THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(global_last));
91 return;
92 }
93
94 match GLOBAL_LAST_MEMORY_SAMPLE.compare_exchange(
96 global_last,
97 ts,
98 Ordering::Relaxed,
99 Ordering::Relaxed,
100 ) {
101 Ok(_) => {
102 THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(ts));
104 let memory = TurboMalloc::memory_usage() as u64;
105 self.write(TraceRow::MemorySample { ts, memory });
106 }
107 Err(actual) => {
108 THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(actual));
110 }
111 }
112 }
113
114 fn report_allocations(&self, ts: u64, thread_id: u64) {
115 let allocation_counters = turbo_tasks_malloc::TurboMalloc::allocation_counters();
116 self.write(TraceRow::AllocationCounters {
117 ts,
118 thread_id,
119 allocations: allocation_counters.allocations as u64,
120 deallocations: allocation_counters.deallocations as u64,
121 allocation_count: allocation_counters.allocation_count as u64,
122 deallocation_count: allocation_counters.deallocation_count as u64,
123 });
124 }
125}
126
127impl<S: Subscriber + for<'a> LookupSpan<'a>> Layer<S> for RawTraceLayer<S> {
128 fn on_new_span(
129 &self,
130 attrs: &span::Attributes<'_>,
131 id: &span::Id,
132 ctx: tracing_subscriber::layer::Context<'_, S>,
133 ) {
134 let ts = self.start.elapsed().as_micros() as u64;
135 let mut values = ValuesVisitor::new();
136 attrs.values().record(&mut values);
137 let external_id = self
138 .next_id
139 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
140 ctx.span(id)
141 .unwrap()
142 .extensions_mut()
143 .insert(RawTraceLayerExtension { id: external_id });
144 self.write(TraceRow::Start {
145 ts,
146 id: external_id,
147 parent: if attrs.is_contextual() {
148 ctx.current_span().id().map(|p| get_id(ctx, p))
149 } else {
150 attrs.parent().map(|p| get_id(ctx, p))
151 },
152 name: attrs.metadata().name().into(),
153 target: attrs.metadata().target().into(),
154 values: values.values,
155 });
156 }
157
158 fn on_close(&self, id: span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
159 let ts = self.start.elapsed().as_micros() as u64;
160 self.write(TraceRow::End {
161 ts,
162 id: get_id(ctx, &id),
163 });
164 }
165
166 fn on_enter(&self, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
167 let ts = self.start.elapsed().as_micros() as u64;
168 let thread_id = thread::current().id().as_u64().into();
169 self.maybe_report_memory_sample(ts);
170 self.report_allocations(ts, thread_id);
171 self.write(TraceRow::Enter {
172 ts,
173 id: get_id(ctx, id),
174 thread_id,
175 });
176 }
177
178 fn on_exit(&self, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
179 let ts = self.start.elapsed().as_micros() as u64;
180 let thread_id = thread::current().id().as_u64().into();
181 self.report_allocations(ts, thread_id);
182 self.write(TraceRow::Exit {
183 ts,
184 id: get_id(ctx, id),
185 thread_id,
186 });
187 }
188
189 fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
190 let ts = self.start.elapsed().as_micros() as u64;
191 let mut values = ValuesVisitor::new();
192 event.record(&mut values);
193 self.write(TraceRow::Event {
194 ts,
195 parent: if event.is_contextual() {
196 ctx.current_span().id().map(|p| get_id(ctx, p))
197 } else {
198 event.parent().map(|p| get_id(ctx, p))
199 },
200 values: values.values,
201 });
202 }
203
204 fn on_record(
205 &self,
206 id: &span::Id,
207 record: &span::Record<'_>,
208 ctx: tracing_subscriber::layer::Context<'_, S>,
209 ) {
210 let mut values = ValuesVisitor::new();
211 record.record(&mut values);
212 self.write(TraceRow::Record {
213 id: get_id(ctx, id),
214 values: values.values,
215 });
216 }
217}
218
219struct ValuesVisitor {
220 values: Vec<(Cow<'static, str>, TraceValue<'static>)>,
221}
222
223impl ValuesVisitor {
224 fn new() -> Self {
225 Self { values: Vec::new() }
226 }
227}
228
229impl Visit for ValuesVisitor {
230 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
231 let mut str = String::new();
232 let _ = write!(str, "{value:?}");
233 self.values
234 .push((field.name().into(), TraceValue::String(str.into())));
235 }
236
237 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
238 self.values
239 .push((field.name().into(), TraceValue::Float(value)));
240 }
241
242 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
243 self.values
244 .push((field.name().into(), TraceValue::Int(value)));
245 }
246
247 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
248 self.values
249 .push((field.name().into(), TraceValue::UInt(value)));
250 }
251
252 fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
253 self.record_debug(field, &value)
254 }
255
256 fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
257 self.record_debug(field, &value)
258 }
259
260 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
261 self.values
262 .push((field.name().into(), TraceValue::Bool(value)));
263 }
264
265 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
266 self.values.push((
267 field.name().into(),
268 TraceValue::String(value.to_string().into()),
269 ));
270 }
271
272 fn record_error(
273 &mut self,
274 field: &tracing::field::Field,
275 value: &(dyn std::error::Error + 'static),
276 ) {
277 self.record_debug(field, &display(value))
278 }
279}