1use std::{
2 fmt::{Debug, Display, Formatter},
3 future::Future,
4 mem::replace,
5 pin::Pin,
6};
7#[cfg(feature = "hanging_detection")]
8use std::{
9 sync::Arc,
10 task::{Poll, ready},
11 time::Duration,
12};
13
14use event_listener::Listener as _;
15#[cfg(feature = "hanging_detection")]
16use tokio::time::{Timeout, timeout};
17
/// Something that can provide the description closure attached to an [`Event`].
///
/// The trait only carries a method when the `hanging_detection` feature is enabled; without
/// that feature it has no items at all.
pub trait EventDescriptor {
    /// Consumes the descriptor and returns a shareable closure that renders the description
    /// each time it is called.
    #[cfg(feature = "hanging_detection")]
    fn get_description(self) -> Arc<dyn Fn() -> String + Send + Sync>;
}
22
23impl<T, InnerFn> EventDescriptor for T
24where
25 T: FnOnce() -> InnerFn,
26 InnerFn: Fn() -> String + Sync + Send + 'static,
27{
28 #[cfg(feature = "hanging_detection")]
29 fn get_description(self) -> Arc<dyn Fn() -> String + Sync + Send> {
30 Arc::new((self)())
31 }
32}
33
/// A cloneable description that can be handed to [`Event::new`].
///
/// With the `hanging_detection` feature enabled it stores the closure that renders the
/// description; without the feature the struct has no fields.
#[derive(Clone)]
pub struct EventDescription {
    /// Renders the description text on demand.
    #[cfg(feature = "hanging_detection")]
    description: Arc<dyn Fn() -> String + Send + Sync>,
}
39
40impl EventDescription {
41 #[inline(always)]
42 pub fn new<InnerFn>(#[allow(unused_variables)] description: impl FnOnce() -> InnerFn) -> Self
43 where
44 InnerFn: Fn() -> String + Sync + Send + 'static,
45 {
46 Self {
47 #[cfg(feature = "hanging_detection")]
48 description: Arc::new((description)()),
49 }
50 }
51}
52
53impl Display for EventDescription {
54 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55 #[cfg(not(feature = "hanging_detection"))]
56 return write!(f, "");
57
58 #[cfg(feature = "hanging_detection")]
59 return write!(f, "{}", (self.description)());
60 }
61}
62
63impl EventDescriptor for EventDescription {
64 #[cfg(feature = "hanging_detection")]
65 fn get_description(self) -> Arc<dyn Fn() -> String + Sync + Send> {
66 self.description
67 }
68}
69
70pub struct Event {
71 #[cfg(feature = "hanging_detection")]
72 description: Arc<dyn Fn() -> String + Sync + Send>,
73 event: event_listener::Event,
74}
75
76impl Event {
77 #[inline(always)]
91 pub fn new(#[allow(unused_variables)] description: impl EventDescriptor) -> Self {
92 #[cfg(not(feature = "hanging_detection"))]
93 return Self {
94 event: event_listener::Event::new(),
95 };
96 #[cfg(feature = "hanging_detection")]
97 return Self {
98 description: description.get_description(),
99 event: event_listener::Event::new(),
100 };
101 }
102
103 pub fn listen(&self) -> EventListener {
105 #[cfg(not(feature = "hanging_detection"))]
106 return EventListener {
107 listener: self.event.listen(),
108 };
109 #[cfg(feature = "hanging_detection")]
110 return EventListener {
111 description: self.description.clone(),
112 note: Arc::new(String::new),
113 future: Some(Box::pin(timeout(
114 Duration::from_secs(30),
115 self.event.listen(),
116 ))),
117 duration: Duration::from_secs(30),
118 };
119 }
120
121 pub fn listen_with_note(&self, _note: impl EventDescriptor) -> EventListener {
134 #[cfg(not(feature = "hanging_detection"))]
135 return EventListener {
136 listener: self.event.listen(),
137 };
138 #[cfg(feature = "hanging_detection")]
139 return EventListener {
140 description: self.description.clone(),
141 note: _note.get_description(),
142 future: Some(Box::pin(timeout(
143 Duration::from_secs(30),
144 self.event.listen(),
145 ))),
146 duration: Duration::from_secs(30),
147 };
148 }
149
150 pub fn take(&mut self) -> Event {
152 #[cfg(not(feature = "hanging_detection"))]
153 return Self {
154 event: replace(&mut self.event, event_listener::Event::new()),
155 };
156 #[cfg(feature = "hanging_detection")]
157 return Self {
158 description: self.description.clone(),
159 event: replace(&mut self.event, event_listener::Event::new()),
160 };
161 }
162}
163
164impl Event {
165 pub fn notify(&self, n: usize) {
167 self.event.notify(n);
168 }
169}
170
171impl Debug for Event {
172 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
173 let mut t = f.debug_tuple("Event");
174 #[cfg(feature = "hanging_detection")]
175 t.field(&(self.description)());
176 t.finish()
177 }
178}
179
180#[cfg(not(feature = "hanging_detection"))]
181pub struct EventListener {
182 listener: event_listener::EventListener,
183}
184
185#[cfg(not(feature = "hanging_detection"))]
186impl Debug for EventListener {
187 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
188 f.debug_tuple("EventListener").finish()
189 }
190}
191
192#[cfg(not(feature = "hanging_detection"))]
193impl Future for EventListener {
194 type Output = ();
195
196 fn poll(
197 self: Pin<&mut Self>,
198 cx: &mut std::task::Context<'_>,
199 ) -> std::task::Poll<Self::Output> {
200 let listener = unsafe { self.map_unchecked_mut(|s| &mut s.listener) };
201 listener.poll(cx)
202 }
203}
204
205#[cfg(not(feature = "hanging_detection"))]
206impl EventListener {
207 pub fn wait(self) {
212 self.listener.wait();
213 }
214}
215
/// Listener returned by [`Event::listen`] when `hanging_detection` is enabled.
///
/// It wraps the underlying listener in a timeout so that a long wait can be reported.
#[cfg(feature = "hanging_detection")]
pub struct EventListener {
    /// Renders the description of the event this listener belongs to.
    description: Arc<dyn Fn() -> String + Send + Sync>,
    /// Renders the note attached when the listener was created; may render an empty string.
    note: Arc<dyn Fn() -> String + Send + Sync>,
    /// The timeout-wrapped listener; `None` once the wait has completed or been taken.
    future: Option<Pin<Box<Timeout<event_listener::EventListener>>>>,
    /// Length of the timeout currently wrapping the listener.
    duration: Duration,
}
225
#[cfg(feature = "hanging_detection")]
impl Debug for EventListener {
    /// Formats as the tuple `EventListener(<description>)`, with the rendered note appended as
    /// a second field when it is non-empty.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut t = f.debug_tuple("EventListener");
        t.field(&(self.description)());
        let note = (self.note)();
        if !note.is_empty() {
            // `field` takes `&dyn Debug`, so the note is passed by reference.
            t.field(&note);
        }
        t.finish()
    }
}
238
#[cfg(feature = "hanging_detection")]
impl Future for EventListener {
    type Output = ();

