turbo_trace_server/reader/
mod.rs1mod heaptrack;
2mod nextjs;
3mod turbopack;
4
5use std::{
6 any::Any,
7 env,
8 fs::File,
9 io::{self, BufReader, Read, Seek, SeekFrom},
10 path::PathBuf,
11 sync::Arc,
12 thread::{self, JoinHandle},
13 time::{Duration, Instant},
14};
15
16use anyhow::Result;
17use flate2::bufread::GzDecoder;
18
19use crate::{
20 reader::{heaptrack::HeaptrackFormat, nextjs::NextJsFormat, turbopack::TurbopackFormat},
21 store_container::StoreContainer,
22};
23
/// Files whose size at open time exceeds this many bytes (100 MiB) get an
/// "Initial read completed" summary once the reader first catches up with the end.
const MIN_INITIAL_REPORT_SIZE: u64 = 100 << 20;
25
/// A parser for one on-disk trace format.
///
/// The reader repeatedly hands it the not-yet-consumed tail of the file contents. `Reused`
/// is scratch state the reader creates once (through `Default`) and passes to every call.
trait TraceFormat {
    /// Scratch state kept alive by the reader between calls to [`TraceFormat::read`].
    type Reused: Default;

    /// Parses as much of `buffer` as possible and returns the number of bytes consumed.
    /// Bytes that were not consumed are offered again, followed by newer data, next time.
    fn read(&mut self, buffer: &[u8], reuse: &mut Self::Reused) -> Result<usize>;

