turbopack_trace_utils/
exit.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    sync::{Arc, Mutex, OnceLock},
5};
6
7use anyhow::Result;
8use tokio::{select, sync::mpsc, task::JoinSet};
9
/// Owns a value that should be dropped before the process goes away.
///
/// The wrapped value is dropped when the `ExitGuard` itself is dropped. The
/// slot holding the value is shared with the Ctrl-C task spawned by
/// [`ExitGuard::new`], so the value may already have been taken and dropped by
/// that task; in that case dropping the `ExitGuard` does nothing.
pub struct ExitGuard<T>(Arc<Mutex<Option<T>>>);

impl<T> Drop for ExitGuard<T> {
    fn drop(&mut self) {
        // Recover the slot from a poisoned mutex instead of unwrapping: the
        // `Option` inside is still valid, and panicking inside `drop` would
        // abort the process if we are already unwinding.
        let value = self
            .0
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take();
        // The lock guard was a temporary of the `let` above, so the wrapped
        // value's destructor runs without the mutex held.
        drop(value);
    }
}
19
20impl<T: Send + 'static> ExitGuard<T> {
21    /// Drop a guard when Ctrl-C is pressed or the [ExitGuard] is dropped.
22    pub fn new(guard: T) -> Result<Self> {
23        let guard = Arc::new(Mutex::new(Some(guard)));
24        {
25            let guard = guard.clone();
26            tokio::spawn(async move {
27                tokio::signal::ctrl_c().await.unwrap();
28                drop(guard.lock().unwrap().take());
29                std::process::exit(0);
30            });
31        }
32        Ok(ExitGuard(guard))
33    }
34}
35
36type BoxExitFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
37
38/// The singular global ExitHandler. This is primarily used to ensure
39/// `ExitHandler::listen` is only called once.
40///
41/// The global handler is intentionally not exposed, so that APIs that depend on
42/// exit behavior are required to take the `ExitHandler`. This ensures that the
43/// `ExitHandler` is configured before these APIs are run, and that these
44/// consumers can be used with a callback (e.g. a mock) instead.
45static GLOBAL_EXIT_HANDLER: OnceLock<Arc<ExitHandler>> = OnceLock::new();
46
47pub struct ExitHandler {
48    tx: mpsc::UnboundedSender<BoxExitFuture>,
49}
50
51impl ExitHandler {
52    /// Waits for `SIGINT` using [`tokio::signal::ctrl_c`], and exits the
53    /// process with exit code `0` after running any futures scheduled with
54    /// [`ExitHandler::on_exit`].
55    ///
56    /// As this uses global process signals, this must only be called once, and
57    /// will panic if called multiple times. Use this when you own the
58    /// process (e.g. `turbopack-cli`).
59    ///
60    /// If you don't own the process (e.g. you're called as a library, such as
61    /// in `next-swc`), use [`ExitHandler::new_trigger`] instead.
62    ///
63    /// This may listen for other signals, like `SIGTERM` or `SIGPIPE` in the
64    /// future.
65    pub fn listen() -> &'static Arc<ExitHandler> {
66        let (handler, receiver) = Self::new_receiver();
67        if GLOBAL_EXIT_HANDLER.set(handler).is_err() {
68            panic!("ExitHandler::listen must only be called once");
69        }
70        tokio::spawn(async move {
71            tokio::signal::ctrl_c()
72                .await
73                .expect("failed to set ctrl_c handler");
74            receiver.run_exit_handler().await;
75            std::process::exit(0);
76        });
77        GLOBAL_EXIT_HANDLER.get().expect("value is set")
78    }
79
80    /// Creates an [`ExitHandler`] that can be manually controlled with an
81    /// [`ExitReceiver`].
82    ///
83    /// This does not actually exit the process or listen for any signals. If
84    /// you'd like that behavior, use [`ExitHandler::listen`].
85    ///
86    /// Because this API has no global side-effects and can be called many times
87    /// within the same process, it is possible to use it to provide a mock
88    /// [`ExitHandler`] inside unit tests.
89    pub fn new_receiver() -> (Arc<ExitHandler>, ExitReceiver) {
90        let (tx, rx) = mpsc::unbounded_channel();
91        (Arc::new(ExitHandler { tx }), ExitReceiver { rx })
92    }
93
94    /// Register this given [`Future`] to run upon process exit.
95    ///
96    /// As there are many ways for a process be killed that are outside of a
97    /// process's own control (e.g. `SIGKILL` or `SIGSEGV`), this API is
98    /// provided on a best-effort basis.
99    pub fn on_exit(&self, fut: impl Future<Output = ()> + Send + 'static) {
100        // realistically, this error case can only happen with the `new_receiver` API.
101        self.tx
102            .send(Box::pin(fut))
103            .expect("cannot send future after process exit");
104    }
105}
106
107/// Provides a way to run futures scheduled with an [`ExitHandler`].
108pub struct ExitReceiver {
109    rx: mpsc::UnboundedReceiver<BoxExitFuture>,
110}
111
112impl ExitReceiver {
113    /// Call this when the process exits to run the futures scheduled via
114    /// [`ExitHandler::on_exit`].
115    ///
116    /// As this is intended to be used in a library context, this does not exit
117    /// the process. It is expected that the process will not exit until
118    /// this async method finishes executing.
119    ///
120    /// Additional work can be scheduled using [`ExitHandler::on_exit`] even
121    /// while this is running, and it will execute before this function
122    /// finishes. Work attempted to be scheduled after this finishes will panic.
123    pub async fn run_exit_handler(mut self) {
124        let mut set = JoinSet::new();
125        while let Ok(fut) = self.rx.try_recv() {
126            set.spawn(fut);
127        }
128        loop {
129            select! {
130                biased;
131                Some(fut) = self.rx.recv() => {
132                    set.spawn(fut);
133                },
134                val = set.join_next() => {
135                    match val {
136                        Some(Ok(())) => {}
137                        Some(Err(_)) => panic!("ExitHandler future panicked!"),
138                        None => return,
139                    }
140                },
141            }
142        }
143    }
144}
145
#[cfg(test)]
mod tests {
    #![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this
    use std::{
        future::Future,
        pin::Pin,
        sync::{
            Arc,
            atomic::{AtomicBool, AtomicU32, Ordering},
        },
    };

