Skip to main content

turbopack_trace_utils/
exit.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    sync::{Arc, Mutex, OnceLock},
5};
6
7use anyhow::Result;
8use tokio::{select, sync::mpsc, task::JoinSet};
9
10/// A guard for the exit handler. When dropped, the exit guard will be dropped.
11/// It might also be dropped on Ctrl-C.
12pub struct ExitGuard<T>(Arc<Mutex<Option<T>>>);
13
14impl<T> Drop for ExitGuard<T> {
15    fn drop(&mut self) {
16        drop(self.0.lock().unwrap().take())
17    }
18}
19
20impl<T: Send + 'static> ExitGuard<T> {
21    /// Drop a guard when Ctrl-C is pressed or the [ExitGuard] is dropped.
22    pub fn new(guard: T) -> Result<Self> {
23        let guard = Arc::new(Mutex::new(Some(guard)));
24        {
25            let guard = guard.clone();
26            tokio::spawn(async move {
27                tokio::signal::ctrl_c().await.unwrap();
28                drop(guard.lock().unwrap().take());
29                std::process::exit(0);
30            });
31        }
32        Ok(ExitGuard(guard))
33    }
34}
35
36type BoxExitFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
37
38/// The singular global ExitHandler. This is primarily used to ensure `ExitHandler::listen` is only
39/// called once.
40///
41/// The global handler is intentionally not exposed, so that APIs that depend on exit behavior are
42/// required to take the `ExitHandler`. This ensures that the `ExitHandler` is configured before
43/// these APIs are run, and that these consumers can be used with a callback (e.g. a mock) instead.
44static GLOBAL_EXIT_HANDLER: OnceLock<Arc<ExitHandler>> = OnceLock::new();
45
46pub struct ExitHandler {
47    tx: mpsc::UnboundedSender<BoxExitFuture>,
48}
49
50impl ExitHandler {
51    /// Waits for `SIGINT` using [`tokio::signal::ctrl_c`], and exits the process with exit code `0`
52    /// after running any futures scheduled with [`ExitHandler::on_exit`].
53    ///
54    /// As this uses global process signals, this must only be called once, and will panic if called
55    /// multiple times. Use this when you own the process (e.g. `turbopack-cli`).
56    ///
57    /// If you don't own the process (e.g. you're called as a library, such as in `next-swc`), use
58    /// [`ExitHandler::new_receiver`] instead.
59    ///
60    /// This may listen for other signals, like `SIGTERM` or `SIGPIPE` in the future.
61    pub fn listen() -> &'static Arc<ExitHandler> {
62        let (handler, receiver) = Self::new_receiver();
63        if GLOBAL_EXIT_HANDLER.set(handler).is_err() {
64            panic!("ExitHandler::listen must only be called once");
65        }
66        tokio::spawn(async move {
67            tokio::signal::ctrl_c()
68                .await
69                .expect("failed to set ctrl_c handler");
70            receiver.run_exit_handler().await;
71            std::process::exit(0);
72        });
73        GLOBAL_EXIT_HANDLER.get().expect("value is set")
74    }
75
76    /// Creates an [`ExitHandler`] that can be manually controlled with an [`ExitReceiver`].
77    ///
78    /// This does not actually exit the process or listen for any signals. If you'd like that
79    /// behavior, use [`ExitHandler::listen`].
80    ///
81    /// Because this API has no global side-effects and can be called many times within the same
82    /// process, it is possible to use it to provide a mock [`ExitHandler`] inside unit tests.
83    pub fn new_receiver() -> (Arc<ExitHandler>, ExitReceiver) {
84        let (tx, rx) = mpsc::unbounded_channel();
85        (Arc::new(ExitHandler { tx }), ExitReceiver { rx })
86    }
87
88    /// Register this given [`Future`] to run upon process exit.
89    ///
90    /// As there are many ways for a process be killed that are outside of a process's own control
91    /// (e.g. `SIGKILL` or `SIGSEGV`), this API is provided on a best-effort basis.
92    pub fn on_exit(&self, fut: impl Future<Output = ()> + Send + 'static) {
93        // realistically, this error case can only happen with the `new_receiver` API.
94        self.tx
95            .send(Box::pin(fut))
96            .expect("cannot send future after process exit");
97    }
98}
99
100/// Provides a way to run futures scheduled with an [`ExitHandler`].
101pub struct ExitReceiver {
102    rx: mpsc::UnboundedReceiver<BoxExitFuture>,
103}
104
105impl ExitReceiver {
106    /// Call this when the process exits to run the futures scheduled via [`ExitHandler::on_exit`].
107    ///
108    /// As this is intended to be used in a library context, this does not exit the process. It is
109    /// expected that the process will not exit until this async method finishes executing.
110    ///
111    /// Additional work can be scheduled using [`ExitHandler::on_exit`] even while this is running,
112    /// and it will execute before this function finishes. Work attempted to be scheduled after this
113    /// finishes will panic.
114    pub async fn run_exit_handler(mut self) {
115        let mut set = JoinSet::new();
116        while let Ok(fut) = self.rx.try_recv() {
117            set.spawn(fut);
118        }
119        loop {
120            select! {
121                biased;
122                Some(fut) = self.rx.recv() => {
123                    set.spawn(fut);
124                },
125                val = set.join_next() => {
126                    match val {
127                        Some(Ok(())) => {}
128                        Some(Err(_)) => panic!("ExitHandler future panicked!"),
129                        None => return,
130                    }
131                },
132            }
133        }
134    }
135}
136
137#[cfg(test)]
138mod tests {
139    #![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this
140    use std::{
141        future::Future,
142        pin::Pin,
143        sync::{
144            Arc,
145            atomic::{AtomicBool, AtomicU32, Ordering},
146        },
147    };
148
149    use super::ExitHandler;
150
151    #[tokio::test]
152    async fn test_on_exit() {
153        let (handler, receiver) = ExitHandler::new_receiver();
154
155        let called = Arc::new(AtomicBool::new(false));
156        handler.on_exit({
157            let called = Arc::clone(&called);
158            async move {
159                tokio::task::yield_now().await;
160                called.store(true, Ordering::SeqCst);
161            }
162        });
163
164        receiver.run_exit_handler().await;
165        assert!(called.load(Ordering::SeqCst));
166    }
167
168    #[tokio::test]
169    async fn test_queue_while_exiting() {
170        let (handler, receiver) = ExitHandler::new_receiver();
171        let call_count = Arc::new(AtomicU32::new(0));
172
173        type BoxExitFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
174
175        // this struct is needed to construct the recursive closure type
176        #[derive(Clone)]
177        struct GetFut {
178            handler: Arc<ExitHandler>,
179            call_count: Arc<AtomicU32>,
180        }
181
182        impl GetFut {
183            fn get(self) -> BoxExitFuture {
184                Box::pin(async move {
185                    tokio::task::yield_now().await;
186                    if self.call_count.fetch_add(1, Ordering::SeqCst) < 99 {
187                        // queue more work while the exit handler is running
188                        Arc::clone(&self.handler).on_exit(self.get())
189                    }
190                })
191            }
192        }
193
194        handler.on_exit(
195            GetFut {
196                handler: Arc::clone(&handler),
197                call_count: Arc::clone(&call_count),
198            }
199            .get(),
200        );
201        receiver.run_exit_handler().await;
202        assert_eq!(call_count.load(Ordering::SeqCst), 100);
203    }
204}