    /// Polls the timeout-wrapped listener.
    ///
    /// Each time the current timeout elapses, a "potentially hanging" line is printed to
    /// stderr, the timeout is doubled, and the same underlying listener is wrapped and polled
    /// again. Completes once the listener itself completes, or immediately if it was already
    /// consumed.
    fn poll(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let this = self.get_mut();

        loop {
            let Some(pending) = this.future.as_mut() else {
                // Nothing left to wait for.
                return Poll::Ready(());
            };

            if ready!(pending.as_mut().poll(cx)).is_ok() {
                // The listener completed before the timeout fired.
                this.future = None;
                return Poll::Ready(());
            }

            // The timeout fired first: report, then keep waiting with a longer timeout.
            let note = (this.note)();
            let description = (this.description)();
            let waited_secs = this.duration.as_secs();
            if note.is_empty() {
                eprintln!(
                    "EventListener({}) is potentially hanging, waiting for {}s",
                    description, waited_secs,
                );
            } else {
                eprintln!(
                    "EventListener({}) is potentially hanging, waiting for {}s from {}",
                    description, waited_secs, note
                );
            }

            this.duration *= 2;

            // `future` was observed to be `Some` at the top of this iteration.
            let expired = this.future.take().unwrap();
            // SAFETY: NOTE(review): this unpins the box and moves the inner listener out of an
            // already-polled `Timeout`. That is only sound if the listener does not depend on
            // its address staying fixed (e.g. it is `Unpin`) — confirm for the
            // `event_listener` version in use.
            let listener = unsafe { Pin::into_inner_unchecked(expired) }.into_inner();
            this.future = Some(Box::pin(timeout(this.duration, listener)));
        }
    }
}
287
#[cfg(feature = "hanging_detection")]
impl EventListener {
    /// Consumes the listener, strips the timeout wrapper and calls `wait` on the underlying
    /// `event_listener` listener. Does nothing if the listener was already consumed.
    ///
    /// No hang reporting happens on this path: the timeout is discarded before waiting.
    pub fn wait(mut self) {
        let Some(pinned) = self.future.take() else {
            return;
        };
        // SAFETY: NOTE(review): moving the listener out of the pinned box is only sound if it
        // does not depend on its address staying fixed (e.g. it is `Unpin`) — confirm for the
        // `event_listener` version in use.
        let listener = unsafe { Pin::into_inner_unchecked(pinned) }.into_inner();
        listener.wait();
    }
}
303
#[cfg(all(test, not(feature = "hanging_detection")))]
mod tests {
    use std::hint::black_box;

    use tokio::time::{Duration, timeout};

    use super::*;

    /// Checks that description and note closures are never needed when `hanging_detection` is
    /// disabled.
    ///
    /// Every closure calls `dead_fn`, which references an external symbol that is only
    /// declared here. Presumably the test binary fails to link if any of those calls survives
    /// optimization, so a successful build is the actual assertion.
    #[tokio::test]
    async fn ensure_dead_code_elimination() {
        fn dead_fn() {
            unsafe {
                unsafe extern "C" {
                    fn trigger_link_error() -> !;
                }
                trigger_link_error();
            }
        }

        let source = black_box(Event::new(|| {
            dead_fn();
            || {
                dead_fn();
                String::new()
            }
        }));
        let waiter = black_box(source.listen_with_note(|| {
            dead_fn();
            || {
                dead_fn();
                String::new()
            }
        }));

        // Nobody notifies the event, so this is expected to simply time out; the outcome is
        // ignored either way.
        let bounded = black_box(timeout(Duration::from_millis(10), waiter));
        let _ = bounded.await;
    }
}