Skip to main content

turbo_tasks/
event.rs

1use std::{
2    fmt::{Debug, Display, Formatter},
3    future::Future,
4    mem::replace,
5    pin::Pin,
6};
7#[cfg(feature = "hanging_detection")]
8use std::{
9    sync::Arc,
10    task::{Poll, ready},
11    time::Duration,
12};
13
14use event_listener::Listener as _;
15#[cfg(feature = "hanging_detection")]
16use tokio::time::{Timeout, timeout};
17
18pub trait EventDescriptor {
19    #[cfg(feature = "hanging_detection")]
20    fn get_description(self) -> Arc<dyn Fn() -> String + Sync + Send>;
21}
22
23impl<T, InnerFn> EventDescriptor for T
24where
25    T: FnOnce() -> InnerFn,
26    InnerFn: Fn() -> String + Sync + Send + 'static,
27{
28    #[cfg(feature = "hanging_detection")]
29    fn get_description(self) -> Arc<dyn Fn() -> String + Sync + Send> {
30        Arc::new((self)())
31    }
32}
33
34#[derive(Clone)]
35pub struct EventDescription {
36    #[cfg(feature = "hanging_detection")]
37    description: Arc<dyn Fn() -> String + Sync + Send>,
38}
39
40impl EventDescription {
41    #[inline(always)]
42    pub fn new<InnerFn>(#[allow(unused_variables)] description: impl FnOnce() -> InnerFn) -> Self
43    where
44        InnerFn: Fn() -> String + Sync + Send + 'static,
45    {
46        Self {
47            #[cfg(feature = "hanging_detection")]
48            description: Arc::new((description)()),
49        }
50    }
51}
52
53impl Display for EventDescription {
54    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55        #[cfg(not(feature = "hanging_detection"))]
56        return write!(f, "");
57
58        #[cfg(feature = "hanging_detection")]
59        return write!(f, "{}", (self.description)());
60    }
61}
62
63impl EventDescriptor for EventDescription {
64    #[cfg(feature = "hanging_detection")]
65    fn get_description(self) -> Arc<dyn Fn() -> String + Sync + Send> {
66        self.description
67    }
68}
69
70pub struct Event {
71    #[cfg(feature = "hanging_detection")]
72    description: Arc<dyn Fn() -> String + Sync + Send>,
73    event: event_listener::Event,
74}
75
76impl Event {
77    /// See [`event_listener::Event::new`]. May attach a description that may optionally be read
78    /// later.
79    ///
80    /// This confusingly takes a closure ([`FnOnce`]) that returns a nested closure ([`Fn`]).
81    ///
82    /// When `hanging_detection` is disabled, `description` is never called.
83    ///
84    /// When `hanging_detection` is enabled, the outer closure is called immediately. The outer
85    /// closure can have an ephemeral lifetime. The inner closure must be `'static`, but is called
86    /// only when the `description` is actually read.
87    ///
88    /// The outer closure allows avoiding extra lookups (e.g. task type info) that may be needed to
89    /// capture information needed for constructing (moving into) the inner closure.
90    #[inline(always)]
91    pub fn new(#[allow(unused_variables)] description: impl EventDescriptor) -> Self {
92        #[cfg(not(feature = "hanging_detection"))]
93        return Self {
94            event: event_listener::Event::new(),
95        };
96        #[cfg(feature = "hanging_detection")]
97        return Self {
98            description: description.get_description(),
99            event: event_listener::Event::new(),
100        };
101    }
102
103    /// See [`event_listener::Event::listen`].
104    pub fn listen(&self) -> EventListener {
105        #[cfg(not(feature = "hanging_detection"))]
106        return EventListener {
107            listener: self.event.listen(),
108        };
109        #[cfg(feature = "hanging_detection")]
110        return EventListener {
111            description: self.description.clone(),
112            note: Arc::new(String::new),
113            future: Some(Box::pin(timeout(
114                Duration::from_secs(30),
115                self.event.listen(),
116            ))),
117            duration: Duration::from_secs(30),
118        };
119    }
120
121    /// See [`event_listener::Event::listen`]. May attach a note that may optionally be read later.
122    ///
123    /// This confusingly takes a closure ([`FnOnce`]) that returns a nested closure ([`Fn`]).
124    ///
125    /// When `hanging_detection` is disabled, `note` is never called.
126    ///
127    /// When `hanging_detection` is enabled, the outer closure is called immediately. The outer
128    /// closure can have an ephemeral lifetime. The inner closer must be `'static`, but is called
129    /// only when the `note` is actually read.
130    ///
131    /// The outer closure allow avoiding extra lookups (e.g. task type info) that may be needed to
132    /// capture information needed for constructing (moving into) the inner closure.
133    pub fn listen_with_note(&self, _note: impl EventDescriptor) -> EventListener {
134        #[cfg(not(feature = "hanging_detection"))]
135        return EventListener {
136            listener: self.event.listen(),
137        };
138        #[cfg(feature = "hanging_detection")]
139        return EventListener {
140            description: self.description.clone(),
141            note: _note.get_description(),
142            future: Some(Box::pin(timeout(
143                Duration::from_secs(30),
144                self.event.listen(),
145            ))),
146            duration: Duration::from_secs(30),
147        };
148    }
149
150    /// pulls out the event listener, leaving a new, empty event in its place.
151    pub fn take(&mut self) -> Event {
152        #[cfg(not(feature = "hanging_detection"))]
153        return Self {
154            event: replace(&mut self.event, event_listener::Event::new()),
155        };
156        #[cfg(feature = "hanging_detection")]
157        return Self {
158            description: self.description.clone(),
159            event: replace(&mut self.event, event_listener::Event::new()),
160        };
161    }
162}
163
164impl Event {
165    /// see [`event_listener::Event::notify`]
166    pub fn notify(&self, n: usize) {
167        self.event.notify(n);
168    }
169}
170
171impl Debug for Event {
172    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
173        let mut t = f.debug_tuple("Event");
174        #[cfg(feature = "hanging_detection")]
175        t.field(&(self.description)());
176        t.finish()
177    }
178}
179
180#[cfg(not(feature = "hanging_detection"))]
181pub struct EventListener {
182    listener: event_listener::EventListener,
183}
184
185#[cfg(not(feature = "hanging_detection"))]
186impl Debug for EventListener {
187    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
188        f.debug_tuple("EventListener").finish()
189    }
190}
191
192#[cfg(not(feature = "hanging_detection"))]
193impl Future for EventListener {
194    type Output = ();
195
196    fn poll(
197        self: Pin<&mut Self>,
198        cx: &mut std::task::Context<'_>,
199    ) -> std::task::Poll<Self::Output> {
200        let listener = unsafe { self.map_unchecked_mut(|s| &mut s.listener) };
201        listener.poll(cx)
202    }
203}
204
205#[cfg(not(feature = "hanging_detection"))]
206impl EventListener {
207    /// Blocks the current thread until the event is notified.
208    ///
209    /// This is the synchronous equivalent of `.await`-ing the `EventListener`.
210    /// Only valid in synchronous contexts (e.g. backend operations).
211    pub fn wait(self) {
212        self.listener.wait();
213    }
214}
215
216#[cfg(feature = "hanging_detection")]
217pub struct EventListener {
218    description: Arc<dyn Fn() -> String + Sync + Send>,
219    note: Arc<dyn Fn() -> String + Sync + Send>,
220    // Timeout need to stay pinned while polling and also while it's dropped.
221    // So it's important to put it into a pinned Box to be able to take it out of the Option.
222    future: Option<std::pin::Pin<Box<Timeout<event_listener::EventListener>>>>,
223    duration: Duration,
224}
225
226#[cfg(feature = "hanging_detection")]
227impl Debug for EventListener {
228    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
229        let mut t = f.debug_tuple("EventListener");
230        t.field(&(self.description)());
231        let note = (self.note)();
232        if !note.is_empty() {
233            t.field(&note);
234        }
235        t.finish()
236    }
237}
238
239#[cfg(feature = "hanging_detection")]
240impl Future for EventListener {
241    type Output = ();
242
243    fn poll(
244        mut self: Pin<&mut Self>,
245        cx: &mut std::task::Context<'_>,
246    ) -> std::task::Poll<Self::Output> {
247        while let Some(future) = self.future.as_mut() {
248            match ready!(future.as_mut().poll(cx)) {
249                Ok(_) => {
250                    self.future = None;
251                    return Poll::Ready(());
252                }
253                Err(_) => {
254                    let note = (self.note)();
255                    let description = (self.description)();
256                    if note.is_empty() {
257                        eprintln!(
258                            "EventListener({}) is potentially hanging, waiting for {}s",
259                            description,
260                            self.duration.as_secs(),
261                        );
262                    } else {
263                        eprintln!(
264                            "EventListener({}) is potentially hanging, waiting for {}s from {}",
265                            description,
266                            self.duration.as_secs(),
267                            note
268                        );
269                    }
270                    self.duration *= 2;
271                    // SAFETY: Taking from Option is safe because the value is inside of a pinned
272                    // Box. Pinning must continue until dropped.
273                    let future = self.future.take().unwrap();
274                    self.future = Some(Box::pin(timeout(
275                        self.duration,
276                        // SAFETY: We can move the inner future since it's an EventListener and
277                        // that is Unpin.
278                        unsafe { std::pin::Pin::into_inner_unchecked(future) }.into_inner(),
279                    )));
280                }
281            }
282        }
283        // EventListener was awaited again after completion
284        Poll::Ready(())
285    }
286}
287
288#[cfg(feature = "hanging_detection")]
289impl EventListener {
290    /// Blocks the current thread until the event is notified.
291    ///
292    /// Note: In `hanging_detection` builds, timeout warnings are not emitted
293    /// for sync waits (only for async `.await` usage).
294    pub fn wait(mut self) {
295        if let Some(future) = self.future.take() {
296            // SAFETY: EventListener is Unpin, so it's safe to move out of the Pin.
297            unsafe { std::pin::Pin::into_inner_unchecked(future) }
298                .into_inner()
299                .wait();
300        }
301    }
302}
303
304#[cfg(all(test, not(feature = "hanging_detection")))]
305mod tests {
306    use std::hint::black_box;
307
308    use tokio::time::{Duration, timeout};
309
310    use super::*;
311
312    // The closures used for descriptions/notes should be eliminated. This may only happen at higher
313    // optimization levels (that would be okay), but in practice it seems to work even for
314    // opt-level=0.
315    #[tokio::test]
316    async fn ensure_dead_code_elimination() {
317        fn dead_fn() {
318            // This code triggers a build error when it's not removed.
319            unsafe {
320                unsafe extern "C" {
321                    fn trigger_link_error() -> !;
322                }
323                trigger_link_error();
324            }
325        }
326
327        let event = black_box(Event::new(|| {
328            dead_fn();
329            || {
330                dead_fn();
331                String::new()
332            }
333        }));
334        let listener = black_box(event.listen_with_note(|| {
335            dead_fn();
336            || {
337                dead_fn();
338                String::new()
339            }
340        }));
341
342        let _ = black_box(timeout(Duration::from_millis(10), listener)).await;
343    }
344}