    /// Human-readable parser statistics appended to progress output; empty unless overridden.
    fn stats(&self) -> String {
        Default::default()
    }
}
33
34type ErasedReused = Box<dyn Any>;
35
36struct ErasedTraceFormat(Box<dyn ObjectSafeTraceFormat>);
37
38trait ObjectSafeTraceFormat {
39 fn create_reused(&self) -> ErasedReused;
40 fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize>;
41 fn stats(&self) -> String;
42}
43
44impl<T: TraceFormat> ObjectSafeTraceFormat for T
45where
46 T::Reused: 'static,
47{
48 fn create_reused(&self) -> ErasedReused {
49 Box::new(T::Reused::default())
50 }
51
52 fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize> {
53 let reuse = reuse.downcast_mut().expect("Type of reuse is invalid");
54 TraceFormat::read(self, buffer, reuse)
55 }
56
57 fn stats(&self) -> String {
58 TraceFormat::stats(self)
59 }
60}
61
62impl ObjectSafeTraceFormat for ErasedTraceFormat {
63 fn create_reused(&self) -> ErasedReused {
64 self.0.create_reused()
65 }
66
67 fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize> {
68 self.0.read(buffer, reuse)
69 }
70
71 fn stats(&self) -> String {
72 self.0.stats()
73 }
74}
75
76#[derive(Default)]
77enum TraceFile {
78 Raw(File),
79 Zstd(zstd::Decoder<'static, BufReader<File>>),
80 Gz(GzDecoder<BufReader<File>>),
81 #[default]
82 Unloaded,
83}
84
85impl TraceFile {
86 fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
87 match self {
88 Self::Raw(file) => file.read(buffer),
89 Self::Zstd(decoder) => decoder.read(buffer),
90 Self::Gz(decoder) => decoder.read(buffer),
91 Self::Unloaded => unreachable!(),
92 }
93 }
94
95 fn stream_position(&mut self) -> io::Result<u64> {
96 match self {
97 Self::Raw(file) => file.stream_position(),
98 Self::Zstd(decoder) => decoder.get_mut().stream_position(),
99 Self::Gz(decoder) => decoder.get_mut().stream_position(),
100 Self::Unloaded => unreachable!(),
101 }
102 }
103
104 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
105 match self {
106 Self::Raw(file) => file.seek(pos),
107 Self::Zstd(decoder) => decoder.get_mut().seek(pos),
108 Self::Gz(decoder) => decoder.get_mut().seek(pos),
109 Self::Unloaded => unreachable!(),
110 }
111 }
112
113 fn size(&mut self) -> io::Result<u64> {
114 match self {
115 Self::Raw(file) => file.metadata().map(|m| m.len()),
116 Self::Zstd(decoder) => decoder.get_mut().get_ref().metadata().map(|m| m.len()),
117 Self::Gz(decoder) => decoder.get_mut().get_ref().metadata().map(|m| m.len()),
118 Self::Unloaded => unreachable!(),
119 }
120 }
121}
122
123pub struct TraceReader {
124 store: Arc<StoreContainer>,
125 path: PathBuf,
126}
127
128impl TraceReader {
129 pub fn spawn(store: Arc<StoreContainer>, path: PathBuf) -> JoinHandle<()> {
130 let mut reader = Self { store, path };
131 std::thread::spawn(move || reader.run())
132 }
133
134 pub fn run(&mut self) {
135 let mut file_warning_printed = false;
136 loop {
137 let read_success = self.try_read();
138 if !file_warning_printed && !read_success {
139 println!("Unable to read trace file at {:?}, waiting...", self.path);
140 file_warning_printed = true;
141 }
142 thread::sleep(Duration::from_millis(500));
143 }
144 }
145
146 fn trace_file_from_file(&self, file: File) -> io::Result<TraceFile> {
147 let path = &self.path.to_string_lossy();
148 Ok(if path.ends_with(".zst") {
149 TraceFile::Zstd(zstd::Decoder::new(file)?)
150 } else if path.ends_with(".gz") {
151 TraceFile::Gz(GzDecoder::new(BufReader::new(file)))
152 } else {
153 TraceFile::Raw(file)
154 })
155 }
156
157 fn try_read(&mut self) -> bool {
158 let Ok(mut file) = File::open(&self.path) else {
159 return false;
160 };
161 println!("Trace file opened");
162 let stop_at = env::var("STOP_AT")
163 .unwrap_or_default()
164 .parse()
165 .map_or(u64::MAX, |v: u64| v * 1024 * 1024);
166 if stop_at != u64::MAX {
167 println!("Will stop reading file at {} MB", stop_at / 1024 / 1024)
168 }
169
170 {
171 let mut store = self.store.write();
172 store.reset();
173 }
174
175 let mut format: Option<(ErasedTraceFormat, ErasedReused)> = None;
176
177 let mut current_read = 0;
178 let mut initial_read = file
179 .seek(SeekFrom::End(0))
180 .ok()
181 .map(|total| (total, Instant::now()));
182 if file.seek(SeekFrom::Start(0)).is_err() {
183 return false;
184 }
185 let mut file = match self.trace_file_from_file(file) {
186 Ok(f) => f,
187 Err(err) => {
188 println!("Error creating zstd decoder: {err}");
189 return false;
190 }
191 };
192
193 let mut buffer = Vec::new();
194 let mut index = 0;
195
196 let mut chunk = vec![0; 64 * 1024 * 1024];
197 loop {
198 match file.read(&mut chunk) {
199 Ok(bytes_read) => {
200 if bytes_read == 0 {
201 if let Some(value) = self.wait_for_more_data(
202 &mut file,
203 &mut initial_read,
204 format.as_ref().map(|(f, _)| f),
205 ) {
206 return value;
207 }
208 } else {
209 if index > 0 && buffer.len() + bytes_read > buffer.capacity() {
212 buffer.splice(..index, std::iter::empty());
213 index = 0;
214 }
215 buffer.extend_from_slice(&chunk[..bytes_read]);
216 if format.is_none() && buffer.len() >= 8 {
217 let erased_format = if buffer.starts_with(b"TRACEv0") {
218 index = 7;
219 ErasedTraceFormat(Box::new(TurbopackFormat::new(
220 self.store.clone(),
221 )))
222 } else if buffer.starts_with(b"[{\"name\"") {
223 ErasedTraceFormat(Box::new(NextJsFormat::new(self.store.clone())))
224 } else if buffer.starts_with(b"v ") {
225 ErasedTraceFormat(Box::new(HeaptrackFormat::new(
226 self.store.clone(),
227 )))
228 } else {
229 ErasedTraceFormat(Box::new(TurbopackFormat::new(
232 self.store.clone(),
233 )))
234 };
235 let reuse = erased_format.create_reused();
236 format = Some((erased_format, reuse));
237 }
238 if let Some((format, reuse)) = &mut format {
239 match format.read(&buffer[index..], reuse) {
240 Ok(bytes_read) => {
241 index += bytes_read;
242 }
243 Err(err) => {
244 println!("Trace file error: {err}");
245 return true;
246 }
247 }
248 if self.store.want_to_read() {
249 thread::yield_now();
250 }
251 let prev_read = current_read;
252 current_read += bytes_read as u64;
253 if let Some((total, start)) = &mut initial_read {
254 let old_mbs = prev_read / (97 * 1024 * 1024);
255 let new_mbs = current_read / (97 * 1024 * 1024);
256 if old_mbs != new_mbs {
257 let pos = file.stream_position().unwrap_or(current_read);
258 if pos > *total {
259 *total = file.size().unwrap_or(pos);
260 }
261 *total = (*total).max(pos);
262 let percentage = pos * 100 / *total;
263 let read = pos / (1024 * 1024);
264 let uncompressed = current_read / (1024 * 1024);
265 let total = *total / (1024 * 1024);
266 let stats = format.stats();
267 print!(
268 "{}% read ({}/{} MB, {} MB/s)",
269 percentage,
270 read,
271 total,
272 read * 1000 / (start.elapsed().as_millis() + 1) as u64
273 );
274 if uncompressed != read {
275 print!(" ({uncompressed} MB uncompressed)");
276 }
277 if stats.is_empty() {
278 println!();
279 } else {
280 println!(" - {stats}");
281 }
282 }
283 }
284 if current_read >= stop_at {
285 println!(
286 "Stopped reading file as requested by STOP_AT env var. \
287 Waiting for new file..."
288 );
289 self.wait_for_new_file(&mut file);
290 return true;
291 }
292 }
293 }
294 }
295 Err(err) => {
296 if err.kind() == io::ErrorKind::UnexpectedEof {
297 if let Some(value) = self.wait_for_more_data(
298 &mut file,
299 &mut initial_read,
300 format.as_ref().map(|(f, _)| f),
301 ) {
302 return value;
303 }
304 } else {
305 println!("Error reading trace file: {err:?}");
307 return true;
308 }
309 }
310 }
311 }
312 }
313
314 fn wait_for_more_data(
315 &mut self,
316 file: &mut TraceFile,
317 initial_read: &mut Option<(u64, Instant)>,
318 format: Option<&ErasedTraceFormat>,
319 ) -> Option<bool> {
320 let Ok(pos) = file.stream_position() else {
321 return Some(true);
322 };
323 if let Some((total, start)) = initial_read.take() {
324 if let Some(format) = format {
325 let stats = format.stats();
326 println!("{stats}");
327 }
328 if total > MIN_INITIAL_REPORT_SIZE {
329 println!(
330 "Initial read completed ({} MB, {}s)",
331 total / (1024 * 1024),
332 (start.elapsed().as_millis() / 100) as f32 / 10.0
333 );
334 }
335 }
336 loop {
337 thread::sleep(Duration::from_millis(100));
339 let Ok(mut real_file) = File::open(&self.path) else {
340 return Some(true);
341 };
342 let Ok(end) = real_file.seek(SeekFrom::End(0)) else {
343 return Some(true);
344 };
345 if end < pos {
346 return Some(true);
348 } else if end != pos {
349 return None;
351 }
352 }
353 }
354
355 fn wait_for_new_file(&self, file: &mut TraceFile) {
356 let Ok(pos) = file.stream_position() else {
357 return;
358 };
359 loop {
360 thread::sleep(Duration::from_millis(1000));
361 let Ok(end) = file.seek(SeekFrom::End(0)) else {
362 return;
363 };
364 if end < pos {
365 return;
366 }
367 }
368 }
369}