turbo_tasks/
message_queue.rs

1use std::{any::Any, collections::VecDeque, fmt::Display, sync::Arc, time::Duration};
2
3use dashmap::DashMap;
4use serde::Serialize;
5use tokio::sync::{Mutex, mpsc};
6
/// An event that can be published through a [`CompilationEventQueue`].
///
/// Events are shared between tasks as `Arc<dyn CompilationEvent>`, hence the
/// `Send + Sync` supertraits.
pub trait CompilationEvent: Sync + Send + Any {
    /// Name of the concrete event type.
    ///
    /// The queue uses this string as the routing key when matching an event
    /// against type-specific subscriptions.
    fn type_name(&self) -> &'static str;
    /// Severity reported by this event.
    fn severity(&self) -> Severity;
    /// Message text of this event.
    fn message(&self) -> String;
    /// The event rendered as a JSON string.
    fn to_json(&self) -> String;
}
13
/// Maximum number of events kept in the queue's history; also used as the
/// capacity of every subscriber channel.
const MAX_QUEUE_SIZE: usize = 256;

/// Shorthand for a value shared behind an `Arc` and guarded by a Tokio mutex.
type ArcMx<T> = Arc<Mutex<T>>;
/// Sending half of the channel through which one subscriber receives events.
type CompilationEventChannel = mpsc::Sender<Arc<dyn CompilationEvent>>;
18
/// Key under which subscriber channels are grouped inside the queue.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
enum EventChannelType {
    /// Subscribers that receive every event, regardless of its type.
    Global,
    /// Subscribers interested only in events whose
    /// [`CompilationEvent::type_name`] equals the contained string.
    Type(String),
}
24
/// Publish/subscribe queue for [`CompilationEvent`]s that also keeps a bounded
/// history so new subscribers can be sent the events that were published
/// before they subscribed.
pub struct CompilationEventQueue {
    /// The most recent events, oldest first; capped at `MAX_QUEUE_SIZE`.
    event_history: ArcMx<VecDeque<Arc<dyn CompilationEvent>>>,
    /// Subscriber channels grouped by the kind of events they asked for.
    subscribers: Arc<DashMap<EventChannelType, Vec<CompilationEventChannel>>>,
}
29
30impl Default for CompilationEventQueue {
31    fn default() -> Self {
32        let subscribers = DashMap::new();
33        subscribers.insert(
34            EventChannelType::Global,
35            Vec::<CompilationEventChannel>::new(),
36        );
37
38        Self {
39            event_history: Arc::new(Mutex::new(VecDeque::with_capacity(MAX_QUEUE_SIZE))),
40            subscribers: Arc::new(subscribers),
41        }
42    }
43}
44
45impl CompilationEventQueue {
46    pub fn send(
47        &self,
48        message: Arc<dyn CompilationEvent>,
49    ) -> Result<(), mpsc::error::SendError<Arc<dyn CompilationEvent>>> {
50        let event_history = self.event_history.clone();
51        let subscribers = self.subscribers.clone();
52        let message_clone = message.clone();
53
54        // Spawn a task to handle the async operations
55        tokio::spawn(async move {
56            // Store the message in history
57            let mut history = event_history.lock().await;
58            if history.len() >= MAX_QUEUE_SIZE {
59                history.pop_front();
60            }
61            history.push_back(message_clone.clone());
62
63            // Send to all active receivers of the same message type
64            if let Some(mut type_subscribers) = subscribers.get_mut(&EventChannelType::Type(
65                message_clone.type_name().to_owned(),
66            )) {
67                let mut removal_indices = Vec::new();
68                for (ix, sender) in type_subscribers.iter().enumerate() {
69                    if sender.send(message_clone.clone()).await.is_err() {
70                        removal_indices.push(ix);
71                    }
72                }
73
74                for ix in removal_indices.iter().rev() {
75                    type_subscribers.remove(*ix);
76                }
77            }
78
79            // Send to all global message subscribers
80            let mut all_channel = subscribers.get_mut(&EventChannelType::Global).unwrap();
81            let mut removal_indices = Vec::new();
82            for (ix, sender) in all_channel.iter_mut().enumerate() {
83                if sender.send(message_clone.clone()).await.is_err() {
84                    removal_indices.push(ix);
85                }
86            }
87
88            for ix in removal_indices.iter().rev() {
89                all_channel.remove(*ix);
90            }
91        });
92
93        Ok(())
94    }
95
96    pub fn subscribe(
97        &self,
98        event_types: Option<Vec<String>>,
99    ) -> mpsc::Receiver<Arc<dyn CompilationEvent>> {
100        let (tx, rx) = mpsc::channel(MAX_QUEUE_SIZE);
101        let subscribers = self.subscribers.clone();
102        let event_history = self.event_history.clone();
103        let tx_clone = tx.clone();
104
105        // Spawn a task to handle the async operations
106        tokio::spawn(async move {
107            // Store the sender
108            if let Some(event_types) = event_types {
109                for event_type in event_types.iter() {
110                    let mut type_subscribers = subscribers
111                        .entry(EventChannelType::Type(event_type.clone()))
112                        .or_default();
113                    type_subscribers.push(tx_clone.clone());
114                }
115
116                for event in event_history.lock().await.iter() {
117                    if event_types.contains(&event.type_name().to_string()) {
118                        let _ = tx_clone.send(event.clone()).await;
119                    }
120                }
121            } else {
122                let mut global_subscribers =
123                    subscribers.entry(EventChannelType::Global).or_default();
124                global_subscribers.push(tx_clone.clone());
125
126                for event in event_history.lock().await.iter() {
127                    let _ = tx_clone.send(event.clone()).await;
128                }
129            }
130        });
131
132        rx
133    }
134}
135
/// Severity level attached to a [`CompilationEvent`].
///
/// Its `Display` implementation renders each variant as an upper-case label
/// such as `INFO` or `WARNING`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Serialize)]
pub enum Severity {
    Info,
    Trace,
    Warning,
    Error,
    Fatal,
    /// Severity reported by [`TimingEvent`].
    Event,
}
145
146impl Display for Severity {
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        match self {
149            Severity::Info => write!(f, "INFO"),
150            Severity::Trace => write!(f, "TRACE"),
151            Severity::Warning => write!(f, "WARNING"),
152            Severity::Error => write!(f, "ERROR"),
153            Severity::Fatal => write!(f, "FATAL"),
154            Severity::Event => write!(f, "EVENT"),
155        }
156    }
157}
158
#[derive(Debug, Clone, Serialize)]
/// Compilation event that is used to log the duration of a task
pub struct TimingEvent {
    /// Message of the event without the timing information
    ///
    /// Example:
    /// ```rust
    /// let event = TimingEvent::new("Compiled successfully".to_string(), Duration::from_millis(100));
    /// let message = event.message();
    /// assert_eq!(message, "Compiled successfully in 100ms");
    /// ```
    pub message: String,
    /// How long the task took.
    ///
    /// Stored as a [`Duration`]; the unit shown to the user (ms, s or min) is
    /// chosen when the event's message is formatted.
    pub duration: Duration,
}
174
175impl TimingEvent {
176    pub fn new(message: String, duration: Duration) -> Self {
177        Self { message, duration }
178    }
179}
180
181impl CompilationEvent for TimingEvent {
182    fn type_name(&self) -> &'static str {
183        "TimingEvent"
184    }
185
186    fn severity(&self) -> Severity {
187        Severity::Event
188    }
189
190    fn message(&self) -> String {
191        let duration_secs = self.duration.as_secs_f64();
192        let duration_string = if duration_secs > 120.0 {
193            format!("{:.1}min", duration_secs / 60.0)
194        } else if duration_secs > 40.0 {
195            format!("{duration_secs:.0}s")
196        } else if duration_secs > 2.0 {
197            format!("{duration_secs:.1}s")
198        } else {
199            format!("{:.0}ms", duration_secs * 1000.0)
200        };
201        format!("{} in {}", self.message, duration_string)
202    }
203
204    fn to_json(&self) -> String {
205        serde_json::to_string(self).unwrap()
206    }
207}
208
/// Compilation event carrying a message together with an explicit severity.
#[derive(Debug, Clone, Serialize)]
pub struct DiagnosticEvent {
    /// Text returned unchanged by the event's `message()` implementation.
    pub message: String,
    /// Severity returned by the event's `severity()` implementation.
    pub severity: Severity,
}
214
215impl DiagnosticEvent {
216    pub fn new(severity: Severity, message: String) -> Self {
217        Self { message, severity }
218    }
219}
220
221impl CompilationEvent for DiagnosticEvent {
222    fn type_name(&self) -> &'static str {
223        "DiagnosticEvent"
224    }
225
226    fn severity(&self) -> Severity {
227        self.severity
228    }
229
230    fn message(&self) -> String {
231        self.message.clone()
232    }
233
234    fn to_json(&self) -> String {
235        serde_json::to_string(self).unwrap()
236    }
237}
238
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the unit and precision `TimingEvent::message` picks for
    /// durations ranging from nanoseconds up to minutes.
    #[test]
    fn test_timing_event_string_formatting() {
        let cases = [
            (Duration::from_nanos(1588), "0ms"),
            (Duration::from_nanos(1022616), "1ms"),
            (Duration::from_millis(100), "100ms"),
            (Duration::from_millis(1000), "1000ms"),
            (Duration::from_millis(10000), "10.0s"),
            (Duration::from_millis(20381), "20.4s"),
            (Duration::from_secs(60), "60s"),
            (Duration::from_secs(100), "100s"),
            (Duration::from_secs(125), "2.1min"),
        ];

        for &(duration, expected) in &cases {
            let rendered =
                TimingEvent::new("Compiled successfully".to_string(), duration).message();
            assert_eq!(rendered, format!("Compiled successfully in {expected}"));
        }
    }
}