1#[cfg(feature = "hanging_detection")]
2use std::sync::Arc;
3#[cfg(feature = "hanging_detection")]
4use std::task::Poll;
5#[cfg(feature = "hanging_detection")]
6use std::task::ready;
7#[cfg(feature = "hanging_detection")]
8use std::time::Duration;
9use std::{
10 fmt::{Debug, Formatter},
11 future::Future,
12 mem::replace,
13 pin::Pin,
14};
15
16#[cfg(feature = "hanging_detection")]
17use tokio::time::Timeout;
18#[cfg(feature = "hanging_detection")]
19use tokio::time::timeout;
20
21pub struct Event {
22 #[cfg(feature = "hanging_detection")]
23 description: Arc<dyn Fn() -> String + Sync + Send>,
24 event: event_listener::Event,
25}
26
27#[cfg(not(feature = "hanging_detection"))]
28impl Event {
29 #[inline(always)]
31 pub fn new(_description: impl Fn() -> String + Sync + Send + 'static) -> Self {
32 Self {
33 event: event_listener::Event::new(),
34 }
35 }
36
37 pub fn listen(&self) -> EventListener {
39 EventListener {
40 listener: self.event.listen(),
41 }
42 }
43
44 pub fn listen_with_note(
46 &self,
47 _note: impl Fn() -> String + Sync + Send + 'static,
48 ) -> EventListener {
49 EventListener {
50 listener: self.event.listen(),
51 }
52 }
53
54 pub fn take(&mut self) -> Self {
56 Self {
57 event: replace(&mut self.event, event_listener::Event::new()),
58 }
59 }
60}
61
#[cfg(feature = "hanging_detection")]
impl Event {
    /// How long a new listener waits before its first "potentially hanging"
    /// report.
    const INITIAL_TIMEOUT: Duration = Duration::from_secs(10);

    /// Creates a new event whose description is rendered on demand by
    /// `description`.
    #[inline(always)]
    pub fn new(description: impl Fn() -> String + Sync + Send + 'static) -> Self {
        let event = event_listener::Event::new();
        Self {
            description: Arc::new(description),
            event,
        }
    }

    /// Registers a new listener with an empty note.
    pub fn listen(&self) -> EventListener {
        self.listen_with_note(String::new)
    }

    /// Registers a new listener that carries `note` for its `Debug` output.
    ///
    /// The listener is wrapped in a tokio timeout of
    /// [`Self::INITIAL_TIMEOUT`] and shares this event's description closure.
    pub fn listen_with_note(
        &self,
        note: impl Fn() -> String + Sync + Send + 'static,
    ) -> EventListener {
        let duration = Self::INITIAL_TIMEOUT;
        let description = Arc::clone(&self.description);
        let note: Arc<dyn Fn() -> String + Sync + Send> = Arc::new(note);
        let pending = timeout(duration, self.event.listen());
        EventListener {
            description,
            note,
            future: Some(Box::pin(pending)),
            duration,
        }
    }

    /// Moves the wrapped event out into a new `Event` that shares the same
    /// description, leaving a freshly created event behind in `self`.
    pub fn take(&mut self) -> Event {
        let description = Arc::clone(&self.description);
        let event = replace(&mut self.event, event_listener::Event::new());
        Self { description, event }
    }
}
110
111impl Event {
112 pub fn notify(&self, n: usize) {
114 self.event.notify(n);
115 }
116}
117
118impl Debug for Event {
119 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
120 let mut t = f.debug_tuple("Event");
121 #[cfg(feature = "hanging_detection")]
122 t.field(&(self.description)());
123 t.finish()
124 }
125}
126
127#[cfg(not(feature = "hanging_detection"))]
128pub struct EventListener {
129 listener: event_listener::EventListener,
130}
131
132#[cfg(not(feature = "hanging_detection"))]
133impl Debug for EventListener {
134 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
135 f.debug_tuple("EventListener").finish()
136 }
137}
138
139#[cfg(not(feature = "hanging_detection"))]
140impl Future for EventListener {
141 type Output = ();
142
143 fn poll(
144 self: Pin<&mut Self>,
145 cx: &mut std::task::Context<'_>,
146 ) -> std::task::Poll<Self::Output> {
147 let listener = unsafe { self.map_unchecked_mut(|s| &mut s.listener) };
148 listener.poll(cx)
149 }
150}
151
/// Future produced by `Event::listen` / `Event::listen_with_note` when the
/// `hanging_detection` feature is enabled.
///
/// Besides the wrapped listener it carries the information needed to report
/// a wait that exceeds its current timeout.
#[cfg(feature = "hanging_detection")]
pub struct EventListener {
    /// Renders the description of the event this listener was created from.
    description: Arc<dyn Fn() -> String + Sync + Send>,
    /// Renders the per-listener note; an empty string is omitted from the
    /// `Debug` output.
    note: Arc<dyn Fn() -> String + Sync + Send>,
    /// The wrapped listener inside its current timeout. Set to `None` once
    /// the listener has completed.
    future: Option<Pin<Box<Timeout<event_listener::EventListener>>>>,
    /// The timeout currently applied to `future`; doubled each time it
    /// elapses.
    duration: Duration,
}
161
#[cfg(feature = "hanging_detection")]
impl Debug for EventListener {
    /// Formats as the tuple struct `EventListener` whose first field is the
    /// rendered event description. The rendered note is appended as a second
    /// field only when it is non-empty.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut t = f.debug_tuple("EventListener");
        t.field(&(self.description)());
        // Render the note once so the emptiness check and the printed value
        // come from the same call.
        let note = (self.note)();
        if !note.is_empty() {
            t.field(&note);
        }
        t.finish()
    }
}
174
#[cfg(feature = "hanging_detection")]
impl Future for EventListener {
    type Output = ();

    /// Resolves once the wrapped listener completes.
    ///
    /// Whenever the current timeout elapses first, a "potentially hanging"
    /// message is printed to stderr, the wait duration is doubled, and the
    /// same listener is wrapped in a new timeout that is polled right away.
    /// Polling again after completion returns `Poll::Ready(())` immediately.
    fn poll(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        use crate::util::FormatDuration;

        while let Some(pending) = self.future.as_mut() {
            if ready!(pending.as_mut().poll(cx)).is_ok() {
                // The listener completed before the timeout did.
                self.future = None;
                return Poll::Ready(());
            }

            // The timeout elapsed first: report it and wait twice as long.
            eprintln!(
                "{:?} is potentially hanging (waiting for {})",
                self,
                FormatDuration(self.duration)
            );
            self.duration *= 2;

            // Taking the pinned box out of the `Option` does not move the
            // boxed value itself; the loop condition guarantees it is `Some`.
            let expired = self.future.take().unwrap();
            // SAFETY: the box is unpinned only to recover the wrapped
            // listener via `Timeout::into_inner`; the expired `Timeout` is
            // consumed by that call.
            // NOTE(review): this assumes the inner
            // `event_listener::EventListener` may be moved after having been
            // polled (i.e. it is `Unpin`) and that moving the already-elapsed
            // `Timeout` out of its box is acceptable — confirm against the
            // tokio and event-listener versions in use.
            let listener = unsafe { Pin::into_inner_unchecked(expired) }.into_inner();
            self.future = Some(Box::pin(timeout(self.duration, listener)));
        }
        Poll::Ready(())
    }
}