// turbo_trace_server/reader/mod.rs

1mod heaptrack;
2mod nextjs;
3mod turbopack;
4
5use std::{
6    any::Any,
7    env,
8    fs::File,
9    io::{self, BufReader, Read, Seek, SeekFrom},
10    path::PathBuf,
11    sync::Arc,
12    thread::{self, JoinHandle},
13    time::{Duration, Instant},
14};
15
16use anyhow::Result;
17use flate2::bufread::GzDecoder;
18
19use crate::{
20    reader::{heaptrack::HeaptrackFormat, nextjs::NextJsFormat, turbopack::TurbopackFormat},
21    store_container::StoreContainer,
22};
23
/// On-disk size (100 MiB) a trace file must exceed before the
/// "Initial read completed" summary is printed at the end of the initial read.
const MIN_INITIAL_REPORT_SIZE: u64 = 100 << 20;
25
26trait TraceFormat {
27    type Reused: Default;
28    fn read(&mut self, buffer: &[u8], reuse: &mut Self::Reused) -> Result<usize>;
29    fn stats(&self) -> String {
30        String::new()
31    }
32}
33
/// Type-erased [`TraceFormat::Reused`] value, recovered by downcasting.
type ErasedReused = Box<dyn Any + 'static>;
35
36struct ErasedTraceFormat(Box<dyn ObjectSafeTraceFormat>);
37
38trait ObjectSafeTraceFormat {
39    fn create_reused(&self) -> ErasedReused;
40    fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize>;
41    fn stats(&self) -> String;
42}
43
44impl<T: TraceFormat> ObjectSafeTraceFormat for T
45where
46    T::Reused: 'static,
47{
48    fn create_reused(&self) -> ErasedReused {
49        Box::new(T::Reused::default())
50    }
51
52    fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize> {
53        let reuse = reuse.downcast_mut().expect("Type of reuse is invalid");
54        TraceFormat::read(self, buffer, reuse)
55    }
56
57    fn stats(&self) -> String {
58        TraceFormat::stats(self)
59    }
60}
61
62impl ObjectSafeTraceFormat for ErasedTraceFormat {
63    fn create_reused(&self) -> ErasedReused {
64        self.0.create_reused()
65    }
66
67    fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize> {
68        self.0.read(buffer, reuse)
69    }
70
71    fn stats(&self) -> String {
72        self.0.stats()
73    }
74}
75
76#[derive(Default)]
77enum TraceFile {
78    Raw(File),
79    Zstd(zstd::Decoder<'static, BufReader<File>>),
80    Gz(GzDecoder<BufReader<File>>),
81    #[default]
82    Unloaded,
83}
84
85impl TraceFile {
86    fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
87        match self {
88            Self::Raw(file) => file.read(buffer),
89            Self::Zstd(decoder) => decoder.read(buffer),
90            Self::Gz(decoder) => decoder.read(buffer),
91            Self::Unloaded => unreachable!(),
92        }
93    }
94
95    fn stream_position(&mut self) -> io::Result<u64> {
96        match self {
97            Self::Raw(file) => file.stream_position(),
98            Self::Zstd(decoder) => decoder.get_mut().stream_position(),
99            Self::Gz(decoder) => decoder.get_mut().stream_position(),
100            Self::Unloaded => unreachable!(),
101        }
102    }
103
104    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
105        match self {
106            Self::Raw(file) => file.seek(pos),
107            Self::Zstd(decoder) => decoder.get_mut().seek(pos),
108            Self::Gz(decoder) => decoder.get_mut().seek(pos),
109            Self::Unloaded => unreachable!(),
110        }
111    }
112
113    fn size(&mut self) -> io::Result<u64> {
114        match self {
115            Self::Raw(file) => file.metadata().map(|m| m.len()),
116            Self::Zstd(decoder) => decoder.get_mut().get_ref().metadata().map(|m| m.len()),
117            Self::Gz(decoder) => decoder.get_mut().get_ref().metadata().map(|m| m.len()),
118            Self::Unloaded => unreachable!(),
119        }
120    }
121}
122
123pub struct TraceReader {
124    store: Arc<StoreContainer>,
125    path: PathBuf,
126}
127
128impl TraceReader {
129    pub fn spawn(store: Arc<StoreContainer>, path: PathBuf) -> JoinHandle<()> {
130        let mut reader = Self { store, path };
131        std::thread::spawn(move || reader.run())
132    }
133
134    pub fn run(&mut self) {
135        let mut file_warning_printed = false;
136        loop {
137            let read_success = self.try_read();
138            if !file_warning_printed && !read_success {
139                println!("Unable to read trace file at {:?}, waiting...", self.path);
140                file_warning_printed = true;
141            }
142            thread::sleep(Duration::from_millis(500));
143        }
144    }
145
146    fn trace_file_from_file(&self, file: File) -> io::Result<TraceFile> {
147        let path = &self.path.to_string_lossy();
148        Ok(if path.ends_with(".zst") {
149            TraceFile::Zstd(zstd::Decoder::new(file)?)
150        } else if path.ends_with(".gz") {
151            TraceFile::Gz(GzDecoder::new(BufReader::new(file)))
152        } else {
153            TraceFile::Raw(file)
154        })
155    }
156
157    fn try_read(&mut self) -> bool {
158        let Ok(mut file) = File::open(&self.path) else {
159            return false;
160        };
161        println!("Trace file opened");
162        let stop_at = env::var("STOP_AT")
163            .unwrap_or_default()
164            .parse()
165            .map_or(u64::MAX, |v: u64| v * 1024 * 1024);
166        if stop_at != u64::MAX {
167            println!("Will stop reading file at {} MB", stop_at / 1024 / 1024)
168        }
169
170        {
171            let mut store = self.store.write();
172            store.reset();
173        }
174
175        let mut format: Option<(ErasedTraceFormat, ErasedReused)> = None;
176
177        let mut current_read = 0;
178        let mut initial_read = file
179            .seek(SeekFrom::End(0))
180            .ok()
181            .map(|total| (total, Instant::now()));
182        if file.seek(SeekFrom::Start(0)).is_err() {
183            return false;
184        }
185        let mut file = match self.trace_file_from_file(file) {
186            Ok(f) => f,
187            Err(err) => {
188                println!("Error creating zstd decoder: {err}");
189                return false;
190            }
191        };
192
193        let mut buffer = Vec::new();
194        let mut index = 0;
195
196        let mut chunk = vec![0; 64 * 1024 * 1024];
197        loop {
198            match file.read(&mut chunk) {
199                Ok(bytes_read) => {
200                    if bytes_read == 0 {
201                        if let Some(value) = self.wait_for_more_data(
202                            &mut file,
203                            &mut initial_read,
204                            format.as_ref().map(|(f, _)| f),
205                        ) {
206                            return value;
207                        }
208                    } else {
209                        // If we have partially consumed some data, and we are at buffer capacity,
210                        // remove the consumed data to make more space.
211                        if index > 0 && buffer.len() + bytes_read > buffer.capacity() {
212                            buffer.splice(..index, std::iter::empty());
213                            index = 0;
214                        }
215                        buffer.extend_from_slice(&chunk[..bytes_read]);
216                        if format.is_none() && buffer.len() >= 8 {
217                            let erased_format = if buffer.starts_with(b"TRACEv0") {
218                                index = 7;
219                                ErasedTraceFormat(Box::new(TurbopackFormat::new(
220                                    self.store.clone(),
221                                )))
222                            } else if buffer.starts_with(b"[{\"name\"") {
223                                ErasedTraceFormat(Box::new(NextJsFormat::new(self.store.clone())))
224                            } else if buffer.starts_with(b"v ") {
225                                ErasedTraceFormat(Box::new(HeaptrackFormat::new(
226                                    self.store.clone(),
227                                )))
228                            } else {
229                                // Fallback to the format without magic bytes
230                                // TODO Remove this after a while and show an error instead
231                                ErasedTraceFormat(Box::new(TurbopackFormat::new(
232                                    self.store.clone(),
233                                )))
234                            };
235                            let reuse = erased_format.create_reused();
236                            format = Some((erased_format, reuse));
237                        }
238                        if let Some((format, reuse)) = &mut format {
239                            match format.read(&buffer[index..], reuse) {
240                                Ok(bytes_read) => {
241                                    index += bytes_read;
242                                }
243                                Err(err) => {
244                                    println!("Trace file error: {err}");
245                                    return true;
246                                }
247                            }
248                            if self.store.want_to_read() {
249                                thread::yield_now();
250                            }
251                            let prev_read = current_read;
252                            current_read += bytes_read as u64;
253                            if let Some((total, start)) = &mut initial_read {
254                                let old_mbs = prev_read / (97 * 1024 * 1024);
255                                let new_mbs = current_read / (97 * 1024 * 1024);
256                                if old_mbs != new_mbs {
257                                    let pos = file.stream_position().unwrap_or(current_read);
258                                    if pos > *total {
259                                        *total = file.size().unwrap_or(pos);
260                                    }
261                                    *total = (*total).max(pos);
262                                    let percentage = pos * 100 / *total;
263                                    let read = pos / (1024 * 1024);
264                                    let uncompressed = current_read / (1024 * 1024);
265                                    let total = *total / (1024 * 1024);
266                                    let stats = format.stats();
267                                    print!(
268                                        "{}% read ({}/{} MB, {} MB/s)",
269                                        percentage,
270                                        read,
271                                        total,
272                                        read * 1000 / (start.elapsed().as_millis() + 1) as u64
273                                    );
274                                    if uncompressed != read {
275                                        print!(" ({uncompressed} MB uncompressed)");
276                                    }
277                                    if stats.is_empty() {
278                                        println!();
279                                    } else {
280                                        println!(" - {stats}");
281                                    }
282                                }
283                            }
284                            if current_read >= stop_at {
285                                println!(
286                                    "Stopped reading file as requested by STOP_AT env var. \
287                                     Waiting for new file..."
288                                );
289                                self.wait_for_new_file(&mut file);
290                                return true;
291                            }
292                        }
293                    }
294                }
295                Err(err) => {
296                    if err.kind() == io::ErrorKind::UnexpectedEof {
297                        if let Some(value) = self.wait_for_more_data(
298                            &mut file,
299                            &mut initial_read,
300                            format.as_ref().map(|(f, _)| f),
301                        ) {
302                            return value;
303                        }
304                    } else {
305                        // Error reading file, maybe it was removed
306                        println!("Error reading trace file: {err:?}");
307                        return true;
308                    }
309                }
310            }
311        }
312    }
313
314    fn wait_for_more_data(
315        &mut self,
316        file: &mut TraceFile,
317        initial_read: &mut Option<(u64, Instant)>,
318        format: Option<&ErasedTraceFormat>,
319    ) -> Option<bool> {
320        let Ok(pos) = file.stream_position() else {
321            return Some(true);
322        };
323        if let Some((total, start)) = initial_read.take() {
324            if let Some(format) = format {
325                let stats = format.stats();
326                println!("{stats}");
327            }
328            if total > MIN_INITIAL_REPORT_SIZE {
329                println!(
330                    "Initial read completed ({} MB, {}s)",
331                    total / (1024 * 1024),
332                    (start.elapsed().as_millis() / 100) as f32 / 10.0
333                );
334            }
335        }
336        loop {
337            // No more data to read, sleep for a while to wait for more data
338            thread::sleep(Duration::from_millis(100));
339            let Ok(mut real_file) = File::open(&self.path) else {
340                return Some(true);
341            };
342            let Ok(end) = real_file.seek(SeekFrom::End(0)) else {
343                return Some(true);
344            };
345            if end < pos {
346                // new file
347                return Some(true);
348            } else if end != pos {
349                // file has more data
350                return None;
351            }
352        }
353    }
354
355    fn wait_for_new_file(&self, file: &mut TraceFile) {
356        let Ok(pos) = file.stream_position() else {
357            return;
358        };
359        loop {
360            thread::sleep(Duration::from_millis(1000));
361            let Ok(end) = file.seek(SeekFrom::End(0)) else {
362                return;
363            };
364            if end < pos {
365                return;
366            }
367        }
368    }
369}