Skip to main content

turbo_tasks/
event.rs

1use std::{
2    fmt::{Debug, Display, Formatter},
3    future::Future,
4    mem::replace,
5    pin::Pin,
6};
7#[cfg(feature = "hanging_detection")]
8use std::{
9    sync::Arc,
10    task::{Poll, ready},
11    time::Duration,
12};
13
14#[cfg(feature = "hanging_detection")]
15use tokio::time::{Timeout, timeout};
16
/// A value that can be turned into a lazily rendered, human-readable description.
///
/// The single method only exists when the `hanging_detection` feature is enabled; without the
/// feature the trait has no methods at all, so implementors carry no obligations.
pub trait EventDescriptor {
    /// Consumes `self` and returns a shareable closure that renders the description text each
    /// time it is called.
    #[cfg(feature = "hanging_detection")]
    fn get_description(self) -> Arc<dyn Fn() -> String + Sync + Send>;
}
21
22impl<T, InnerFn> EventDescriptor for T
23where
24    T: FnOnce() -> InnerFn,
25    InnerFn: Fn() -> String + Sync + Send + 'static,
26{
27    #[cfg(feature = "hanging_detection")]
28    fn get_description(self) -> Arc<dyn Fn() -> String + Sync + Send> {
29        Arc::new((self)())
30    }
31}
32
/// A reusable, clonable description that can be handed to APIs taking an `EventDescriptor`.
///
/// With the `hanging_detection` feature disabled this struct has no fields.
#[derive(Clone)]
pub struct EventDescription {
    /// Closure that renders the description text on demand.
    #[cfg(feature = "hanging_detection")]
    description: Arc<dyn Fn() -> String + Sync + Send>,
}
38
39impl EventDescription {
40    #[inline(always)]
41    pub fn new<InnerFn>(#[allow(unused_variables)] description: impl FnOnce() -> InnerFn) -> Self
42    where
43        InnerFn: Fn() -> String + Sync + Send + 'static,
44    {
45        Self {
46            #[cfg(feature = "hanging_detection")]
47            description: Arc::new((description)()),
48        }
49    }
50}
51
52impl Display for EventDescription {
53    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
54        #[cfg(not(feature = "hanging_detection"))]
55        return write!(f, "");
56
57        #[cfg(feature = "hanging_detection")]
58        return write!(f, "{}", (self.description)());
59    }
60}
61
62impl EventDescriptor for EventDescription {
63    #[cfg(feature = "hanging_detection")]
64    fn get_description(self) -> Arc<dyn Fn() -> String + Sync + Send> {
65        self.description
66    }
67}
68
69pub struct Event {
70    #[cfg(feature = "hanging_detection")]
71    description: Arc<dyn Fn() -> String + Sync + Send>,
72    event: event_listener::Event,
73}
74
75impl Event {
76    /// See [`event_listener::Event::new`]. May attach a description that may optionally be read
77    /// later.
78    ///
79    /// This confusingly takes a closure ([`FnOnce`]) that returns a nested closure ([`Fn`]).
80    ///
81    /// When `hanging_detection` is disabled, `description` is never called.
82    ///
83    /// When `hanging_detection` is enabled, the outer closure is called immediately. The outer
84    /// closure can have an ephemeral lifetime. The inner closure must be `'static`, but is called
85    /// only when the `description` is actually read.
86    ///
87    /// The outer closure allows avoiding extra lookups (e.g. task type info) that may be needed to
88    /// capture information needed for constructing (moving into) the inner closure.
89    #[inline(always)]
90    pub fn new(#[allow(unused_variables)] description: impl EventDescriptor) -> Self {
91        #[cfg(not(feature = "hanging_detection"))]
92        return Self {
93            event: event_listener::Event::new(),
94        };
95        #[cfg(feature = "hanging_detection")]
96        return Self {
97            description: description.get_description(),
98            event: event_listener::Event::new(),
99        };
100    }
101
102    /// See [`event_listener::Event::listen`].
103    pub fn listen(&self) -> EventListener {
104        #[cfg(not(feature = "hanging_detection"))]
105        return EventListener {
106            listener: self.event.listen(),
107        };
108        #[cfg(feature = "hanging_detection")]
109        return EventListener {
110            description: self.description.clone(),
111            note: Arc::new(String::new),
112            future: Some(Box::pin(timeout(
113                Duration::from_secs(30),
114                self.event.listen(),
115            ))),
116            duration: Duration::from_secs(30),
117        };
118    }
119
120    /// See [`event_listener::Event::listen`]. May attach a note that may optionally be read later.
121    ///
122    /// This confusingly takes a closure ([`FnOnce`]) that returns a nested closure ([`Fn`]).
123    ///
124    /// When `hanging_detection` is disabled, `note` is never called.
125    ///
126    /// When `hanging_detection` is enabled, the outer closure is called immediately. The outer
127    /// closure can have an ephemeral lifetime. The inner closer must be `'static`, but is called
128    /// only when the `note` is actually read.
129    ///
130    /// The outer closure allow avoiding extra lookups (e.g. task type info) that may be needed to
131    /// capture information needed for constructing (moving into) the inner closure.
132    pub fn listen_with_note(&self, _note: impl EventDescriptor) -> EventListener {
133        #[cfg(not(feature = "hanging_detection"))]
134        return EventListener {
135            listener: self.event.listen(),
136        };
137        #[cfg(feature = "hanging_detection")]
138        return EventListener {
139            description: self.description.clone(),
140            note: _note.get_description(),
141            future: Some(Box::pin(timeout(
142                Duration::from_secs(30),
143                self.event.listen(),
144            ))),
145            duration: Duration::from_secs(30),
146        };
147    }
148
149    /// pulls out the event listener, leaving a new, empty event in its place.
150    pub fn take(&mut self) -> Event {
151        #[cfg(not(feature = "hanging_detection"))]
152        return Self {
153            event: replace(&mut self.event, event_listener::Event::new()),
154        };
155        #[cfg(feature = "hanging_detection")]
156        return Self {
157            description: self.description.clone(),
158            event: replace(&mut self.event, event_listener::Event::new()),
159        };
160    }
161}
162
163impl Event {
164    /// see [`event_listener::Event::notify`]
165    pub fn notify(&self, n: usize) {
166        self.event.notify(n);
167    }
168}
169
170impl Debug for Event {
171    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
172        let mut t = f.debug_tuple("Event");
173        #[cfg(feature = "hanging_detection")]
174        t.field(&(self.description)());
175        t.finish()
176    }
177}
178
179#[cfg(not(feature = "hanging_detection"))]
180pub struct EventListener {
181    listener: event_listener::EventListener,
182}
183
184#[cfg(not(feature = "hanging_detection"))]
185impl Debug for EventListener {
186    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
187        f.debug_tuple("EventListener").finish()
188    }
189}
190
191#[cfg(not(feature = "hanging_detection"))]
192impl Future for EventListener {
193    type Output = ();
194
195    fn poll(
196        self: Pin<&mut Self>,
197        cx: &mut std::task::Context<'_>,
198    ) -> std::task::Poll<Self::Output> {
199        let listener = unsafe { self.map_unchecked_mut(|s| &mut s.listener) };
200        listener.poll(cx)
201    }
202}
203
/// Listener handed out by [`Event::listen`] when `hanging_detection` is enabled.
///
/// Besides the underlying listener it carries the texts used in the "potentially hanging"
/// report and the timeout currently applied to the wait.
#[cfg(feature = "hanging_detection")]
pub struct EventListener {
    /// Renders the description of the event this listener belongs to.
    description: Arc<dyn Fn() -> String + Sync + Send>,
    /// Renders the optional note attached when the listener was created; an empty string means
    /// "no note".
    note: Arc<dyn Fn() -> String + Sync + Send>,
    // The `Timeout` needs to stay pinned while it is polled and also while it is dropped.
    // Keeping it in a pinned `Box` is what makes it possible to take it out of the `Option`.
    /// The pending wait, wrapped in a timeout; `None` once the listener has completed.
    future: Option<std::pin::Pin<Box<Timeout<event_listener::EventListener>>>>,
    /// Timeout applied to the current wait; also the number reported in the hang message.
    duration: Duration,
}
213
/// Formats as `EventListener(description)` or, when the note is non-empty,
/// `EventListener(description, note)`.
#[cfg(feature = "hanging_detection")]
impl Debug for EventListener {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_tuple("EventListener");
        builder.field(&(self.description)());

