turbopack_trace_utils/exit.rs
1use std::{
2 future::Future,
3 pin::Pin,
4 sync::{Arc, Mutex, OnceLock},
5};
6
7use anyhow::Result;
8use tokio::{select, sync::mpsc, task::JoinSet};
9
/// Owns a value (the "inner guard") that must be dropped before the process goes away.
///
/// The inner value is dropped when this [`ExitGuard`] is dropped. It may also be dropped
/// earlier, when Ctrl-C is pressed, by the background task spawned in `ExitGuard::new`; in that
/// case dropping the [`ExitGuard`] afterwards is a no-op.
pub struct ExitGuard<T>(Arc<Mutex<Option<T>>>);

impl<T> Drop for ExitGuard<T> {
    fn drop(&mut self) {
        // A poisoned mutex only means that another holder panicked while it had the lock (for
        // example because the inner value's destructor panicked). The `Option` inside is still
        // valid, and panicking inside `drop` could abort the process during unwinding, so the
        // poison flag is deliberately ignored here.
        let mut slot = self
            .0
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // The lock stays held while the inner value is dropped, so anyone else waiting on the
        // lock only proceeds once the inner value's destructor has finished.
        drop(slot.take());
    }
}
19
20impl<T: Send + 'static> ExitGuard<T> {
21 /// Drop a guard when Ctrl-C is pressed or the [ExitGuard] is dropped.
22 pub fn new(guard: T) -> Result<Self> {
23 let guard = Arc::new(Mutex::new(Some(guard)));
24 {
25 let guard = guard.clone();
26 tokio::spawn(async move {
27 tokio::signal::ctrl_c().await.unwrap();
28 drop(guard.lock().unwrap().take());
29 std::process::exit(0);
30 });
31 }
32 Ok(ExitGuard(guard))
33 }
34}
35
/// A boxed, pinned, type-erased future with no output, as queued by `ExitHandler::on_exit` and
/// later spawned by `ExitReceiver::run_exit_handler`.
type BoxExitFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
37
38/// The singular global ExitHandler. This is primarily used to ensure `ExitHandler::listen` is only
39/// called once.
40///
41/// The global handler is intentionally not exposed, so that APIs that depend on exit behavior are
42/// required to take the `ExitHandler`. This ensures that the `ExitHandler` is configured before
43/// these APIs are run, and that these consumers can be used with a callback (e.g. a mock) instead.
44static GLOBAL_EXIT_HANDLER: OnceLock<Arc<ExitHandler>> = OnceLock::new();
45
46pub struct ExitHandler {
47 tx: mpsc::UnboundedSender<BoxExitFuture>,
48}
49
50impl ExitHandler {
51 /// Waits for `SIGINT` using [`tokio::signal::ctrl_c`], and exits the process with exit code `0`
52 /// after running any futures scheduled with [`ExitHandler::on_exit`].
53 ///
54 /// As this uses global process signals, this must only be called once, and will panic if called
55 /// multiple times. Use this when you own the process (e.g. `turbopack-cli`).
56 ///
57 /// If you don't own the process (e.g. you're called as a library, such as in `next-swc`), use
58 /// [`ExitHandler::new_receiver`] instead.
59 ///
60 /// This may listen for other signals, like `SIGTERM` or `SIGPIPE` in the future.
61 pub fn listen() -> &'static Arc<ExitHandler> {
62 let (handler, receiver) = Self::new_receiver();
63 if GLOBAL_EXIT_HANDLER.set(handler).is_err() {
64 panic!("ExitHandler::listen must only be called once");
65 }
66 tokio::spawn(async move {
67 tokio::signal::ctrl_c()
68 .await
69 .expect("failed to set ctrl_c handler");
70 receiver.run_exit_handler().await;
71 std::process::exit(0);
72 });
73 GLOBAL_EXIT_HANDLER.get().expect("value is set")
74 }
75
76 /// Creates an [`ExitHandler`] that can be manually controlled with an [`ExitReceiver`].
77 ///
78 /// This does not actually exit the process or listen for any signals. If you'd like that
79 /// behavior, use [`ExitHandler::listen`].
80 ///
81 /// Because this API has no global side-effects and can be called many times within the same
82 /// process, it is possible to use it to provide a mock [`ExitHandler`] inside unit tests.
83 pub fn new_receiver() -> (Arc<ExitHandler>, ExitReceiver) {
84 let (tx, rx) = mpsc::unbounded_channel();
85 (Arc::new(ExitHandler { tx }), ExitReceiver { rx })
86 }
87
88 /// Register this given [`Future`] to run upon process exit.
89 ///
90 /// As there are many ways for a process be killed that are outside of a process's own control
91 /// (e.g. `SIGKILL` or `SIGSEGV`), this API is provided on a best-effort basis.
92 pub fn on_exit(&self, fut: impl Future<Output = ()> + Send + 'static) {
93 // realistically, this error case can only happen with the `new_receiver` API.
94 self.tx
95 .send(Box::pin(fut))
96 .expect("cannot send future after process exit");
97 }
98}
99
100/// Provides a way to run futures scheduled with an [`ExitHandler`].
101pub struct ExitReceiver {
102 rx: mpsc::UnboundedReceiver<BoxExitFuture>,
103}
104
105impl ExitReceiver {
106 /// Call this when the process exits to run the futures scheduled via [`ExitHandler::on_exit`].
107 ///
108 /// As this is intended to be used in a library context, this does not exit the process. It is
109 /// expected that the process will not exit until this async method finishes executing.
110 ///
111 /// Additional work can be scheduled using [`ExitHandler::on_exit`] even while this is running,
112 /// and it will execute before this function finishes. Work attempted to be scheduled after this
113 /// finishes will panic.
114 pub async fn run_exit_handler(mut self) {
115 let mut set = JoinSet::new();
116 while let Ok(fut) = self.rx.try_recv() {
117 set.spawn(fut);
118 }
119 loop {
120 select! {
121 biased;
122 Some(fut) = self.rx.recv() => {
123 set.spawn(fut);
124 },
125 val = set.join_next() => {
126 match val {
127 Some(Ok(())) => {}
128 Some(Err(_)) => panic!("ExitHandler future panicked!"),
129 None => return,
130 }
131 },
132 }
133 }
134 }
135}
136
#[cfg(test)]
mod tests {
    #![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this
    use std::{
        future::Future,
        pin::Pin,
        sync::{
            Arc,
            atomic::{AtomicBool, AtomicU32, Ordering},
        },
    };

