turbo_tasks/
event.rs

1#[cfg(feature = "hanging_detection")]
2use std::sync::Arc;
3#[cfg(feature = "hanging_detection")]
4use std::task::Poll;
5#[cfg(feature = "hanging_detection")]
6use std::task::ready;
7#[cfg(feature = "hanging_detection")]
8use std::time::Duration;
9use std::{
10    fmt::{Debug, Formatter},
11    future::Future,
12    mem::replace,
13    pin::Pin,
14};
15
16#[cfg(feature = "hanging_detection")]
17use tokio::time::Timeout;
18#[cfg(feature = "hanging_detection")]
19use tokio::time::timeout;
20
21pub struct Event {
22    #[cfg(feature = "hanging_detection")]
23    description: Arc<dyn Fn() -> String + Sync + Send>,
24    event: event_listener::Event,
25}
26
27#[cfg(not(feature = "hanging_detection"))]
28impl Event {
29    /// see [event_listener::Event]::new
30    #[inline(always)]
31    pub fn new(_description: impl Fn() -> String + Sync + Send + 'static) -> Self {
32        Self {
33            event: event_listener::Event::new(),
34        }
35    }
36
37    /// see [event_listener::Event]::listen
38    pub fn listen(&self) -> EventListener {
39        EventListener {
40            listener: self.event.listen(),
41        }
42    }
43
44    /// see [event_listener::Event]::listen
45    pub fn listen_with_note(
46        &self,
47        _note: impl Fn() -> String + Sync + Send + 'static,
48    ) -> EventListener {
49        EventListener {
50            listener: self.event.listen(),
51        }
52    }
53
54    /// pulls out the event listener, leaving a new, empty event in its place.
55    pub fn take(&mut self) -> Self {
56        Self {
57            event: replace(&mut self.event, event_listener::Event::new()),
58        }
59    }
60}
61
62#[cfg(feature = "hanging_detection")]
63impl Event {
64    /// see [event_listener::Event]::new
65    #[inline(always)]
66    pub fn new(description: impl Fn() -> String + Sync + Send + 'static) -> Self {
67        Self {
68            description: Arc::new(description),
69            event: event_listener::Event::new(),
70        }
71    }
72
73    /// see [event_listener::Event]::listen
74    pub fn listen(&self) -> EventListener {
75        EventListener {
76            description: self.description.clone(),
77            note: Arc::new(|| String::new()),
78            future: Some(Box::pin(timeout(
79                Duration::from_secs(10),
80                self.event.listen(),
81            ))),
82            duration: Duration::from_secs(10),
83        }
84    }
85
86    /// see [event_listener::Event]::listen
87    pub fn listen_with_note(
88        &self,
89        note: impl Fn() -> String + Sync + Send + 'static,
90    ) -> EventListener {
91        EventListener {
92            description: self.description.clone(),
93            note: Arc::new(note),
94            future: Some(Box::pin(timeout(
95                Duration::from_secs(10),
96                self.event.listen(),
97            ))),
98            duration: Duration::from_secs(10),
99        }
100    }
101
102    /// pulls out the event listener, leaving a new, empty event in its place.
103    pub fn take(&mut self) -> Event {
104        Self {
105            description: self.description.clone(),
106            event: replace(&mut self.event, event_listener::Event::new()),
107        }
108    }
109}
110
111impl Event {
112    /// see [event_listener::Event]::notify
113    pub fn notify(&self, n: usize) {
114        self.event.notify(n);
115    }
116}
117
118impl Debug for Event {
119    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
120        let mut t = f.debug_tuple("Event");
121        #[cfg(feature = "hanging_detection")]
122        t.field(&(self.description)());
123        t.finish()
124    }
125}
126
127#[cfg(not(feature = "hanging_detection"))]
128pub struct EventListener {
129    listener: event_listener::EventListener,
130}
131
132#[cfg(not(feature = "hanging_detection"))]
133impl Debug for EventListener {
134    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
135        f.debug_tuple("EventListener").finish()
136    }
137}
138
139#[cfg(not(feature = "hanging_detection"))]
140impl Future for EventListener {
141    type Output = ();
142
143    fn poll(
144        self: Pin<&mut Self>,
145        cx: &mut std::task::Context<'_>,
146    ) -> std::task::Poll<Self::Output> {
147        let listener = unsafe { self.map_unchecked_mut(|s| &mut s.listener) };
148        listener.poll(cx)
149    }
150}
151
152#[cfg(feature = "hanging_detection")]
153pub struct EventListener {
154    description: Arc<dyn Fn() -> String + Sync + Send>,
155    note: Arc<dyn Fn() -> String + Sync + Send>,
156    // Timeout need to stay pinned while polling and also while it's dropped.
157    // So it's important to put it into a pinned Box to be able to take it out of the Option.
158    future: Option<std::pin::Pin<Box<Timeout<event_listener::EventListener>>>>,
159    duration: Duration,
160}
161
162#[cfg(feature = "hanging_detection")]
163impl Debug for EventListener {
164    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
165        let mut t = f.debug_tuple("EventListener");
166        t.field(&(self.description)());
167        let note = (self.note)();
168        if !note.is_empty() {
169            t.field(&note);
170        }
171        t.finish()
172    }
173}
174
175#[cfg(feature = "hanging_detection")]
176impl Future for EventListener {
177    type Output = ();
178
179    fn poll(
180        mut self: Pin<&mut Self>,
181        cx: &mut std::task::Context<'_>,
182    ) -> std::task::Poll<Self::Output> {
183        while let Some(future) = self.future.as_mut() {
184            match ready!(future.as_mut().poll(cx)) {
185                Ok(_) => {
186                    self.future = None;
187                    return Poll::Ready(());
188                }
189                Err(_) => {
190                    use crate::util::FormatDuration;
191                    eprintln!(
192                        "{:?} is potentially hanging (waiting for {})",
193                        self,
194                        FormatDuration(self.duration)
195                    );
196                    self.duration *= 2;
197                    // SAFETY: Taking from Option is safe because the value is inside of a pinned
198                    // Box. Pinning must continue until dropped.
199                    let future = self.future.take().unwrap();
200                    self.future = Some(Box::pin(timeout(
201                        self.duration,
202                        // SAFETY: We can move the inner future since it's an EventListener and
203                        // that is Unpin.
204                        unsafe { std::pin::Pin::into_inner_unchecked(future) }.into_inner(),
205                    )));
206                }
207            }
208        }
209        // EventListener was awaited again after completion
210        Poll::Ready(())
211    }
212}