turbo_tasks/
message_queue.rs1use std::{any::Any, collections::VecDeque, fmt::Display, sync::Arc};
2
3use dashmap::DashMap;
4use tokio::sync::{Mutex, mpsc};
5
6pub trait CompilationEvent: Sync + Send + Any {
7 fn type_name(&self) -> &'static str;
8 fn severity(&self) -> Severity;
9 fn message(&self) -> String;
10 fn to_json(&self) -> String;
11}
12
13const MAX_QUEUE_SIZE: usize = 256;
14
15type ArcMx<T> = Arc<Mutex<T>>;
16type CompilationEventChannel = mpsc::Sender<Arc<dyn CompilationEvent>>;
17
18pub struct CompilationEventQueue {
19 event_history: ArcMx<VecDeque<Arc<dyn CompilationEvent>>>,
20 subscribers: Arc<DashMap<String, Vec<CompilationEventChannel>>>,
21}
22
23impl Default for CompilationEventQueue {
24 fn default() -> Self {
25 let subscribers = DashMap::new();
26 subscribers.insert("*".to_owned(), Vec::<CompilationEventChannel>::new());
27
28 Self {
29 event_history: Arc::new(Mutex::new(VecDeque::with_capacity(MAX_QUEUE_SIZE))),
30 subscribers: Arc::new(subscribers),
31 }
32 }
33}
34
35impl CompilationEventQueue {
36 pub fn send(
37 &self,
38 message: Arc<dyn CompilationEvent>,
39 ) -> Result<(), mpsc::error::SendError<Arc<dyn CompilationEvent>>> {
40 let event_history = self.event_history.clone();
41 let subscribers = self.subscribers.clone();
42 let message_clone = message.clone();
43
44 tokio::spawn(async move {
46 let mut history = event_history.lock().await;
48 if history.len() >= MAX_QUEUE_SIZE {
49 history.pop_front();
50 }
51 history.push_back(message_clone.clone());
52
53 if let Some(mut type_subscribers) = subscribers.get_mut(message_clone.type_name()) {
55 let mut removal_indices = Vec::new();
56 for (ix, sender) in type_subscribers.iter().enumerate() {
57 if sender.send(message_clone.clone()).await.is_err() {
58 removal_indices.push(ix);
59 }
60 }
61
62 for ix in removal_indices.iter().rev() {
63 type_subscribers.remove(*ix);
64 }
65 }
66
67 let mut all_channel = subscribers.get_mut("*").unwrap();
69 let mut removal_indices = Vec::new();
70 for (ix, sender) in all_channel.iter_mut().enumerate() {
71 if sender.send(message_clone.clone()).await.is_err() {
72 removal_indices.push(ix);
73 }
74 }
75
76 for ix in removal_indices.iter().rev() {
77 all_channel.remove(*ix);
78 }
79 });
80
81 Ok(())
82 }
83
84 pub fn subscribe(
85 &self,
86 event_types: Option<Vec<String>>,
87 ) -> mpsc::Receiver<Arc<dyn CompilationEvent>> {
88 let (tx, rx) = mpsc::channel(MAX_QUEUE_SIZE);
89 let subscribers = self.subscribers.clone();
90 let event_history = self.event_history.clone();
91 let tx_clone = tx.clone();
92
93 tokio::spawn(async move {
95 if let Some(event_types) = event_types {
97 for event_type in event_types.iter() {
98 let mut type_subscribers = subscribers.entry(event_type.clone()).or_default();
99 type_subscribers.push(tx_clone.clone());
100 }
101
102 for event in event_history.lock().await.iter() {
103 if event_types.contains(&event.type_name().to_string()) {
104 let _ = tx_clone.send(event.clone()).await;
105 }
106 }
107 } else {
108 let mut global_subscribers = subscribers.entry("*".to_string()).or_default();
109 global_subscribers.push(tx_clone.clone());
110
111 for event in event_history.lock().await.iter() {
112 let _ = tx_clone.send(event.clone()).await;
113 }
114 }
115 });
116
117 rx
118 }
119}
120
121#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
122pub enum Severity {
123 Info,
124 Trace,
125 Warning,
126 Error,
127 Fatal,
128 Event,
129}
130
131impl Display for Severity {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 match self {
134 Severity::Info => write!(f, "INFO"),
135 Severity::Trace => write!(f, "TRACE"),
136 Severity::Warning => write!(f, "WARNING"),
137 Severity::Error => write!(f, "ERROR"),
138 Severity::Fatal => write!(f, "FATAL"),
139 Severity::Event => write!(f, "EVENT"),
140 }
141 }
142}
143
144#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
145pub struct DiagnosticEvent {
146 pub message: String,
147 pub severity: Severity,
148}
149
150impl DiagnosticEvent {
151 pub fn new(message: String, severity: Severity) -> Self {
152 Self { message, severity }
153 }
154}
155
156impl CompilationEvent for DiagnosticEvent {
157 fn type_name(&self) -> &'static str {
158 "DiagnosticEvent"
159 }
160
161 fn severity(&self) -> Severity {
162 self.severity
163 }
164
165 fn message(&self) -> String {
166 self.message.clone()
167 }
168
169 fn to_json(&self) -> String {
170 serde_json::to_string(self).unwrap()
171 }
172}