        // An empty note means none was attached, so it is left out of the output.
        let note = (self.note)();
        if note.is_empty() {
            return builder.finish();
        }
        builder.field(&note).finish()
    }
}
226
/// Waits for the event while reporting potential hangs.
///
/// Each time the current timeout elapses a message is printed to stderr, the timeout is doubled
/// and the same underlying listener is wrapped in a new timeout and polled again.
#[cfg(feature = "hanging_detection")]
impl Future for EventListener {
    type Output = ();

    fn poll(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        loop {
            let Some(pending) = self.future.as_mut() else {
                // EventListener was awaited again after completion
                return Poll::Ready(());
            };

            // `ready!` returns `Poll::Pending` from `poll` while neither the event nor the
            // timeout has fired.
            if ready!(pending.as_mut().poll(cx)).is_ok() {
                self.future = None;
                return Poll::Ready(());
            }

            // The timeout elapsed before the event fired: report it.
            let note = (self.note)();
            let description = (self.description)();
            let waited_secs = self.duration.as_secs();
            if note.is_empty() {
                eprintln!(
                    "EventListener({}) is potentially hanging, waiting for {}s",
                    description, waited_secs,
                );
            } else {
                eprintln!(
                    "EventListener({}) is potentially hanging, waiting for {}s from {}",
                    description, waited_secs, note
                );
            }

            // Back off: the next wait gets twice the timeout.
            self.duration *= 2;
            let next_timeout = self.duration;

            // Taking from the Option is fine because the value lives inside a pinned Box.
            let elapsed = self.future.take().unwrap();
            // SAFETY: We can move the inner future since it's an EventListener and that is
            // Unpin.
            // NOTE(review): this also moves the elapsed `Timeout` wrapper itself out of its
            // pinned box before it is dropped — confirm that is acceptable for tokio's
            // `Timeout`.
            let listener = unsafe { std::pin::Pin::into_inner_unchecked(elapsed) }.into_inner();

            // Loop around to poll the new timeout right away so that it registers the waker.
            self.future = Some(Box::pin(timeout(next_timeout, listener)));
        }
    }
}
275
#[cfg(all(test, not(feature = "hanging_detection")))]
mod tests {
    use std::hint::black_box;

    use tokio::time::{Duration, timeout};

    use super::*;

    // With `hanging_detection` disabled, the description/note closures must be compiled out
    // entirely. That may only be guaranteed at higher optimization levels (which would be
    // acceptable), but in practice it seems to hold even for opt-level=0.
    #[tokio::test]
    async fn ensure_dead_code_elimination() {
        fn dead_fn() {
            // References a symbol that is declared here but not defined in this file, so the
            // build fails at link time if this code is not removed.
            unsafe {
                unsafe extern "C" {
                    fn trigger_link_error() -> !;
                }
                trigger_link_error();
            }
        }

        let ev = black_box(Event::new(|| {
            dead_fn();
            || {
                dead_fn();
                String::new()
            }
        }));
        let pending = black_box(ev.listen_with_note(|| {
            dead_fn();
            || {
                dead_fn();
                String::new()
            }
        }));

        // Nothing ever notifies the event; the short timeout just bounds the wait.
        let _ = black_box(timeout(Duration::from_millis(10), pending)).await;
    }
}