turbopack_trace_utils/
trace_writer.rs

1use std::{debug_assert, io::Write, sync::Arc, thread::JoinHandle, time::Duration};
2
3use crossbeam_channel::{Receiver, RecvTimeoutError, Sender, TryRecvError, bounded, unbounded};
4use crossbeam_utils::CachePadded;
5use parking_lot::{Mutex, MutexGuard};
6use thread_local::ThreadLocal;
7
/// Per-thread slot holding that thread's current accumulation buffer.
///
/// The slot is `None` until a `WriteGuard` initializes it, and becomes `None` again when the
/// writer thread steals the buffer. The mutex is what lets the writer thread take a buffer that
/// belongs to another thread. Each slot is wrapped in `CachePadded`, presumably to avoid false
/// sharing between the slots of different threads.
type ThreadLocalState = CachePadded<Mutex<Option<TraceInfoBuffer>>>;
9
/// Initial capacity (1 MiB) of every thread-local buffer. A buffer is handed over to the writer
/// thread once it is more than two-thirds full; a single large write may grow a buffer beyond
/// this size.
const THREAD_LOCAL_INITIAL_BUFFER_SIZE: usize = 1 << 20;
/// Capacity (100 MiB) of the staging buffer in which the writer thread coalesces incoming data
/// before it issues a write to the underlying writer.
const WRITE_BUFFER_SIZE: usize = 100 << 20;
15
/// A growable byte buffer that travels between the producing threads and the writer thread.
struct TraceInfoBuffer {
    buffer: Vec<u8>,
}

impl TraceInfoBuffer {
    /// Creates an empty buffer that can hold at least `capacity` bytes without reallocating.
    fn new(capacity: usize) -> Self {
        let buffer = Vec::with_capacity(capacity);
        Self { buffer }
    }

    /// Appends a single byte to the end of the buffer.
    fn push(&mut self, data: u8) {
        let bytes = &mut self.buffer;
        bytes.push(data);
    }

    /// Appends all bytes of `data` to the end of the buffer.
    fn extend(&mut self, data: &[u8]) {
        self.buffer.extend(data);
    }

