turbopack_trace_utils/
trace_writer.rs1use std::{debug_assert, io::Write, sync::Arc, thread::JoinHandle, time::Duration};
2
3use crossbeam_channel::{Receiver, RecvTimeoutError, Sender, TryRecvError, bounded, unbounded};
4use crossbeam_utils::CachePadded;
5use parking_lot::{Mutex, MutexGuard};
6use thread_local::ThreadLocal;
7
/// Per-thread slot holding the buffer that thread is currently filling.
///
/// The slot is `None` when no buffer is installed: before the thread's first write, and again
/// after the writer thread has taken the buffer out via `steal_from_thread_locals`.
/// `WriteGuard::new` installs a buffer the next time the thread starts a write.
type ThreadLocalState = CachePadded<Mutex<Option<TraceInfoBuffer>>>;

/// Capacity in bytes (1 MiB) requested when `WriteGuard::new` finds a thread's slot empty and
/// has to install a buffer.
const THREAD_LOCAL_INITIAL_BUFFER_SIZE: usize = 1024 * 1024;
/// Capacity in bytes (100 MiB) of the staging buffer the writer thread batches incoming data
/// into before writing it out.
const WRITE_BUFFER_SIZE: usize = 100 * 1024 * 1024;
15
/// A growable chunk of raw trace bytes.
///
/// Buffers of this type are filled by producer threads, handed to the writer thread, emptied
/// there and handed back for reuse.
struct TraceInfoBuffer {
    buffer: Vec<u8>,
}

impl TraceInfoBuffer {
    /// Creates an empty buffer with room for at least `capacity` bytes.
    fn new(capacity: usize) -> Self {
        let buffer = Vec::with_capacity(capacity);
        Self { buffer }
    }

    /// Appends a single byte.
    fn push(&mut self, data: u8) {
        Vec::push(&mut self.buffer, data);
    }

    /// Appends all bytes of `data`.
    fn extend(&mut self, data: &[u8]) {
        self.buffer.extend(data);
    }

