turbo_tasks/
message_queue.rs

1use std::{any::Any, collections::VecDeque, fmt::Display, sync::Arc};
2
3use dashmap::DashMap;
4use tokio::sync::{Mutex, mpsc};
5
6pub trait CompilationEvent: Sync + Send + Any {
7    fn type_name(&self) -> &'static str;
8    fn severity(&self) -> Severity;
9    fn message(&self) -> String;
10    fn to_json(&self) -> String;
11}
12
13const MAX_QUEUE_SIZE: usize = 256;
14
15type ArcMx<T> = Arc<Mutex<T>>;
16type CompilationEventChannel = mpsc::Sender<Arc<dyn CompilationEvent>>;
17
18pub struct CompilationEventQueue {
19    event_history: ArcMx<VecDeque<Arc<dyn CompilationEvent>>>,
20    subscribers: Arc<DashMap<String, Vec<CompilationEventChannel>>>,
21}
22
23impl Default for CompilationEventQueue {
24    fn default() -> Self {
25        let subscribers = DashMap::new();
26        subscribers.insert("*".to_owned(), Vec::<CompilationEventChannel>::new());
27
28        Self {
29            event_history: Arc::new(Mutex::new(VecDeque::with_capacity(MAX_QUEUE_SIZE))),
30            subscribers: Arc::new(subscribers),
31        }
32    }
33}
34
35impl CompilationEventQueue {
36    pub fn send(
37        &self,
38        message: Arc<dyn CompilationEvent>,
39    ) -> Result<(), mpsc::error::SendError<Arc<dyn CompilationEvent>>> {
40        let event_history = self.event_history.clone();
41        let subscribers = self.subscribers.clone();
42        let message_clone = message.clone();
43
44        // Spawn a task to handle the async operations
45        tokio::spawn(async move {
46            // Store the message in history
47            let mut history = event_history.lock().await;
48            if history.len() >= MAX_QUEUE_SIZE {
49                history.pop_front();
50            }
51            history.push_back(message_clone.clone());
52
53            // Send to all active receivers of the same message type
54            if let Some(mut type_subscribers) = subscribers.get_mut(message_clone.type_name()) {
55                let mut removal_indices = Vec::new();
56                for (ix, sender) in type_subscribers.iter().enumerate() {
57                    if sender.send(message_clone.clone()).await.is_err() {
58                        removal_indices.push(ix);
59                    }
60                }
61
62                for ix in removal_indices.iter().rev() {
63                    type_subscribers.remove(*ix);
64                }
65            }
66
67            // Send to all global message subscribers
68            let mut all_channel = subscribers.get_mut("*").unwrap();
69            let mut removal_indices = Vec::new();
70            for (ix, sender) in all_channel.iter_mut().enumerate() {
71                if sender.send(message_clone.clone()).await.is_err() {
72                    removal_indices.push(ix);
73                }
74            }
75
76            for ix in removal_indices.iter().rev() {
77                all_channel.remove(*ix);
78            }
79        });
80
81        Ok(())
82    }
83
84    pub fn subscribe(
85        &self,
86        event_types: Option<Vec<String>>,
87    ) -> mpsc::Receiver<Arc<dyn CompilationEvent>> {
88        let (tx, rx) = mpsc::channel(MAX_QUEUE_SIZE);
89        let subscribers = self.subscribers.clone();
90        let event_history = self.event_history.clone();
91        let tx_clone = tx.clone();
92
93        // Spawn a task to handle the async operations
94        tokio::spawn(async move {
95            // Store the sender
96            if let Some(event_types) = event_types {
97                for event_type in event_types.iter() {
98                    let mut type_subscribers = subscribers.entry(event_type.clone()).or_default();
99                    type_subscribers.push(tx_clone.clone());
100                }
101
102                for event in event_history.lock().await.iter() {
103                    if event_types.contains(&event.type_name().to_string()) {
104                        let _ = tx_clone.send(event.clone()).await;
105                    }
106                }
107            } else {
108                let mut global_subscribers = subscribers.entry("*".to_string()).or_default();
109                global_subscribers.push(tx_clone.clone());
110
111                for event in event_history.lock().await.iter() {
112                    let _ = tx_clone.send(event.clone()).await;
113                }
114            }
115        });
116
117        rx
118    }
119}
120
121#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
122pub enum Severity {
123    Info,
124    Trace,
125    Warning,
126    Error,
127    Fatal,
128    Event,
129}
130
131impl Display for Severity {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        match self {
134            Severity::Info => write!(f, "INFO"),
135            Severity::Trace => write!(f, "TRACE"),
136            Severity::Warning => write!(f, "WARNING"),
137            Severity::Error => write!(f, "ERROR"),
138            Severity::Fatal => write!(f, "FATAL"),
139            Severity::Event => write!(f, "EVENT"),
140        }
141    }
142}
143
144#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
145pub struct DiagnosticEvent {
146    pub message: String,
147    pub severity: Severity,
148}
149
150impl DiagnosticEvent {
151    pub fn new(message: String, severity: Severity) -> Self {
152        Self { message, severity }
153    }
154}
155
156impl CompilationEvent for DiagnosticEvent {
157    fn type_name(&self) -> &'static str {
158        "DiagnosticEvent"
159    }
160
161    fn severity(&self) -> Severity {
162        self.severity
163    }
164
165    fn message(&self) -> String {
166        self.message.clone()
167    }
168
169    fn to_json(&self) -> String {
170        serde_json::to_string(self).unwrap()
171    }
172}