turbopack_trace_utils/exit.rs
1use std::{
2 future::Future,
3 pin::Pin,
4 sync::{Arc, Mutex, OnceLock},
5};
6
7use anyhow::Result;
8use tokio::{select, sync::mpsc, task::JoinSet};
9
10/// A guard for the exit handler. When dropped, the exit guard will be dropped.
11/// It might also be dropped on Ctrl-C.
12pub struct ExitGuard<T>(Arc<Mutex<Option<T>>>);
13
14impl<T> Drop for ExitGuard<T> {
15 fn drop(&mut self) {
16 drop(self.0.lock().unwrap().take())
17 }
18}
19
20impl<T: Send + 'static> ExitGuard<T> {
21 /// Drop a guard when Ctrl-C is pressed or the [ExitGuard] is dropped.
22 pub fn new(guard: T) -> Result<Self> {
23 let guard = Arc::new(Mutex::new(Some(guard)));
24 {
25 let guard = guard.clone();
26 tokio::spawn(async move {
27 tokio::signal::ctrl_c().await.unwrap();
28 drop(guard.lock().unwrap().take());
29 std::process::exit(0);
30 });
31 }
32 Ok(ExitGuard(guard))
33 }
34}
35
36type BoxExitFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
37
38/// The singular global ExitHandler. This is primarily used to ensure
39/// `ExitHandler::listen` is only called once.
40///
41/// The global handler is intentionally not exposed, so that APIs that depend on
42/// exit behavior are required to take the `ExitHandler`. This ensures that the
43/// `ExitHandler` is configured before these APIs are run, and that these
44/// consumers can be used with a callback (e.g. a mock) instead.
45static GLOBAL_EXIT_HANDLER: OnceLock<Arc<ExitHandler>> = OnceLock::new();
46
47pub struct ExitHandler {
48 tx: mpsc::UnboundedSender<BoxExitFuture>,
49}
50
51impl ExitHandler {
52 /// Waits for `SIGINT` using [`tokio::signal::ctrl_c`], and exits the
53 /// process with exit code `0` after running any futures scheduled with
54 /// [`ExitHandler::on_exit`].
55 ///
56 /// As this uses global process signals, this must only be called once, and
57 /// will panic if called multiple times. Use this when you own the
58 /// process (e.g. `turbopack-cli`).
59 ///
60 /// If you don't own the process (e.g. you're called as a library, such as
61 /// in `next-swc`), use [`ExitHandler::new_trigger`] instead.
62 ///
63 /// This may listen for other signals, like `SIGTERM` or `SIGPIPE` in the
64 /// future.
65 pub fn listen() -> &'static Arc<ExitHandler> {
66 let (handler, receiver) = Self::new_receiver();
67 if GLOBAL_EXIT_HANDLER.set(handler).is_err() {
68 panic!("ExitHandler::listen must only be called once");
69 }
70 tokio::spawn(async move {
71 tokio::signal::ctrl_c()
72 .await
73 .expect("failed to set ctrl_c handler");
74 receiver.run_exit_handler().await;
75 std::process::exit(0);
76 });
77 GLOBAL_EXIT_HANDLER.get().expect("value is set")
78 }
79
80 /// Creates an [`ExitHandler`] that can be manually controlled with an
81 /// [`ExitReceiver`].
82 ///
83 /// This does not actually exit the process or listen for any signals. If
84 /// you'd like that behavior, use [`ExitHandler::listen`].
85 ///
86 /// Because this API has no global side-effects and can be called many times
87 /// within the same process, it is possible to use it to provide a mock
88 /// [`ExitHandler`] inside unit tests.
89 pub fn new_receiver() -> (Arc<ExitHandler>, ExitReceiver) {
90 let (tx, rx) = mpsc::unbounded_channel();
91 (Arc::new(ExitHandler { tx }), ExitReceiver { rx })
92 }
93
94 /// Register this given [`Future`] to run upon process exit.
95 ///
96 /// As there are many ways for a process be killed that are outside of a
97 /// process's own control (e.g. `SIGKILL` or `SIGSEGV`), this API is
98 /// provided on a best-effort basis.
99 pub fn on_exit(&self, fut: impl Future<Output = ()> + Send + 'static) {
100 // realistically, this error case can only happen with the `new_receiver` API.
101 self.tx
102 .send(Box::pin(fut))
103 .expect("cannot send future after process exit");
104 }
105}
106
107/// Provides a way to run futures scheduled with an [`ExitHandler`].
108pub struct ExitReceiver {
109 rx: mpsc::UnboundedReceiver<BoxExitFuture>,
110}
111
112impl ExitReceiver {
113 /// Call this when the process exits to run the futures scheduled via
114 /// [`ExitHandler::on_exit`].
115 ///
116 /// As this is intended to be used in a library context, this does not exit
117 /// the process. It is expected that the process will not exit until
118 /// this async method finishes executing.
119 ///
120 /// Additional work can be scheduled using [`ExitHandler::on_exit`] even
121 /// while this is running, and it will execute before this function
122 /// finishes. Work attempted to be scheduled after this finishes will panic.
123 pub async fn run_exit_handler(mut self) {
124 let mut set = JoinSet::new();
125 while let Ok(fut) = self.rx.try_recv() {
126 set.spawn(fut);
127 }
128 loop {
129 select! {
130 biased;
131 Some(fut) = self.rx.recv() => {
132 set.spawn(fut);
133 },
134 val = set.join_next() => {
135 match val {
136 Some(Ok(())) => {}
137 Some(Err(_)) => panic!("ExitHandler future panicked!"),
138 None => return,
139 }
140 },
141 }
142 }
143 }
144}
145
146#[cfg(test)]
147mod tests {
148 #![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this
149 use std::{
150 future::Future,
151 pin::Pin,
152 sync::{
153 Arc,
154 atomic::{AtomicBool, AtomicU32, Ordering},
155 },
156 };
157
158 use super::ExitHandler;
159
160 #[tokio::test]
161 async fn test_on_exit() {
162 let (handler, receiver) = ExitHandler::new_receiver();
163
164 let called = Arc::new(AtomicBool::new(false));
165 handler.on_exit({
166 let called = Arc::clone(&called);
167 async move {
168 tokio::task::yield_now().await;
169 called.store(true, Ordering::SeqCst);
170 }
171 });
172
173 receiver.run_exit_handler().await;
174 assert!(called.load(Ordering::SeqCst));
175 }
176
177 #[tokio::test]
178 async fn test_queue_while_exiting() {
179 let (handler, receiver) = ExitHandler::new_receiver();
180 let call_count = Arc::new(AtomicU32::new(0));
181
182 type BoxExitFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
183
184 // this struct is needed to construct the recursive closure type
185 #[derive(Clone)]
186 struct GetFut {
187 handler: Arc<ExitHandler>,
188 call_count: Arc<AtomicU32>,
189 }
190
191 impl GetFut {
192 fn get(self) -> BoxExitFuture {
193 Box::pin(async move {
194 tokio::task::yield_now().await;
195 if self.call_count.fetch_add(1, Ordering::SeqCst) < 99 {
196 // queue more work while the exit handler is running
197 Arc::clone(&self.handler).on_exit(self.get())
198 }
199 })
200 }
201 }
202
203 handler.on_exit(
204 GetFut {
205 handler: Arc::clone(&handler),
206 call_count: Arc::clone(&call_count),
207 }
208 .get(),
209 );
210 receiver.run_exit_handler().await;
211 assert_eq!(call_count.load(Ordering::SeqCst), 100);
212 }
213}