turbo_trace_server/reader/
mod.rs1mod heaptrack;
2mod nextjs;
3mod turbopack;
4
5use std::{
6 any::Any,
7 env,
8 fs::File,
9 io::{self, BufReader, Read, Seek, SeekFrom},
10 path::PathBuf,
11 sync::Arc,
12 thread::{self, JoinHandle},
13 time::{Duration, Instant},
14};
15
16use anyhow::Result;
17use flate2::bufread::GzDecoder;
18
19use crate::{
20 reader::{heaptrack::HeaptrackFormat, nextjs::NextJsFormat, turbopack::TurbopackFormat},
21 store_container::StoreContainer,
22};
23
/// Size threshold (100 MiB): an "Initial read completed" summary is only printed for trace
/// files whose size at open time exceeds this many bytes.
const MIN_INITIAL_REPORT_SIZE: u64 = 100 << 20;
25
26trait TraceFormat {
27 type Reused: Default;
28 fn read(&mut self, buffer: &[u8], reuse: &mut Self::Reused) -> Result<usize>;
29 fn stats(&self) -> String {
30 String::new()
31 }
32}
33
/// Type-erased `TraceFormat::Reused` value; it is downcast back to the concrete type on use.
type ErasedReused = Box<dyn Any + 'static>;
35
36struct ErasedTraceFormat(Box<dyn ObjectSafeTraceFormat>);
37
38trait ObjectSafeTraceFormat {
39 fn create_reused(&self) -> ErasedReused;
40 fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize>;
41 fn stats(&self) -> String;
42}
43
44impl<T: TraceFormat> ObjectSafeTraceFormat for T
45where
46 T::Reused: 'static,
47{
48 fn create_reused(&self) -> ErasedReused {
49 Box::new(T::Reused::default())
50 }
51
52 fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize> {
53 let reuse = reuse.downcast_mut().expect("Type of reuse is invalid");
54 TraceFormat::read(self, buffer, reuse)
55 }
56
57 fn stats(&self) -> String {
58 TraceFormat::stats(self)
59 }
60}
61
62impl ObjectSafeTraceFormat for ErasedTraceFormat {
63 fn create_reused(&self) -> ErasedReused {
64 self.0.create_reused()
65 }
66
67 fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize> {
68 self.0.read(buffer, reuse)
69 }
70
71 fn stats(&self) -> String {
72 self.0.stats()
73 }
74}
75
76#[derive(Default)]
77enum TraceFile {
78 Raw(BufReader<File>),
79 Zstd(zstd::Decoder<'static, BufReader<File>>),
80 Gz(GzDecoder<BufReader<File>>),
81 #[default]
82 Unloaded,
83}
84
85impl TraceFile {
86 fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
87 match self {
88 Self::Raw(file) => file.read(buffer),
89 Self::Zstd(decoder) => decoder.read(buffer),
90 Self::Gz(decoder) => decoder.read(buffer),
91 Self::Unloaded => unreachable!(),
92 }
93 }
94
95 fn stream_position(&mut self) -> io::Result<u64> {
96 match self {
97 Self::Raw(file) => file.stream_position(),
98 Self::Zstd(decoder) => decoder.get_mut().stream_position(),
99 Self::Gz(decoder) => decoder.get_mut().stream_position(),
100 Self::Unloaded => unreachable!(),
101 }
102 }
103
104 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
105 match self {
106 Self::Raw(file) => file.seek(pos),
107 Self::Zstd(decoder) => decoder.get_mut().seek(pos),
108 Self::Gz(decoder) => decoder.get_mut().seek(pos),
109 Self::Unloaded => unreachable!(),
110 }
111 }
112
113 fn size(&mut self) -> io::Result<u64> {
114 match self {
115 Self::Raw(file) => file.get_ref().metadata().map(|m| m.len()),
116 Self::Zstd(decoder) => decoder.get_mut().get_ref().metadata().map(|m| m.len()),
117 Self::Gz(decoder) => decoder.get_mut().get_ref().metadata().map(|m| m.len()),
118 Self::Unloaded => unreachable!(),
119 }
120 }
121}
122
123pub struct TraceReader {
124 store: Arc<StoreContainer>,
125 path: PathBuf,
126}
127
128impl TraceReader {
129 pub fn spawn(store: Arc<StoreContainer>, path: PathBuf) -> JoinHandle<()> {
130 let mut reader = Self { store, path };
131 std::thread::spawn(move || reader.run())
132 }
133
134 pub fn run(&mut self) {
135 let mut file_warning_printed = false;
136 loop {
137 let read_success = self.try_read();
138 if !file_warning_printed && !read_success {
139 println!("Unable to read trace file at {:?}, waiting...", self.path);
140 file_warning_printed = true;
141 }
142 thread::sleep(Duration::from_millis(500));
143 }
144 }
145
146 fn trace_file_from_file(&self, file: File) -> io::Result<TraceFile> {
147 let path = &self.path.to_string_lossy();
148 let mut file = BufReader::with_capacity(
149 (1 << 17) + 7,
151 file,
152 );
153 let magic_bytes = file.peek(4)?;
154 Ok(
155 if path.ends_with(".zst") || magic_bytes == [0x28, 0xb5, 0x2f, 0xfd] {
156 TraceFile::Zstd(zstd::Decoder::with_buffer(file)?)
157 } else if path.ends_with(".gz") || matches!(magic_bytes, [0x1f, 0x8b, _, _]) {
158 TraceFile::Gz(GzDecoder::new(file))
159 } else {
160 TraceFile::Raw(file)
161 },
162 )
163 }
164
165 fn try_read(&mut self) -> bool {
166 let Ok(mut file) = File::open(&self.path) else {
167 return false;
168 };
169 println!("Trace file opened");
170 let stop_at = env::var("STOP_AT")
171 .unwrap_or_default()
172 .parse()
173 .map_or(u64::MAX, |v: u64| v * 1024 * 1024);
174 if stop_at != u64::MAX {
175 println!("Will stop reading file at {} MB", stop_at / 1024 / 1024)
176 }
177
178 {
179 let mut store = self.store.write();
180 store.reset();
181 }
182
183 let mut format: Option<(ErasedTraceFormat, ErasedReused)> = None;
184
185 let mut current_read = 0;
186 let mut initial_read = file
187 .seek(SeekFrom::End(0))
188 .ok()
189 .map(|total| (total, Instant::now()));
190 if file.seek(SeekFrom::Start(0)).is_err() {
191 return false;
192 }
193 let mut file = match self.trace_file_from_file(file) {
194 Ok(f) => f,
195 Err(err) => {
196 println!("Error creating zstd decoder: {err}");
197 return false;
198 }
199 };
200
201 let mut buffer = Vec::new();
202 let mut index = 0;
203
204 let mut chunk = vec![0; 64 * 1024 * 1024];
205 loop {
206 match file.read(&mut chunk) {
207 Ok(bytes_read) => {
208 if bytes_read == 0 {
209 if let Some(value) = self.wait_for_more_data(
210 &mut file,
211 &mut initial_read,
212 format.as_ref().map(|(f, _)| f),
213 ) {
214 return value;
215 }
216 } else {
217 if index > 0 && buffer.len() + bytes_read > buffer.capacity() {
220 buffer.splice(..index, std::iter::empty());
221 index = 0;
222 }
223 buffer.extend_from_slice(&chunk[..bytes_read]);
224 if format.is_none() && buffer.len() >= 8 {
225 let erased_format = if buffer.starts_with(b"TRACEv0") {
226 index = 7;
227 ErasedTraceFormat(Box::new(TurbopackFormat::new(
228 self.store.clone(),
229 )))
230 } else if buffer.starts_with(b"[{\"name\"") {
231 ErasedTraceFormat(Box::new(NextJsFormat::new(self.store.clone())))
232 } else if buffer.starts_with(b"v ") {
233 ErasedTraceFormat(Box::new(HeaptrackFormat::new(
234 self.store.clone(),
235 )))
236 } else {
237 ErasedTraceFormat(Box::new(TurbopackFormat::new(
240 self.store.clone(),
241 )))
242 };
243 let reuse = erased_format.create_reused();
244 format = Some((erased_format, reuse));
245 }
246 if let Some((format, reuse)) = &mut format {
247 match format.read(&buffer[index..], reuse) {
248 Ok(bytes_read) => {
249 index += bytes_read;
250 }
251 Err(err) => {
252 println!("Trace file error: {err}");
253 return true;
254 }
255 }
256 if self.store.want_to_read() {
257 thread::yield_now();
258 }
259 let prev_read = current_read;
260 current_read += bytes_read as u64;
261 if let Some((total, start)) = &mut initial_read {
262 let old_mbs = prev_read / (97 * 1024 * 1024);
263 let new_mbs = current_read / (97 * 1024 * 1024);
264 if old_mbs != new_mbs {
265 let pos = file.stream_position().unwrap_or(current_read);
266 if pos > *total {
267 *total = file.size().unwrap_or(pos);
268 }
269 *total = (*total).max(pos);
270 let percentage = pos * 100 / *total;
271 let read = pos / (1024 * 1024);
272 let uncompressed = current_read / (1024 * 1024);
273 let total = *total / (1024 * 1024);
274 let stats = format.stats();
275 print!(
276 "{}% read ({}/{} MB, {} MB/s)",
277 percentage,
278 read,
279 total,
280 read * 1000 / (start.elapsed().as_millis() + 1) as u64
281 );
282 if uncompressed != read {
283 print!(" ({uncompressed} MB uncompressed)");
284 }
285 if stats.is_empty() {
286 println!();
287 } else {
288 println!(" - {stats}");
289 }
290 }
291 }
292 if current_read >= stop_at {
293 println!(
294 "Stopped reading file as requested by STOP_AT env var. \
295 Waiting for new file..."
296 );
297 self.wait_for_new_file(&mut file);
298 return true;
299 }
300 }
301 }
302 }
303 Err(err) => {
304 if err.kind() == io::ErrorKind::UnexpectedEof
305 || err.kind() == io::ErrorKind::InvalidInput
306 {
307 if let Some(value) = self.wait_for_more_data(
308 &mut file,
309 &mut initial_read,
310 format.as_ref().map(|(f, _)| f),
311 ) {
312 return value;
313 }
314 } else {
315 println!("Error reading trace file: {err:?}");
317 return true;
318 }
319 }
320 }
321 }
322 }
323
324 fn wait_for_more_data(
325 &mut self,
326 file: &mut TraceFile,
327 initial_read: &mut Option<(u64, Instant)>,
328 format: Option<&ErasedTraceFormat>,
329 ) -> Option<bool> {
330 let Ok(pos) = file.stream_position() else {
331 return Some(true);
332 };
333 if let Some((total, start)) = initial_read.take() {
334 if let Some(format) = format {
335 let stats = format.stats();
336 println!("{stats}");
337 }
338 if total > MIN_INITIAL_REPORT_SIZE {
339 println!(
340 "Initial read completed ({} MB, {}s)",
341 total / (1024 * 1024),
342 (start.elapsed().as_millis() / 100) as f32 / 10.0
343 );
344 }
345 }
346 loop {
347 thread::sleep(Duration::from_millis(100));
349 let Ok(mut real_file) = File::open(&self.path) else {
350 return Some(true);
351 };
352 let Ok(end) = real_file.seek(SeekFrom::End(0)) else {
353 return Some(true);
354 };
355 if end < pos {
356 return Some(true);
358 } else if end != pos {
359 return None;
361 }
362 }
363 }
364
365 fn wait_for_new_file(&self, file: &mut TraceFile) {
366 let Ok(pos) = file.stream_position() else {
367 return;
368 };
369 loop {
370 thread::sleep(Duration::from_millis(1000));
371 let Ok(end) = file.seek(SeekFrom::End(0)) else {
372 return;
373 };
374 if end < pos {
375 return;
376 }
377 }
378 }
379}