    /// Drops the contents while keeping the allocation for reuse.
    fn clear(&mut self) {
        self.buffer.truncate(0);
    }
}
39
40#[derive(Clone)]
41pub struct TraceWriter {
42 data_tx: Sender<Option<TraceInfoBuffer>>,
43 return_rx: Receiver<TraceInfoBuffer>,
44 thread_locals: Arc<ThreadLocal<ThreadLocalState>>,
45}
46
47impl TraceWriter {
48 pub fn new<W: Write + Send + 'static>(mut writer: W) -> (Self, TraceWriterGuard) {
57 let (data_tx, data_rx) = unbounded::<Option<TraceInfoBuffer>>();
58 let (return_tx, return_rx) = bounded::<TraceInfoBuffer>(1024);
59 let thread_locals: Arc<ThreadLocal<ThreadLocalState>> = Default::default();
60
61 let trace_writer = Self {
62 data_tx: data_tx.clone(),
63 return_rx: return_rx.clone(),
64 thread_locals: thread_locals.clone(),
65 };
66
67 fn steal_from_thread_locals(
68 thread_locals: &Arc<ThreadLocal<ThreadLocalState>>,
69 stolen_buffers: &mut Vec<TraceInfoBuffer>,
70 ) {
71 for state in thread_locals.iter() {
72 let mut buffer = state.lock();
73 if let Some(buffer) = buffer.take() {
74 stolen_buffers.push(buffer);
75 }
76 }
77 }
78
79 let handle: std::thread::JoinHandle<()> = std::thread::spawn(move || {
80 let _ = writer.write(b"TRACEv0");
81 let mut buf = Vec::with_capacity(WRITE_BUFFER_SIZE);
82 let mut stolen_buffers = Vec::new();
83 let mut should_exit = false;
84 'outer: loop {
85 if !buf.is_empty() {
86 let _ = writer.write_all(&buf);
87 let _ = writer.flush();
88 buf.clear();
89 }
90
91 let recv = if should_exit {
92 Ok(None)
93 } else {
94 data_rx.recv_timeout(Duration::from_secs(1))
95 };
96
97 let mut data = match recv {
98 Ok(Some(data)) => data,
99 result => {
100 if result.is_ok() {
101 should_exit = true;
103 }
104 steal_from_thread_locals(&thread_locals, &mut stolen_buffers);
109 if let Some(data) = stolen_buffers.pop() {
110 data
111 } else {
112 match result {
113 Ok(Some(_)) => unreachable!(),
114 Ok(None) | Err(RecvTimeoutError::Disconnected) => {
115 break 'outer;
117 }
118 Err(RecvTimeoutError::Timeout) => {
119 continue;
121 }
122 }
123 }
124 }
125 };
126 if data.buffer.len() > buf.capacity() {
127 let _ = writer.write_all(&data.buffer);
128 } else {
129 buf.extend_from_slice(&data.buffer);
130 }
131 data.clear();
132 let _ = return_tx.try_send(data);
133 loop {
134 let recv = stolen_buffers.pop().map(Some).ok_or(()).or_else(|_| {
135 if should_exit {
136 Ok(None)
137 } else {
138 data_rx.try_recv()
139 }
140 });
141 match recv {
142 Ok(Some(mut data)) => {
143 let data_buffer = &data.buffer;
144 if data_buffer.is_empty() {
145 break 'outer;
146 }
147 if buf.len() + data_buffer.len() > buf.capacity() {
148 let _ = writer.write_all(&buf);
149 buf.clear();
150 if data_buffer.len() > buf.capacity() {
151 let _ = writer.write_all(data_buffer);
152 } else {
153 buf.extend_from_slice(data_buffer);
154 }
155 } else {
156 buf.extend_from_slice(data_buffer);
157 }
158 data.clear();
159 let _ = return_tx.try_send(data);
160 }
161 Ok(None) | Err(TryRecvError::Disconnected) => {
162 should_exit = true;
163 break;
164 }
165 Err(TryRecvError::Empty) => {
166 break;
167 }
168 }
169 }
170 }
171 drop(writer);
172 });
173
174 let guard = TraceWriterGuard {
175 data_tx: Some(data_tx),
176 return_rx: Some(return_rx),
177 handle: Some(handle),
178 };
179 (trace_writer, guard)
180 }
181
182 fn send(&self, data: TraceInfoBuffer) {
183 debug_assert!(!data.buffer.is_empty());
184 let _ = self.data_tx.send(Some(data));
185 }
186
187 fn get_empty_buffer(&self, capacity: usize) -> TraceInfoBuffer {
188 self.return_rx
189 .try_recv()
190 .ok()
191 .unwrap_or_else(|| TraceInfoBuffer::new(capacity))
192 }
193
194 pub fn start_write(&self) -> WriteGuard<'_> {
195 let thread_local_buffer = self.thread_locals.get_or_default();
196 let buffer = thread_local_buffer.lock();
197 WriteGuard::new(buffer, self)
198 }
199}
200
201pub struct TraceWriterGuard {
202 data_tx: Option<Sender<Option<TraceInfoBuffer>>>,
203 return_rx: Option<Receiver<TraceInfoBuffer>>,
204 handle: Option<JoinHandle<()>>,
205}
206
207impl Drop for TraceWriterGuard {
208 fn drop(&mut self) {
209 let _ = self.data_tx.take().unwrap().send(None);
211 let return_rx = self.return_rx.take().unwrap();
213 while return_rx.recv().is_ok() {}
214 let _ = self.handle.take().unwrap().join();
216 }
217}
218
219pub struct WriteGuard<'l> {
220 buffer: MutexGuard<'l, Option<TraceInfoBuffer>>,
222 trace_writer: &'l TraceWriter,
223}
224
225impl<'l> WriteGuard<'l> {
226 fn new(
227 mut buffer: MutexGuard<'l, Option<TraceInfoBuffer>>,
228 trace_writer: &'l TraceWriter,
229 ) -> Self {
230 if buffer.is_none() {
232 *buffer = Some(trace_writer.get_empty_buffer(THREAD_LOCAL_INITIAL_BUFFER_SIZE));
233 };
234 Self {
235 buffer,
236 trace_writer,
237 }
238 }
239
240 fn buffer(&mut self) -> &mut TraceInfoBuffer {
241 unsafe { self.buffer.as_mut().unwrap_unchecked() }
243 }
244
245 pub fn push(&mut self, data: u8) {
246 self.buffer().push(data);
247 }
248
249 pub fn extend(&mut self, data: &[u8]) {
250 self.buffer().extend(data);
251 }
252}
253
254impl Drop for WriteGuard<'_> {
255 fn drop(&mut self) {
256 if self.buffer().buffer.capacity() * 2 < self.buffer().buffer.len() * 3 {
257 let capacity = self.buffer().buffer.capacity();
258 let new_buffer = self.trace_writer.get_empty_buffer(capacity);
259 let buffer = std::mem::replace(self.buffer(), new_buffer);
260 self.trace_writer.send(buffer);
261 }
262 }
263}