1use std::{any::Any, collections::VecDeque, fmt::Display, sync::Arc, time::Duration};
2
3use dashmap::DashMap;
4use serde::Serialize;
5use tokio::sync::{Mutex, mpsc};
6
/// An event emitted during compilation that can be stored in and broadcast
/// by a [`CompilationEventQueue`].
///
/// Implementors must be `Send + Sync` because events are shared between
/// tasks behind an `Arc`.
pub trait CompilationEvent: Sync + Send + Any {
    /// Name of the concrete event type. The queue uses this string as the
    /// key when routing events to type-specific subscribers.
    fn type_name(&self) -> &'static str;
    /// Severity level associated with this event.
    fn severity(&self) -> Severity;
    /// Human-readable description of the event.
    fn message(&self) -> String;
    /// String form of the event, presumably JSON-encoded (the implementations
    /// in this file use `serde_json`).
    fn to_json(&self) -> String;
}
13
/// Upper bound on the number of events kept in the history buffer; also used
/// as the capacity of every subscriber channel.
const MAX_QUEUE_SIZE: usize = 256;

/// Shorthand for a value shared through an `Arc` and guarded by a Tokio `Mutex`.
type ArcMx<T> = Arc<Mutex<T>>;
/// Sending half of a subscriber channel that carries shared compilation events.
type CompilationEventChannel = mpsc::Sender<Arc<dyn CompilationEvent>>;
18
/// Key under which subscriber channels are grouped.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
enum EventChannelType {
    /// Subscribers that receive every event regardless of its type.
    Global,
    /// Subscribers that receive only events whose `type_name()` equals the
    /// contained string.
    Type(String),
}
24
/// Broadcast queue for [`CompilationEvent`]s with a bounded replay history.
pub struct CompilationEventQueue {
    /// The most recent events (at most `MAX_QUEUE_SIZE`), oldest first. They
    /// are replayed to new subscribers.
    event_history: ArcMx<VecDeque<Arc<dyn CompilationEvent>>>,
    /// Subscriber channels grouped by the kind of events they want.
    subscribers: Arc<DashMap<EventChannelType, Vec<CompilationEventChannel>>>,
}
29
30impl Default for CompilationEventQueue {
31 fn default() -> Self {
32 let subscribers = DashMap::new();
33 subscribers.insert(
34 EventChannelType::Global,
35 Vec::<CompilationEventChannel>::new(),
36 );
37
38 Self {
39 event_history: Arc::new(Mutex::new(VecDeque::with_capacity(MAX_QUEUE_SIZE))),
40 subscribers: Arc::new(subscribers),
41 }
42 }
43}
44
45impl CompilationEventQueue {
46 pub fn send(
47 &self,
48 message: Arc<dyn CompilationEvent>,
49 ) -> Result<(), mpsc::error::SendError<Arc<dyn CompilationEvent>>> {
50 let event_history = self.event_history.clone();
51 let subscribers = self.subscribers.clone();
52 let message_clone = message.clone();
53
54 tokio::spawn(async move {
56 let mut history = event_history.lock().await;
58 if history.len() >= MAX_QUEUE_SIZE {
59 history.pop_front();
60 }
61 history.push_back(message_clone.clone());
62
63 if let Some(mut type_subscribers) = subscribers.get_mut(&EventChannelType::Type(
65 message_clone.type_name().to_owned(),
66 )) {
67 let mut removal_indices = Vec::new();
68 for (ix, sender) in type_subscribers.iter().enumerate() {
69 if sender.send(message_clone.clone()).await.is_err() {
70 removal_indices.push(ix);
71 }
72 }
73
74 for ix in removal_indices.iter().rev() {
75 type_subscribers.remove(*ix);
76 }
77 }
78
79 let mut all_channel = subscribers.get_mut(&EventChannelType::Global).unwrap();
81 let mut removal_indices = Vec::new();
82 for (ix, sender) in all_channel.iter_mut().enumerate() {
83 if sender.send(message_clone.clone()).await.is_err() {
84 removal_indices.push(ix);
85 }
86 }
87
88 for ix in removal_indices.iter().rev() {
89 all_channel.remove(*ix);
90 }
91 });
92
93 Ok(())
94 }
95
96 pub fn subscribe(
97 &self,
98 event_types: Option<Vec<String>>,
99 ) -> mpsc::Receiver<Arc<dyn CompilationEvent>> {
100 let (tx, rx) = mpsc::channel(MAX_QUEUE_SIZE);
101 let subscribers = self.subscribers.clone();
102 let event_history = self.event_history.clone();
103 let tx_clone = tx.clone();
104
105 tokio::spawn(async move {
107 if let Some(event_types) = event_types {
109 for event_type in event_types.iter() {
110 let mut type_subscribers = subscribers
111 .entry(EventChannelType::Type(event_type.clone()))
112 .or_default();
113 type_subscribers.push(tx_clone.clone());
114 }
115
116 for event in event_history.lock().await.iter() {
117 if event_types.contains(&event.type_name().to_string()) {
118 let _ = tx_clone.send(event.clone()).await;
119 }
120 }
121 } else {
122 let mut global_subscribers =
123 subscribers.entry(EventChannelType::Global).or_default();
124 global_subscribers.push(tx_clone.clone());
125
126 for event in event_history.lock().await.iter() {
127 let _ = tx_clone.send(event.clone()).await;
128 }
129 }
130 });
131
132 rx
133 }
134}
135
/// Severity level attached to a [`CompilationEvent`].
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Serialize)]
pub enum Severity {
    /// Displayed as `INFO`.
    Info,
    /// Displayed as `TRACE`.
    Trace,
    /// Displayed as `WARNING`.
    Warning,
    /// Displayed as `ERROR`.
    Error,
    /// Displayed as `FATAL`.
    Fatal,
    /// Displayed as `EVENT`; this is the severity reported by `TimingEvent`.
    Event,
}
145
146impl Display for Severity {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 match self {
149 Severity::Info => write!(f, "INFO"),
150 Severity::Trace => write!(f, "TRACE"),
151 Severity::Warning => write!(f, "WARNING"),
152 Severity::Error => write!(f, "ERROR"),
153 Severity::Fatal => write!(f, "FATAL"),
154 Severity::Event => write!(f, "EVENT"),
155 }
156 }
157}
158
/// Event describing how long an operation took.
#[derive(Debug, Clone, Serialize)]
pub struct TimingEvent {
    /// Description of what was timed; rendered as the prefix of `message()`.
    pub message: String,
    /// Elapsed time of the operation.
    pub duration: Duration,
}
174
175impl TimingEvent {
176 pub fn new(message: String, duration: Duration) -> Self {
177 Self { message, duration }
178 }
179}
180
181impl CompilationEvent for TimingEvent {
182 fn type_name(&self) -> &'static str {
183 "TimingEvent"
184 }
185
186 fn severity(&self) -> Severity {
187 Severity::Event
188 }
189
190 fn message(&self) -> String {
191 let duration_secs = self.duration.as_secs_f64();
192 let duration_string = if duration_secs > 120.0 {
193 format!("{:.1}min", duration_secs / 60.0)
194 } else if duration_secs > 40.0 {
195 format!("{duration_secs:.0}s")
196 } else if duration_secs > 2.0 {
197 format!("{duration_secs:.1}s")
198 } else {
199 format!("{:.0}ms", duration_secs * 1000.0)
200 };
201 format!("{} in {}", self.message, duration_string)
202 }
203
204 fn to_json(&self) -> String {
205 serde_json::to_string(self).unwrap()
206 }
207}
208
/// Event carrying a free-form message with an explicit severity.
#[derive(Debug, Clone, Serialize)]
pub struct DiagnosticEvent {
    /// Text of the diagnostic.
    pub message: String,
    /// Severity reported for this diagnostic.
    pub severity: Severity,
}
214
215impl DiagnosticEvent {
216 pub fn new(severity: Severity, message: String) -> Self {
217 Self { message, severity }
218 }
219}
220
221impl CompilationEvent for DiagnosticEvent {
222 fn type_name(&self) -> &'static str {
223 "DiagnosticEvent"
224 }
225
226 fn severity(&self) -> Severity {
227 self.severity
228 }
229
230 fn message(&self) -> String {
231 self.message.clone()
232 }
233
234 fn to_json(&self) -> String {
235 serde_json::to_string(self).unwrap()
236 }
237}
238
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the unit and precision chosen by `TimingEvent::message` across
    /// the millisecond, second and minute ranges.
    #[test]
    fn test_timing_event_string_formatting() {
        let cases: [(Duration, &str); 9] = [
            (Duration::from_nanos(1588), "0ms"),
            (Duration::from_nanos(1022616), "1ms"),
            (Duration::from_millis(100), "100ms"),
            (Duration::from_millis(1000), "1000ms"),
            (Duration::from_millis(10000), "10.0s"),
            (Duration::from_millis(20381), "20.4s"),
            (Duration::from_secs(60), "60s"),
            (Duration::from_secs(100), "100s"),
            (Duration::from_secs(125), "2.1min"),
        ];

        for (elapsed, rendered) in cases {
            let actual = TimingEvent::new("Compiled successfully".to_string(), elapsed).message();
            let wanted = format!("Compiled successfully in {rendered}");
            assert_eq!(actual, wanted);
        }
    }
}