    use super::ExitHandler;

    /// A future queued before `run_exit_handler` starts is run to completion
    /// before `run_exit_handler` returns.
    #[tokio::test]
    async fn test_on_exit() {
        let (handler, receiver) = ExitHandler::new_receiver();
        let was_run = Arc::new(AtomicBool::new(false));

        let flag = Arc::clone(&was_run);
        handler.on_exit(async move {
            // yield once so the future cannot finish on its first poll
            tokio::task::yield_now().await;
            flag.store(true, Ordering::SeqCst);
        });

        receiver.run_exit_handler().await;
        assert!(was_run.load(Ordering::SeqCst));
    }

    /// Futures queued by other exit futures while the handler is already
    /// running are executed too: a chain of 100 futures, each scheduling the
    /// next, must run in full.
    #[tokio::test]
    async fn test_queue_while_exiting() {
        type BoxExitFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

        // Boxing the return type is what makes this recursive definition
        // expressible.
        fn chained(handler: Arc<ExitHandler>, counter: Arc<AtomicU32>) -> BoxExitFuture {
            Box::pin(async move {
                tokio::task::yield_now().await;
                let previous = counter.fetch_add(1, Ordering::SeqCst);
                if previous < 99 {
                    // queue more work while the exit handler is running
                    let next = chained(Arc::clone(&handler), counter);
                    handler.on_exit(next);
                }
            })
        }

        let (handler, receiver) = ExitHandler::new_receiver();
        let total_calls = Arc::new(AtomicU32::new(0));

        let first = chained(Arc::clone(&handler), Arc::clone(&total_calls));
        handler.on_exit(first);

        receiver.run_exit_handler().await;
        assert_eq!(total_calls.load(Ordering::SeqCst), 100);
    }
}