Skip to main content

turbo_trace_server/
store.rs

1use std::{
2    cmp::{max, min},
3    env,
4    num::NonZeroUsize,
5    sync::{OnceLock, atomic::AtomicU64},
6};
7
8use rustc_hash::FxHashSet;
9
10use crate::{
11    self_time_tree::SelfTimeTree,
12    span::{Span, SpanEvent, SpanIndex},
13    span_ref::SpanRef,
14    timestamp::Timestamp,
15};
16
/// Non-zero identifier used to address a span through the public lookup
/// API of [`Store`] (see [`Store::span`] for how it is decoded).
pub type SpanId = NonZeroUsize;
18
/// Upper bound on the nesting depth of the span tree.
///
/// Very deep trees risk stack overflows and slow traversals, so a span that
/// would reach this depth is flattened: it is attached to an ancestor
/// instead of to its direct parent.
const CUT_OFF_DEPTH: u32 = 80;
24
25/// A single memory usage sample: (timestamp, memory_bytes).
26/// Sorted by timestamp.
27type MemorySample = (Timestamp, u64);
28
/// Cap on how many memory samples a single range query hands back.
const MAX_MEMORY_SAMPLES: usize = 200;
31
32pub struct Store {
33    pub(crate) spans: Vec<Span>,
34    pub(crate) self_time_tree: Option<SelfTimeTree<SpanIndex>>,
35    max_self_time_lookup_time: AtomicU64,
36    /// Global sorted list of memory samples (timestamp, memory_bytes).
37    memory_samples: Vec<MemorySample>,
38}
39
40fn new_root_span() -> Span {
41    Span {
42        parent: None,
43        depth: 0,
44        start: Timestamp::MAX,
45        category: "".into(),
46        name: "(root)".into(),
47        args: vec![],
48        events: vec![],
49        is_complete: true,
50        max_depth: OnceLock::new(),
51        self_allocations: 0,
52        self_allocation_count: 0,
53        self_deallocations: 0,
54        self_deallocation_count: 0,
55        total_allocations: OnceLock::new(),
56        total_deallocations: OnceLock::new(),
57        total_persistent_allocations: OnceLock::new(),
58        total_allocation_count: OnceLock::new(),
59        total_span_count: OnceLock::new(),
60        time_data: OnceLock::new(),
61        extra: OnceLock::new(),
62        names: OnceLock::new(),
63    }
64}
65
66impl Store {
67    pub fn new() -> Self {
68        Self {
69            spans: vec![new_root_span()],
70            self_time_tree: env::var("NO_CORRECTED_TIME")
71                .ok()
72                .is_none()
73                .then(SelfTimeTree::new),
74            max_self_time_lookup_time: AtomicU64::new(0),
75            memory_samples: Vec::new(),
76        }
77    }
78
79    pub fn reset(&mut self) {
80        self.spans.truncate(1);
81        self.spans[0] = new_root_span();
82        if let Some(tree) = self.self_time_tree.as_mut() {
83            *tree = SelfTimeTree::new();
84        }
85        *self.max_self_time_lookup_time.get_mut() = 0;
86        self.memory_samples.clear();
87    }
88
89    pub fn has_time_info(&self) -> bool {
90        self.self_time_tree
91            .as_ref()
92            .is_none_or(|tree| tree.len() > 0)
93    }
94
95    pub fn add_span(
96        &mut self,
97        parent: Option<SpanIndex>,
98        start: Timestamp,
99        category: String,
100        name: String,
101        args: Vec<(String, String)>,
102        outdated_spans: &mut FxHashSet<SpanIndex>,
103    ) -> SpanIndex {
104        let id = SpanIndex::new(self.spans.len()).unwrap();
105        self.spans.push(Span {
106            parent,
107            depth: 0,
108            start,
109            category,
110            name,
111            args,
112            events: vec![],
113            is_complete: false,
114            max_depth: OnceLock::new(),
115            self_allocations: 0,
116            self_allocation_count: 0,
117            self_deallocations: 0,
118            self_deallocation_count: 0,
119            total_allocations: OnceLock::new(),
120            total_deallocations: OnceLock::new(),
121            total_persistent_allocations: OnceLock::new(),
122            total_allocation_count: OnceLock::new(),
123            total_span_count: OnceLock::new(),
124            time_data: OnceLock::new(),
125            extra: OnceLock::new(),
126            names: OnceLock::new(),
127        });
128        let mut parent = if let Some(parent) = parent {
129            outdated_spans.insert(parent);
130            &mut self.spans[parent.get()]
131        } else {
132            &mut self.spans[0]
133        };
134        let mut depth = parent.depth + 1;
135        if depth >= CUT_OFF_DEPTH
136            && let Some(parent_of_parent) = parent.parent
137        {
138            outdated_spans.insert(parent_of_parent);
139            self.spans[id.get()].parent = Some(parent_of_parent);
140            parent = &mut self.spans[parent_of_parent.get()];
141            depth = CUT_OFF_DEPTH - 1;
142        }
143        if depth < CUT_OFF_DEPTH {
144            parent.events.push(SpanEvent::Child { index: id });
145        }
146        parent.start = min(parent.start, start);
147        let span = &mut self.spans[id.get()];
148        span.depth = depth;
149        id
150    }
151
152    pub fn add_args(
153        &mut self,
154        span_index: SpanIndex,
155        args: Vec<(String, String)>,
156        outdated_spans: &mut FxHashSet<SpanIndex>,
157    ) {
158        let span = &mut self.spans[span_index.get()];
159        span.args.extend(args);
160        outdated_spans.insert(span_index);
161    }
162
163    pub fn set_max_self_time_lookup(&self, time: Timestamp) {
164        let time = *time;
165        let mut old = self
166            .max_self_time_lookup_time
167            .load(std::sync::atomic::Ordering::Relaxed);
168        while old < time {
169            match self.max_self_time_lookup_time.compare_exchange(
170                old,
171                time,
172                std::sync::atomic::Ordering::Relaxed,
173                std::sync::atomic::Ordering::Relaxed,
174            ) {
175                Ok(_) => break,
176                Err(real_old) => old = real_old,
177            }
178        }
179    }
180
181    fn insert_self_time(
182        &mut self,
183        start: Timestamp,
184        end: Timestamp,
185        span_index: SpanIndex,
186        outdated_spans: &mut FxHashSet<SpanIndex>,
187    ) {
188        if let Some(tree) = self.self_time_tree.as_mut() {
189            if Timestamp::from_value(*self.max_self_time_lookup_time.get_mut()) >= start {
190                tree.for_each_in_range(start, end, |_, _, span| {
191                    outdated_spans.insert(*span);
192                });
193            }
194            tree.insert(start, end, span_index);
195        }
196    }
197
198    pub fn add_self_time(
199        &mut self,
200        span_index: SpanIndex,
201        start: Timestamp,
202        end: Timestamp,
203        outdated_spans: &mut FxHashSet<SpanIndex>,
204    ) {
205        let span = &mut self.spans[span_index.get()];
206        let time_data = span.time_data_mut();
207        if time_data.ignore_self_time {
208            return;
209        }
210        outdated_spans.insert(span_index);
211        time_data.self_time += end - start;
212        time_data.self_end = max(time_data.self_end, end);
213        span.events.push(SpanEvent::SelfTime { start, end });
214        self.insert_self_time(start, end, span_index, outdated_spans);
215    }
216
217    pub fn set_total_time(
218        &mut self,
219        span_index: SpanIndex,
220        start_time: Timestamp,
221        total_time: Timestamp,
222        outdated_spans: &mut FxHashSet<SpanIndex>,
223    ) {
224        let span = SpanRef {
225            span: &self.spans[span_index.get()],
226            store: self,
227            index: span_index.get(),
228        };
229        let mut children = span
230            .children()
231            .map(|c| (c.span.start, c.span.time_data().self_end, c.index()))
232            .collect::<Vec<_>>();
233        children.sort();
234        let self_end = start_time + total_time;
235        let mut self_time = Timestamp::ZERO;
236        let mut current = start_time;
237        let mut events = Vec::new();
238        for (start, end, index) in children {
239            if start > current {
240                if start > self_end {
241                    events.push(SpanEvent::SelfTime {
242                        start: current,
243                        end: self_end,
244                    });
245                    self.insert_self_time(current, self_end, span_index, outdated_spans);
246                    self_time += self_end - current;
247                    break;
248                }
249                events.push(SpanEvent::SelfTime {
250                    start: current,
251                    end: start,
252                });
253                self.insert_self_time(current, start, span_index, outdated_spans);
254                self_time += start - current;
255            }
256            events.push(SpanEvent::Child { index });
257            current = max(current, end);
258        }
259        current -= start_time;
260        if current < total_time {
261            self_time += total_time - current;
262            events.push(SpanEvent::SelfTime {
263                start: current + start_time,
264                end: start_time + total_time,
265            });
266            self.insert_self_time(
267                current + start_time,
268                start_time + total_time,
269                span_index,
270                outdated_spans,
271            );
272        }
273        let span = &mut self.spans[span_index.get()];
274        outdated_spans.insert(span_index);
275        let time_data = span.time_data_mut();
276        time_data.self_time = self_time;
277        time_data.self_end = self_end;
278        span.events = events;
279        span.start = start_time;
280    }
281
282    pub fn set_parent(
283        &mut self,
284        span_index: SpanIndex,
285        parent: SpanIndex,
286        outdated_spans: &mut FxHashSet<SpanIndex>,
287    ) {
288        outdated_spans.insert(span_index);
289        let span = &mut self.spans[span_index.get()];
290
291        let old_parent = span.parent.replace(parent);
292        let old_parent = if let Some(parent) = old_parent {
293            outdated_spans.insert(parent);
294            &mut self.spans[parent.get()]
295        } else {
296            &mut self.spans[0]
297        };
298        if let Some(index) = old_parent
299            .events
300            .iter()
301            .position(|event| *event == SpanEvent::Child { index: span_index })
302        {
303            old_parent.events.remove(index);
304        }
305
306        outdated_spans.insert(parent);
307        let parent = &mut self.spans[parent.get()];
308        parent.events.push(SpanEvent::Child { index: span_index });
309    }
310
311    pub fn add_allocation(
312        &mut self,
313        span_index: SpanIndex,
314        allocation: u64,
315        count: u64,
316        outdated_spans: &mut FxHashSet<SpanIndex>,
317    ) {
318        let span = &mut self.spans[span_index.get()];
319        outdated_spans.insert(span_index);
320        span.self_allocations += allocation;
321        span.self_allocation_count += count;
322    }
323
324    pub fn add_deallocation(
325        &mut self,
326        span_index: SpanIndex,
327        deallocation: u64,
328        count: u64,
329        outdated_spans: &mut FxHashSet<SpanIndex>,
330    ) {
331        let span = &mut self.spans[span_index.get()];
332        outdated_spans.insert(span_index);
333        span.self_deallocations += deallocation;
334        span.self_deallocation_count += count;
335    }
336
337    pub fn add_memory_sample(&mut self, ts: Timestamp, memory: u64) {
338        // Samples arrive nearly sorted (roughly chronological from the trace
339        // writer), so an insertion-sort step is efficient: push to the end
340        // then swap backward until the timestamp ordering is restored.
341        self.memory_samples.push((ts, memory));
342        let mut i = self.memory_samples.len() - 1;
343        while i > 0 && self.memory_samples[i - 1].0 > ts {
344            self.memory_samples.swap(i, i - 1);
345            i -= 1;
346        }
347    }
348
349    /// Returns up to `MAX_MEMORY_SAMPLES` memory samples in the range
350    /// `[start, end]`. When more samples exist, groups of N consecutive
351    /// samples are merged by taking the maximum memory value in each group.
352    pub fn memory_samples_for_range(&self, start: Timestamp, end: Timestamp) -> Vec<u64> {
353        // Binary search for the first sample >= start
354        let lo = self.memory_samples.partition_point(|(ts, _)| *ts < start);
355        // Binary search for the first sample > end
356        let hi = self.memory_samples.partition_point(|(ts, _)| *ts <= end);
357
358        let slice = &self.memory_samples[lo..hi];
359        let count = slice.len();
360        if count == 0 {
361            return Vec::new();
362        }
363
364        if count <= MAX_MEMORY_SAMPLES {
365            return slice.iter().map(|(_, mem)| *mem).collect();
366        }
367
368        // Merge groups of N samples, taking the max memory in each group.
369        let n = count.div_ceil(MAX_MEMORY_SAMPLES);
370        slice
371            .chunks(n)
372            .map(|chunk| chunk.iter().map(|(_, mem)| *mem).max().unwrap())
373            .collect()
374    }
375
376    pub fn complete_span(&mut self, span_index: SpanIndex) {
377        let span = &mut self.spans[span_index.get()];
378        span.is_complete = true;
379    }
380
381    pub fn invalidate_outdated_spans(&mut self, outdated_spans: &FxHashSet<SpanId>) {
382        fn invalidate_span(span: &mut Span) {
383            if let Some(time_data) = span.time_data.get_mut() {
384                time_data.end.take();
385                time_data.total_time.take();
386                time_data.corrected_self_time.take();
387                time_data.corrected_total_time.take();
388            }
389            span.total_allocations.take();
390            span.total_deallocations.take();
391            span.total_persistent_allocations.take();
392            span.total_allocation_count.take();
393            span.total_span_count.take();
394            span.extra.take();
395        }
396
397        for id in outdated_spans.iter() {
398            let mut span = &mut self.spans[id.get()];
399            loop {
400                invalidate_span(span);
401                let Some(parent) = span.parent else {
402                    break;
403                };
404                if outdated_spans.contains(&parent) {
405                    break;
406                }
407                span = &mut self.spans[parent.get()];
408            }
409        }
410
411        invalidate_span(&mut self.spans[0]);
412    }
413
414    pub fn root_spans(&self) -> impl Iterator<Item = SpanRef<'_>> {
415        self.spans[0].events.iter().filter_map(|event| match event {
416            &SpanEvent::Child { index: id } => Some(SpanRef {
417                span: &self.spans[id.get()],
418                store: self,
419                index: id.get(),
420            }),
421            _ => None,
422        })
423    }
424
425    pub fn root_span(&self) -> SpanRef<'_> {
426        SpanRef {
427            span: &self.spans[0],
428            store: self,
429            index: 0,
430        }
431    }
432
433    pub fn span(&self, id: SpanId) -> Option<(SpanRef<'_>, bool)> {
434        let id = id.get();
435        let is_graph = id & 1 == 1;
436        let index = id >> 1;
437        self.spans.get(index).map(|span| {
438            (
439                SpanRef {
440                    span,
441                    store: self,
442                    index,
443                },
444                is_graph,
445            )
446        })
447    }
448}