    use super::ExitHandler;

    /// A future registered before `run_exit_handler` is executed by it.
    #[tokio::test]
    async fn test_on_exit() {
        let (handler, receiver) = ExitHandler::new_receiver();
        let was_called = Arc::new(AtomicBool::new(false));

        let flag = Arc::clone(&was_called);
        handler.on_exit(async move {
            // yield once so the future does not complete on its very first poll
            tokio::task::yield_now().await;
            flag.store(true, Ordering::SeqCst);
        });

        receiver.run_exit_handler().await;
        assert!(was_called.load(Ordering::SeqCst));
    }

    /// Futures that register further futures while the exit handler is already running are all
    /// executed before `run_exit_handler` returns.
    #[tokio::test]
    async fn test_queue_while_exiting() {
        type BoxExitFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

        // a named struct is needed so that the self-rescheduling future has a nameable type
        #[derive(Clone)]
        struct GetFut {
            handler: Arc<ExitHandler>,
            call_count: Arc<AtomicU32>,
        }

        impl GetFut {
            fn get(self) -> BoxExitFuture {
                Box::pin(async move {
                    tokio::task::yield_now().await;
                    let previous = self.call_count.fetch_add(1, Ordering::SeqCst);
                    if previous < 99 {
                        // queue more work while the exit handler is running
                        let handler = Arc::clone(&self.handler);
                        handler.on_exit(self.get());
                    }
                })
            }
        }

        let (handler, receiver) = ExitHandler::new_receiver();
        let call_count = Arc::new(AtomicU32::new(0));

        let first = GetFut {
            handler: Arc::clone(&handler),
            call_count: Arc::clone(&call_count),
        };
        handler.on_exit(first.get());

        receiver.run_exit_handler().await;
        assert_eq!(call_count.load(Ordering::SeqCst), 100);
    }
}