1use std::{
2 fmt::{Debug, Formatter},
3 future::Future,
4 mem::replace,
5 pin::Pin,
6};
7#[cfg(feature = "hanging_detection")]
8use std::{
9 sync::Arc,
10 task::{Poll, ready},
11 time::Duration,
12};
13
14#[cfg(feature = "hanging_detection")]
15use tokio::time::{Timeout, timeout};
16
17pub struct Event {
18 #[cfg(feature = "hanging_detection")]
19 description: Arc<dyn Fn() -> String + Sync + Send>,
20 event: event_listener::Event,
21}
22
23impl Event {
24 #[inline(always)]
38 pub fn new<InnerFn>(_description: impl FnOnce() -> InnerFn) -> Self
39 where
40 InnerFn: Fn() -> String + Sync + Send + 'static,
41 {
42 #[cfg(not(feature = "hanging_detection"))]
43 return Self {
44 event: event_listener::Event::new(),
45 };
46 #[cfg(feature = "hanging_detection")]
47 return Self {
48 description: Arc::new((_description)()),
49 event: event_listener::Event::new(),
50 };
51 }
52
53 pub fn listen(&self) -> EventListener {
55 #[cfg(not(feature = "hanging_detection"))]
56 return EventListener {
57 listener: self.event.listen(),
58 };
59 #[cfg(feature = "hanging_detection")]
60 return EventListener {
61 description: self.description.clone(),
62 note: Arc::new(String::new),
63 future: Some(Box::pin(timeout(
64 Duration::from_secs(30),
65 self.event.listen(),
66 ))),
67 duration: Duration::from_secs(30),
68 };
69 }
70
71 pub fn listen_with_note<InnerFn>(&self, _note: impl FnOnce() -> InnerFn) -> EventListener
84 where
85 InnerFn: Fn() -> String + Sync + Send + 'static,
86 {
87 #[cfg(not(feature = "hanging_detection"))]
88 return EventListener {
89 listener: self.event.listen(),
90 };
91 #[cfg(feature = "hanging_detection")]
92 return EventListener {
93 description: self.description.clone(),
94 note: Arc::new((_note)()),
95 future: Some(Box::pin(timeout(
96 Duration::from_secs(30),
97 self.event.listen(),
98 ))),
99 duration: Duration::from_secs(30),
100 };
101 }
102
103 pub fn take(&mut self) -> Event {
105 #[cfg(not(feature = "hanging_detection"))]
106 return Self {
107 event: replace(&mut self.event, event_listener::Event::new()),
108 };
109 #[cfg(feature = "hanging_detection")]
110 return Self {
111 description: self.description.clone(),
112 event: replace(&mut self.event, event_listener::Event::new()),
113 };
114 }
115}
116
117impl Event {
118 pub fn notify(&self, n: usize) {
120 self.event.notify(n);
121 }
122}
123
124impl Debug for Event {
125 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
126 let mut t = f.debug_tuple("Event");
127 #[cfg(feature = "hanging_detection")]
128 t.field(&(self.description)());
129 t.finish()
130 }
131}
132
133#[cfg(not(feature = "hanging_detection"))]
134pub struct EventListener {
135 listener: event_listener::EventListener,
136}
137
138#[cfg(not(feature = "hanging_detection"))]
139impl Debug for EventListener {
140 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
141 f.debug_tuple("EventListener").finish()
142 }
143}
144
145#[cfg(not(feature = "hanging_detection"))]
146impl Future for EventListener {
147 type Output = ();
148
149 fn poll(
150 self: Pin<&mut Self>,
151 cx: &mut std::task::Context<'_>,
152 ) -> std::task::Poll<Self::Output> {
153 let listener = unsafe { self.map_unchecked_mut(|s| &mut s.listener) };
154 listener.poll(cx)
155 }
156}
157
158#[cfg(feature = "hanging_detection")]
159pub struct EventListener {
160 description: Arc<dyn Fn() -> String + Sync + Send>,
161 note: Arc<dyn Fn() -> String + Sync + Send>,
162 future: Option<std::pin::Pin<Box<Timeout<event_listener::EventListener>>>>,
165 duration: Duration,
166}
167
168#[cfg(feature = "hanging_detection")]
169impl Debug for EventListener {
170 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
171 let mut t = f.debug_tuple("EventListener");
172 t.field(&(self.description)());
173 let note = (self.note)();
174 if !note.is_empty() {
175 t.field(¬e);
176 }
177 t.finish()
178 }
179}
180
181#[cfg(feature = "hanging_detection")]
182impl Future for EventListener {
183 type Output = ();
184
185 fn poll(
186 mut self: Pin<&mut Self>,
187 cx: &mut std::task::Context<'_>,
188 ) -> std::task::Poll<Self::Output> {
189 while let Some(future) = self.future.as_mut() {
190 match ready!(future.as_mut().poll(cx)) {
191 Ok(_) => {
192 self.future = None;
193 return Poll::Ready(());
194 }
195 Err(_) => {
196 let note = (self.note)();
197 let description = (self.description)();
198 if note.is_empty() {
199 eprintln!(
200 "EventListener({}) is potentially hanging, waiting for {}s",
201 description,
202 self.duration.as_secs(),
203 );
204 } else {
205 eprintln!(
206 "EventListener({}) is potentially hanging, waiting for {}s from {}",
207 description,
208 self.duration.as_secs(),
209 note
210 );
211 }
212 self.duration *= 2;
213 let future = self.future.take().unwrap();
216 self.future = Some(Box::pin(timeout(
217 self.duration,
218 unsafe { std::pin::Pin::into_inner_unchecked(future) }.into_inner(),
221 )));
222 }
223 }
224 }
225 Poll::Ready(())
227 }
228}
229
230#[cfg(all(test, not(feature = "hanging_detection")))]
231mod tests {
232 use std::hint::black_box;
233
234 use tokio::time::{Duration, timeout};
235
236 use super::*;
237
238 #[tokio::test]
242 async fn ensure_dead_code_elimination() {
243 fn dead_fn() {
244 unsafe {
246 unsafe extern "C" {
247 fn trigger_link_error() -> !;
248 }
249 trigger_link_error();
250 }
251 }
252
253 let event = black_box(Event::new(|| {
254 dead_fn();
255 || {
256 dead_fn();
257 String::new()
258 }
259 }));
260 let listener = black_box(event.listen_with_note(|| {
261 dead_fn();
262 || {
263 dead_fn();
264 String::new()
265 }
266 }));
267
268 let _ = black_box(timeout(Duration::from_millis(10), listener)).await;
269 }
270}