    /// Removes all bytes while keeping the allocation, so the buffer can be reused.
    fn clear(&mut self) {
        self.buffer.truncate(0);
    }
}
39
40#[derive(Clone)]
41pub struct TraceWriter {
42    data_tx: Sender<Option<TraceInfoBuffer>>,
43    return_rx: Receiver<TraceInfoBuffer>,
44    thread_locals: Arc<ThreadLocal<ThreadLocalState>>,
45}
46
47impl TraceWriter {
48    /// This is a non-blocking writer that writes a file in a background thread.
49    /// This is inspired by tracing-appender non_blocking, but has some
50    /// differences:
51    /// * It allows writing an owned Vec<u8> instead of a reference, so avoiding additional
52    ///   allocation.
53    /// * It uses an unbounded channel to avoid slowing down the application at all (memory) cost.
54    /// * It issues less writes by buffering the data into chunks of WRITE_BUFFER_SIZE, when
55    ///   possible.
56    pub fn new<W: Write + Send + 'static>(mut writer: W) -> (Self, TraceWriterGuard) {
57        let (data_tx, data_rx) = unbounded::<Option<TraceInfoBuffer>>();
58        let (return_tx, return_rx) = bounded::<TraceInfoBuffer>(1024);
59        let thread_locals: Arc<ThreadLocal<ThreadLocalState>> = Default::default();
60
61        let trace_writer = Self {
62            data_tx: data_tx.clone(),
63            return_rx: return_rx.clone(),
64            thread_locals: thread_locals.clone(),
65        };
66
67        fn steal_from_thread_locals(
68            thread_locals: &Arc<ThreadLocal<ThreadLocalState>>,
69            stolen_buffers: &mut Vec<TraceInfoBuffer>,
70        ) {
71            for state in thread_locals.iter() {
72                let mut buffer = state.lock();
73                if let Some(buffer) = buffer.take() {
74                    stolen_buffers.push(buffer);
75                }
76            }
77        }
78
79        let handle: std::thread::JoinHandle<()> = std::thread::spawn(move || {
80            let _ = writer.write(b"TRACEv0");
81            let mut buf = Vec::with_capacity(WRITE_BUFFER_SIZE);
82            let mut stolen_buffers = Vec::new();
83            let mut should_exit = false;
84            'outer: loop {
85                if !buf.is_empty() {
86                    let _ = writer.write_all(&buf);
87                    let _ = writer.flush();
88                    buf.clear();
89                }
90
91                let recv = if should_exit {
92                    Ok(None)
93                } else {
94                    data_rx.recv_timeout(Duration::from_secs(1))
95                };
96
97                let mut data = match recv {
98                    Ok(Some(data)) => data,
99                    result => {
100                        if result.is_ok() {
101                            // On exit signal
102                            should_exit = true;
103                        }
104                        // When we receive no data for a second or we want to exit we poll the
105                        // thread local buffers to steal some data. This
106                        // prevents unsend data if a thread is hanging or the
107                        // system just go into idle.
108                        steal_from_thread_locals(&thread_locals, &mut stolen_buffers);
109                        if let Some(data) = stolen_buffers.pop() {
110                            data
111                        } else {
112                            match result {
113                                Ok(Some(_)) => unreachable!(),
114                                Ok(None) | Err(RecvTimeoutError::Disconnected) => {
115                                    // We should exit.
116                                    break 'outer;
117                                }
118                                Err(RecvTimeoutError::Timeout) => {
119                                    // No data stolen, wait again
120                                    continue;
121                                }
122                            }
123                        }
124                    }
125                };
126                if data.buffer.len() > buf.capacity() {
127                    let _ = writer.write_all(&data.buffer);
128                } else {
129                    buf.extend_from_slice(&data.buffer);
130                }
131                data.clear();
132                let _ = return_tx.try_send(data);
133                loop {
134                    let recv = stolen_buffers.pop().map(Some).ok_or(()).or_else(|_| {
135                        if should_exit {
136                            Ok(None)
137                        } else {
138                            data_rx.try_recv()
139                        }
140                    });
141                    match recv {
142                        Ok(Some(mut data)) => {
143                            let data_buffer = &data.buffer;
144                            if data_buffer.is_empty() {
145                                break 'outer;
146                            }
147                            if buf.len() + data_buffer.len() > buf.capacity() {
148                                let _ = writer.write_all(&buf);
149                                buf.clear();
150                                if data_buffer.len() > buf.capacity() {
151                                    let _ = writer.write_all(data_buffer);
152                                } else {
153                                    buf.extend_from_slice(data_buffer);
154                                }
155                            } else {
156                                buf.extend_from_slice(data_buffer);
157                            }
158                            data.clear();
159                            let _ = return_tx.try_send(data);
160                        }
161                        Ok(None) | Err(TryRecvError::Disconnected) => {
162                            should_exit = true;
163                            break;
164                        }
165                        Err(TryRecvError::Empty) => {
166                            break;
167                        }
168                    }
169                }
170            }
171            drop(writer);
172        });
173
174        let guard = TraceWriterGuard {
175            data_tx: Some(data_tx),
176            return_rx: Some(return_rx),
177            handle: Some(handle),
178        };
179        (trace_writer, guard)
180    }
181
182    fn send(&self, data: TraceInfoBuffer) {
183        debug_assert!(!data.buffer.is_empty());
184        let _ = self.data_tx.send(Some(data));
185    }
186
187    fn get_empty_buffer(&self, capacity: usize) -> TraceInfoBuffer {
188        self.return_rx
189            .try_recv()
190            .ok()
191            .unwrap_or_else(|| TraceInfoBuffer::new(capacity))
192    }
193
194    pub fn start_write(&self) -> WriteGuard<'_> {
195        let thread_local_buffer = self.thread_locals.get_or_default();
196        let buffer = thread_local_buffer.lock();
197        WriteGuard::new(buffer, self)
198    }
199}
200
201pub struct TraceWriterGuard {
202    data_tx: Option<Sender<Option<TraceInfoBuffer>>>,
203    return_rx: Option<Receiver<TraceInfoBuffer>>,
204    handle: Option<JoinHandle<()>>,
205}
206
207impl Drop for TraceWriterGuard {
208    fn drop(&mut self) {
209        // Send exit signal, we can't use disconnect since there is another instance in TraceWriter
210        let _ = self.data_tx.take().unwrap().send(None);
211        // Receive all return buffers and drop them here. The thread is already busy writing.
212        let return_rx = self.return_rx.take().unwrap();
213        while return_rx.recv().is_ok() {}
214        // Wait for the thread to finish completely
215        let _ = self.handle.take().unwrap().join();
216    }
217}
218
219pub struct WriteGuard<'l> {
220    // Safety: The buffer must not be None
221    buffer: MutexGuard<'l, Option<TraceInfoBuffer>>,
222    trace_writer: &'l TraceWriter,
223}
224
225impl<'l> WriteGuard<'l> {
226    fn new(
227        mut buffer: MutexGuard<'l, Option<TraceInfoBuffer>>,
228        trace_writer: &'l TraceWriter,
229    ) -> Self {
230        // Safety: The buffer must not be None, so we initialize it here
231        if buffer.is_none() {
232            *buffer = Some(trace_writer.get_empty_buffer(THREAD_LOCAL_INITIAL_BUFFER_SIZE));
233        };
234        Self {
235            buffer,
236            trace_writer,
237        }
238    }
239
240    fn buffer(&mut self) -> &mut TraceInfoBuffer {
241        // Safety: The struct invariant ensures that the buffer is not None
242        unsafe { self.buffer.as_mut().unwrap_unchecked() }
243    }
244
245    pub fn push(&mut self, data: u8) {
246        self.buffer().push(data);
247    }
248
249    pub fn extend(&mut self, data: &[u8]) {
250        self.buffer().extend(data);
251    }
252}
253
254impl Drop for WriteGuard<'_> {
255    fn drop(&mut self) {
256        if self.buffer().buffer.capacity() * 2 < self.buffer().buffer.len() * 3 {
257            let capacity = self.buffer().buffer.capacity();
258            let new_buffer = self.trace_writer.get_empty_buffer(capacity);
259            let buffer = std::mem::replace(self.buffer(), new_buffer);
260            self.trace_writer.send(buffer);
261        }
262    }
263}