turbo_trace_server/reader/
mod.rs

1mod heaptrack;
2mod nextjs;
3mod turbopack;
4
5use std::{
6    any::Any,
7    env,
8    fs::File,
9    io::{self, BufReader, Read, Seek, SeekFrom},
10    path::PathBuf,
11    sync::Arc,
12    thread::{self, JoinHandle},
13    time::{Duration, Instant},
14};
15
16use anyhow::Result;
17use flate2::bufread::GzDecoder;
18
19use crate::{
20    reader::{heaptrack::HeaptrackFormat, nextjs::NextJsFormat, turbopack::TurbopackFormat},
21    store_container::StoreContainer,
22};
23
/// Threshold of 100 MiB: the "Initial read completed" summary is only printed when the tracked
/// total file size exceeds this value.
const MIN_INITIAL_REPORT_SIZE: u64 = 100 << 20;
25
26trait TraceFormat {
27    type Reused: Default;
28    fn read(&mut self, buffer: &[u8], reuse: &mut Self::Reused) -> Result<usize>;
29    fn stats(&self) -> String {
30        String::new()
31    }
32}
33
/// Type-erased form of a format's `TraceFormat::Reused` scratch state.
type ErasedReused = Box<dyn Any>;

/// A boxed trace format whose concrete type has been erased behind `ObjectSafeTraceFormat`.
struct ErasedTraceFormat(Box<dyn ObjectSafeTraceFormat>);
37
/// Counterpart of `TraceFormat` without an associated type, so it can be used as `dyn`.
///
/// The scratch state is passed around as an `ErasedReused` instead of a concrete type.
trait ObjectSafeTraceFormat {
    /// Creates the scratch state that must be passed to subsequent `read` calls.
    fn create_reused(&self) -> ErasedReused;
    /// Processes `buffer` using the scratch state in `reuse` and returns a byte count.
    fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize>;
    /// Returns a status string for the format.
    fn stats(&self) -> String;
}
43
44impl<T: TraceFormat> ObjectSafeTraceFormat for T
45where
46    T::Reused: 'static,
47{
48    fn create_reused(&self) -> ErasedReused {
49        Box::new(T::Reused::default())
50    }
51
52    fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize> {
53        let reuse = reuse.downcast_mut().expect("Type of reuse is invalid");
54        TraceFormat::read(self, buffer, reuse)
55    }
56
57    fn stats(&self) -> String {
58        TraceFormat::stats(self)
59    }
60}
61
62impl ObjectSafeTraceFormat for ErasedTraceFormat {
63    fn create_reused(&self) -> ErasedReused {
64        self.0.create_reused()
65    }
66
67    fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize> {
68        self.0.read(buffer, reuse)
69    }
70
71    fn stats(&self) -> String {
72        self.0.stats()
73    }
74}
75
/// An opened trace file together with the decoder (if any) used to read it.
#[derive(Default)]
enum TraceFile {
    /// File read directly through a `BufReader`, without decompression.
    Raw(BufReader<File>),
    /// File read through a zstd decoder.
    Zstd(zstd::Decoder<'static, BufReader<File>>),
    /// File read through a gzip decoder.
    Gz(GzDecoder<BufReader<File>>),
    /// Default placeholder; the methods in `impl TraceFile` treat this variant as unreachable.
    #[default]
    Unloaded,
}
84
85impl TraceFile {
86    fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
87        match self {
88            Self::Raw(file) => file.read(buffer),
89            Self::Zstd(decoder) => decoder.read(buffer),
90            Self::Gz(decoder) => decoder.read(buffer),
91            Self::Unloaded => unreachable!(),
92        }
93    }
94
95    fn stream_position(&mut self) -> io::Result<u64> {
96        match self {
97            Self::Raw(file) => file.stream_position(),
98            Self::Zstd(decoder) => decoder.get_mut().stream_position(),
99            Self::Gz(decoder) => decoder.get_mut().stream_position(),
100            Self::Unloaded => unreachable!(),
101        }
102    }
103
104    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
105        match self {
106            Self::Raw(file) => file.seek(pos),
107            Self::Zstd(decoder) => decoder.get_mut().seek(pos),
108            Self::Gz(decoder) => decoder.get_mut().seek(pos),
109            Self::Unloaded => unreachable!(),
110        }
111    }
112
113    fn size(&mut self) -> io::Result<u64> {
114        match self {
115            Self::Raw(file) => file.get_ref().metadata().map(|m| m.len()),
116            Self::Zstd(decoder) => decoder.get_mut().get_ref().metadata().map(|m| m.len()),
117            Self::Gz(decoder) => decoder.get_mut().get_ref().metadata().map(|m| m.len()),
118            Self::Unloaded => unreachable!(),
119        }
120    }
121}
122
/// Reads the trace file at `path` and feeds its contents into `store`.
pub struct TraceReader {
    /// Shared store; it is reset at the start of each read and handed to the format parsers.
    store: Arc<StoreContainer>,
    /// Location of the trace file on disk.
    path: PathBuf,
}
127
128impl TraceReader {
129    pub fn spawn(store: Arc<StoreContainer>, path: PathBuf) -> JoinHandle<()> {
130        let mut reader = Self { store, path };
131        std::thread::spawn(move || reader.run())
132    }
133
134    pub fn run(&mut self) {
135        let mut file_warning_printed = false;
136        loop {
137            let read_success = self.try_read();
138            if !file_warning_printed && !read_success {
139                println!("Unable to read trace file at {:?}, waiting...", self.path);
140                file_warning_printed = true;
141            }
142            thread::sleep(Duration::from_millis(500));
143        }
144    }
145
146    fn trace_file_from_file(&self, file: File) -> io::Result<TraceFile> {
147        let path = &self.path.to_string_lossy();
148        let mut file = BufReader::with_capacity(
149            // zstd max block size (1 << 17) + block header (3) + magic bytes (4)
150            (1 << 17) + 7,
151            file,
152        );
153        let magic_bytes = file.peek(4)?;
154        Ok(
155            if path.ends_with(".zst") || magic_bytes == [0x28, 0xb5, 0x2f, 0xfd] {
156                TraceFile::Zstd(zstd::Decoder::with_buffer(file)?)
157            } else if path.ends_with(".gz") || matches!(magic_bytes, [0x1f, 0x8b, _, _]) {
158                TraceFile::Gz(GzDecoder::new(file))
159            } else {
160                TraceFile::Raw(file)
161            },
162        )
163    }
164
165    fn try_read(&mut self) -> bool {
166        let Ok(mut file) = File::open(&self.path) else {
167            return false;
168        };
169        println!("Trace file opened");
170        let stop_at = env::var("STOP_AT")
171            .unwrap_or_default()
172            .parse()
173            .map_or(u64::MAX, |v: u64| v * 1024 * 1024);
174        if stop_at != u64::MAX {
175            println!("Will stop reading file at {} MB", stop_at / 1024 / 1024)
176        }
177
178        {
179            let mut store = self.store.write();
180            store.reset();
181        }
182
183        let mut format: Option<(ErasedTraceFormat, ErasedReused)> = None;
184
185        let mut current_read = 0;
186        let mut initial_read = file
187            .seek(SeekFrom::End(0))
188            .ok()
189            .map(|total| (total, Instant::now()));
190        if file.seek(SeekFrom::Start(0)).is_err() {
191            return false;
192        }
193        let mut file = match self.trace_file_from_file(file) {
194            Ok(f) => f,
195            Err(err) => {
196                println!("Error creating zstd decoder: {err}");
197                return false;
198            }
199        };
200
201        let mut buffer = Vec::new();
202        let mut index = 0;
203
204        let mut chunk = vec![0; 64 * 1024 * 1024];
205        loop {
206            match file.read(&mut chunk) {
207                Ok(bytes_read) => {
208                    if bytes_read == 0 {
209                        if let Some(value) = self.wait_for_more_data(
210                            &mut file,
211                            &mut initial_read,
212                            format.as_ref().map(|(f, _)| f),
213                        ) {
214                            return value;
215                        }
216                    } else {
217                        // If we have partially consumed some data, and we are at buffer capacity,
218                        // remove the consumed data to make more space.
219                        if index > 0 && buffer.len() + bytes_read > buffer.capacity() {
220                            buffer.splice(..index, std::iter::empty());
221                            index = 0;
222                        }
223                        buffer.extend_from_slice(&chunk[..bytes_read]);
224                        if format.is_none() && buffer.len() >= 8 {
225                            let erased_format = if buffer.starts_with(b"TRACEv0") {
226                                index = 7;
227                                ErasedTraceFormat(Box::new(TurbopackFormat::new(
228                                    self.store.clone(),
229                                )))
230                            } else if buffer.starts_with(b"[{\"name\"") {
231                                ErasedTraceFormat(Box::new(NextJsFormat::new(self.store.clone())))
232                            } else if buffer.starts_with(b"v ") {
233                                ErasedTraceFormat(Box::new(HeaptrackFormat::new(
234                                    self.store.clone(),
235                                )))
236                            } else {
237                                // Fallback to the format without magic bytes
238                                // TODO Remove this after a while and show an error instead
239                                ErasedTraceFormat(Box::new(TurbopackFormat::new(
240                                    self.store.clone(),
241                                )))
242                            };
243                            let reuse = erased_format.create_reused();
244                            format = Some((erased_format, reuse));
245                        }
246                        if let Some((format, reuse)) = &mut format {
247                            match format.read(&buffer[index..], reuse) {
248                                Ok(bytes_read) => {
249                                    index += bytes_read;
250                                }
251                                Err(err) => {
252                                    println!("Trace file error: {err}");
253                                    return true;
254                                }
255                            }
256                            if self.store.want_to_read() {
257                                thread::yield_now();
258                            }
259                            let prev_read = current_read;
260                            current_read += bytes_read as u64;
261                            if let Some((total, start)) = &mut initial_read {
262                                let old_mbs = prev_read / (97 * 1024 * 1024);
263                                let new_mbs = current_read / (97 * 1024 * 1024);
264                                if old_mbs != new_mbs {
265                                    let pos = file.stream_position().unwrap_or(current_read);
266                                    if pos > *total {
267                                        *total = file.size().unwrap_or(pos);
268                                    }
269                                    *total = (*total).max(pos);
270                                    let percentage = pos * 100 / *total;
271                                    let read = pos / (1024 * 1024);
272                                    let uncompressed = current_read / (1024 * 1024);
273                                    let total = *total / (1024 * 1024);
274                                    let stats = format.stats();
275                                    print!(
276                                        "{}% read ({}/{} MB, {} MB/s)",
277                                        percentage,
278                                        read,
279                                        total,
280                                        read * 1000 / (start.elapsed().as_millis() + 1) as u64
281                                    );
282                                    if uncompressed != read {
283                                        print!(" ({uncompressed} MB uncompressed)");
284                                    }
285                                    if stats.is_empty() {
286                                        println!();
287                                    } else {
288                                        println!(" - {stats}");
289                                    }
290                                }
291                            }
292                            if current_read >= stop_at {
293                                println!(
294                                    "Stopped reading file as requested by STOP_AT env var. \
295                                     Waiting for new file..."
296                                );
297                                self.wait_for_new_file(&mut file);
298                                return true;
299                            }
300                        }
301                    }
302                }
303                Err(err) => {
304                    if err.kind() == io::ErrorKind::UnexpectedEof
305                        || err.kind() == io::ErrorKind::InvalidInput
306                    {
307                        if let Some(value) = self.wait_for_more_data(
308                            &mut file,
309                            &mut initial_read,
310                            format.as_ref().map(|(f, _)| f),
311                        ) {
312                            return value;
313                        }
314                    } else {
315                        // Error reading file, maybe it was removed
316                        println!("Error reading trace file: {err:?}");
317                        return true;
318                    }
319                }
320            }
321        }
322    }
323
324    fn wait_for_more_data(
325        &mut self,
326        file: &mut TraceFile,
327        initial_read: &mut Option<(u64, Instant)>,
328        format: Option<&ErasedTraceFormat>,
329    ) -> Option<bool> {
330        let Ok(pos) = file.stream_position() else {
331            return Some(true);
332        };
333        if let Some((total, start)) = initial_read.take() {
334            if let Some(format) = format {
335                let stats = format.stats();
336                println!("{stats}");
337            }
338            if total > MIN_INITIAL_REPORT_SIZE {
339                println!(
340                    "Initial read completed ({} MB, {}s)",
341                    total / (1024 * 1024),
342                    (start.elapsed().as_millis() / 100) as f32 / 10.0
343                );
344            }
345        }
346        loop {
347            // No more data to read, sleep for a while to wait for more data
348            thread::sleep(Duration::from_millis(100));
349            let Ok(mut real_file) = File::open(&self.path) else {
350                return Some(true);
351            };
352            let Ok(end) = real_file.seek(SeekFrom::End(0)) else {
353                return Some(true);
354            };
355            if end < pos {
356                // new file
357                return Some(true);
358            } else if end != pos {
359                // file has more data
360                return None;
361            }
362        }
363    }
364
365    fn wait_for_new_file(&self, file: &mut TraceFile) {
366        let Ok(pos) = file.stream_position() else {
367            return;
368        };
369        loop {
370            thread::sleep(Duration::from_millis(1000));
371            let Ok(end) = file.seek(SeekFrom::End(0)) else {
372                return;
373            };
374            if end < pos {
375                return;
376            }
377        }
378    }
379}