turbo_tasks/
event.rs

1use std::{
2    fmt::{Debug, Formatter},
3    future::Future,
4    mem::replace,
5    pin::Pin,
6};
7#[cfg(feature = "hanging_detection")]
8use std::{
9    sync::Arc,
10    task::{Poll, ready},
11    time::Duration,
12};
13
14#[cfg(feature = "hanging_detection")]
15use tokio::time::{Timeout, timeout};
16
17pub struct Event {
18    #[cfg(feature = "hanging_detection")]
19    description: Arc<dyn Fn() -> String + Sync + Send>,
20    event: event_listener::Event,
21}
22
23impl Event {
24    /// See [`event_listener::Event::new`]. May attach a description that may optionally be read
25    /// later.
26    ///
27    /// This confusingly takes a closure ([`FnOnce`]) that returns a nested closure ([`Fn`]).
28    ///
29    /// When `hanging_detection` is disabled, `description` is never called.
30    ///
31    /// When `hanging_detection` is enabled, the outer closure is called immediately. The outer
32    /// closure can have an ephemeral lifetime. The inner closure must be `'static`, but is called
33    /// only when the `description` is actually read.
34    ///
35    /// The outer closure allows avoiding extra lookups (e.g. task type info) that may be needed to
36    /// capture information needed for constructing (moving into) the inner closure.
37    #[inline(always)]
38    pub fn new<InnerFn>(_description: impl FnOnce() -> InnerFn) -> Self
39    where
40        InnerFn: Fn() -> String + Sync + Send + 'static,
41    {
42        #[cfg(not(feature = "hanging_detection"))]
43        return Self {
44            event: event_listener::Event::new(),
45        };
46        #[cfg(feature = "hanging_detection")]
47        return Self {
48            description: Arc::new((_description)()),
49            event: event_listener::Event::new(),
50        };
51    }
52
53    /// See [`event_listener::Event::listen`].
54    pub fn listen(&self) -> EventListener {
55        #[cfg(not(feature = "hanging_detection"))]
56        return EventListener {
57            listener: self.event.listen(),
58        };
59        #[cfg(feature = "hanging_detection")]
60        return EventListener {
61            description: self.description.clone(),
62            note: Arc::new(String::new),
63            future: Some(Box::pin(timeout(
64                Duration::from_secs(30),
65                self.event.listen(),
66            ))),
67            duration: Duration::from_secs(30),
68        };
69    }
70
71    /// See [`event_listener::Event::listen`]. May attach a note that may optionally be read later.
72    ///
73    /// This confusingly takes a closure ([`FnOnce`]) that returns a nested closure ([`Fn`]).
74    ///
75    /// When `hanging_detection` is disabled, `note` is never called.
76    ///
77    /// When `hanging_detection` is enabled, the outer closure is called immediately. The outer
78    /// closure can have an ephemeral lifetime. The inner closer must be `'static`, but is called
79    /// only when the `note` is actually read.
80    ///
81    /// The outer closure allow avoiding extra lookups (e.g. task type info) that may be needed to
82    /// capture information needed for constructing (moving into) the inner closure.
83    pub fn listen_with_note<InnerFn>(&self, _note: impl FnOnce() -> InnerFn) -> EventListener
84    where
85        InnerFn: Fn() -> String + Sync + Send + 'static,
86    {
87        #[cfg(not(feature = "hanging_detection"))]
88        return EventListener {
89            listener: self.event.listen(),
90        };
91        #[cfg(feature = "hanging_detection")]
92        return EventListener {
93            description: self.description.clone(),
94            note: Arc::new((_note)()),
95            future: Some(Box::pin(timeout(
96                Duration::from_secs(30),
97                self.event.listen(),
98            ))),
99            duration: Duration::from_secs(30),
100        };
101    }
102
103    /// pulls out the event listener, leaving a new, empty event in its place.
104    pub fn take(&mut self) -> Event {
105        #[cfg(not(feature = "hanging_detection"))]
106        return Self {
107            event: replace(&mut self.event, event_listener::Event::new()),
108        };
109        #[cfg(feature = "hanging_detection")]
110        return Self {
111            description: self.description.clone(),
112            event: replace(&mut self.event, event_listener::Event::new()),
113        };
114    }
115}
116
117impl Event {
118    /// see [`event_listener::Event::notify`]
119    pub fn notify(&self, n: usize) {
120        self.event.notify(n);
121    }
122}
123
124impl Debug for Event {
125    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
126        let mut t = f.debug_tuple("Event");
127        #[cfg(feature = "hanging_detection")]
128        t.field(&(self.description)());
129        t.finish()
130    }
131}
132
133#[cfg(not(feature = "hanging_detection"))]
134pub struct EventListener {
135    listener: event_listener::EventListener,
136}
137
138#[cfg(not(feature = "hanging_detection"))]
139impl Debug for EventListener {
140    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
141        f.debug_tuple("EventListener").finish()
142    }
143}
144
145#[cfg(not(feature = "hanging_detection"))]
146impl Future for EventListener {
147    type Output = ();
148
149    fn poll(
150        self: Pin<&mut Self>,
151        cx: &mut std::task::Context<'_>,
152    ) -> std::task::Poll<Self::Output> {
153        let listener = unsafe { self.map_unchecked_mut(|s| &mut s.listener) };
154        listener.poll(cx)
155    }
156}
157
158#[cfg(feature = "hanging_detection")]
159pub struct EventListener {
160    description: Arc<dyn Fn() -> String + Sync + Send>,
161    note: Arc<dyn Fn() -> String + Sync + Send>,
162    // Timeout need to stay pinned while polling and also while it's dropped.
163    // So it's important to put it into a pinned Box to be able to take it out of the Option.
164    future: Option<std::pin::Pin<Box<Timeout<event_listener::EventListener>>>>,
165    duration: Duration,
166}
167
168#[cfg(feature = "hanging_detection")]
169impl Debug for EventListener {
170    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
171        let mut t = f.debug_tuple("EventListener");
172        t.field(&(self.description)());
173        let note = (self.note)();
174        if !note.is_empty() {
175            t.field(&note);
176        }
177        t.finish()
178    }
179}
180
181#[cfg(feature = "hanging_detection")]
182impl Future for EventListener {
183    type Output = ();
184
185    fn poll(
186        mut self: Pin<&mut Self>,
187        cx: &mut std::task::Context<'_>,
188    ) -> std::task::Poll<Self::Output> {
189        while let Some(future) = self.future.as_mut() {
190            match ready!(future.as_mut().poll(cx)) {
191                Ok(_) => {
192                    self.future = None;
193                    return Poll::Ready(());
194                }
195                Err(_) => {
196                    let note = (self.note)();
197                    let description = (self.description)();
198                    if note.is_empty() {
199                        eprintln!(
200                            "EventListener({}) is potentially hanging, waiting for {}s",
201                            description,
202                            self.duration.as_secs(),
203                        );
204                    } else {
205                        eprintln!(
206                            "EventListener({}) is potentially hanging, waiting for {}s from {}",
207                            description,
208                            self.duration.as_secs(),
209                            note
210                        );
211                    }
212                    self.duration *= 2;
213                    // SAFETY: Taking from Option is safe because the value is inside of a pinned
214                    // Box. Pinning must continue until dropped.
215                    let future = self.future.take().unwrap();
216                    self.future = Some(Box::pin(timeout(
217                        self.duration,
218                        // SAFETY: We can move the inner future since it's an EventListener and
219                        // that is Unpin.
220                        unsafe { std::pin::Pin::into_inner_unchecked(future) }.into_inner(),
221                    )));
222                }
223            }
224        }
225        // EventListener was awaited again after completion
226        Poll::Ready(())
227    }
228}
229
230#[cfg(all(test, not(feature = "hanging_detection")))]
231mod tests {
232    use std::hint::black_box;
233
234    use tokio::time::{Duration, timeout};
235
236    use super::*;
237
238    // The closures used for descriptions/notes should be eliminated. This may only happen at higher
239    // optimization levels (that would be okay), but in practice it seems to work even for
240    // opt-level=0.
241    #[tokio::test]
242    async fn ensure_dead_code_elimination() {
243        fn dead_fn() {
244            // This code triggers a build error when it's not removed.
245            unsafe {
246                unsafe extern "C" {
247                    fn trigger_link_error() -> !;
248                }
249                trigger_link_error();
250            }
251        }
252
253        let event = black_box(Event::new(|| {
254            dead_fn();
255            || {
256                dead_fn();
257                String::new()
258            }
259        }));
260        let listener = black_box(event.listen_with_note(|| {
261            dead_fn();
262            || {
263                dead_fn();
264                String::new()
265            }
266        }));
267
268        let _ = black_box(timeout(Duration::from_millis(10), listener)).await;
269    }
270}