1use std::{
2 net::{SocketAddr, SocketAddrV4, TcpListener, TcpStream},
3 sync::{Arc, Mutex},
4 thread::spawn,
5};
6
7use anyhow::{Result, bail};
8use serde::{Deserialize, Serialize};
9use tungstenite::{Message, accept};
10
11use crate::{
12 store::SpanId,
13 store_container::StoreContainer,
14 timestamp::Timestamp,
15 u64_string,
16 viewer::{Update, ViewLineUpdate, ViewMode, Viewer},
17};
18
19#[derive(Serialize, Deserialize, Debug)]
20#[serde(tag = "type")]
21#[serde(rename_all = "kebab-case")]
22pub enum ServerToClientMessage {
23 ViewLine {
24 #[serde(flatten)]
25 update: ViewLineUpdate,
26 },
27 ViewLinesCount {
28 count: usize,
29 max: u64,
30 },
31 #[serde(rename_all = "camelCase")]
32 QueryResult {
33 #[serde(with = "u64_string")]
34 id: SpanId,
35 is_graph: bool,
36 start: Timestamp,
37 end: Timestamp,
38 duration: Timestamp,
39 cpu: Timestamp,
40 allocations: u64,
41 deallocations: u64,
42 allocation_count: u64,
43 persistent_allocations: u64,
44 args: Vec<(String, String)>,
45 path: Vec<String>,
46 },
47}
48
49#[derive(Serialize, Deserialize, Debug)]
50#[serde(tag = "type")]
51#[serde(rename_all = "kebab-case")]
52pub enum ClientToServerMessage {
53 #[serde(rename_all = "camelCase")]
54 ViewRect {
55 view_rect: ViewRect,
56 },
57 ViewMode {
58 #[serde(with = "u64_string")]
59 id: SpanId,
60 mode: String,
61 inherit: bool,
62 },
63 ResetViewMode {
64 #[serde(with = "u64_string")]
65 id: SpanId,
66 },
67 Query {
68 #[serde(with = "u64_string")]
69 id: SpanId,
70 },
71 Ack,
72 CheckForMoreData,
73}
74
75#[derive(Serialize, Deserialize, Debug)]
76#[serde(rename_all = "camelCase")]
77pub struct SpanViewEvent {
78 pub start: Timestamp,
79 pub duration: Timestamp,
80 pub name: String,
81 pub id: Option<SpanId>,
82}
83
84#[derive(Serialize, Deserialize, Debug)]
85pub struct Filter {
86 pub op: Op,
87 pub value: u64,
88}
89
90#[derive(Serialize, Deserialize, Debug)]
91#[serde(rename_all = "snake_case")]
92pub enum Op {
93 Gt,
94 Lt,
95}
96
97#[derive(Serialize, Deserialize, Debug)]
98#[serde(rename_all = "camelCase")]
99pub struct ViewRect {
100 pub x: u64,
101 pub y: u64,
102 pub width: u64,
103 pub height: u64,
104 pub horizontal_pixels: u64,
105 pub query: String,
106 pub view_mode: String,
107 pub value_mode: String,
108 pub value_filter: Option<Filter>,
109 pub count_filter: Option<Filter>,
110}
111
112struct ConnectionState {
113 store: Arc<StoreContainer>,
114 viewer: Viewer,
115 view_rect: ViewRect,
116 last_update_generation: usize,
117}
118
119pub fn serve(store: Arc<StoreContainer>, port: u16) {
120 let server = TcpListener::bind(SocketAddr::V4(SocketAddrV4::new(
121 std::net::Ipv4Addr::new(127, 0, 0, 1),
122 port,
123 )))
124 .unwrap();
125 for stream in server.incoming() {
126 let store = store.clone();
127
128 spawn(move || {
129 let websocket = accept(stream.unwrap()).unwrap();
130 if let Err(err) = handle_connection(websocket, store) {
131 eprintln!("Error: {err:?}");
132 }
133 });
134 }
135}
136
137fn handle_connection(
138 mut websocket: tungstenite::WebSocket<TcpStream>,
139 store: Arc<StoreContainer>,
140) -> Result<()> {
141 let state = Arc::new(Mutex::new(ConnectionState {
142 store,
143 viewer: Viewer::new(),
144 view_rect: ViewRect {
145 x: 0,
146 y: 0,
147 width: 1,
148 height: 1,
149 horizontal_pixels: 1,
150 query: String::new(),
151 view_mode: "aggregated".to_string(),
152 value_mode: "duration".to_string(),
153 count_filter: None,
154 value_filter: None,
155 },
156 last_update_generation: 0,
157 }));
158 let mut update_skipped = false;
159 let mut ready_for_update = true;
160
161 fn send_update(
162 websocket: &mut tungstenite::WebSocket<TcpStream>,
163 state: &mut ConnectionState,
164 force_send: bool,
165 ready_for_update: &mut bool,
166 update_skipped: &mut bool,
167 ) -> Result<()> {
168 if !*ready_for_update {
169 if force_send {
170 *update_skipped = true;
171 }
172 return Ok(());
173 }
174 let store = state.store.read();
175 if !force_send && state.last_update_generation == store.generation() {
176 return Ok(());
177 }
178 state.last_update_generation = store.generation();
179 let Update {
180 lines: updates,
181 max,
182 } = state.viewer.compute_update(&store, &state.view_rect);
183 let count = updates.len();
184 for update in updates {
185 let message = ServerToClientMessage::ViewLine { update };
186 let message = serde_json::to_string(&message).unwrap();
187 websocket.send(Message::Text(message))?;
188 }
189 let message = ServerToClientMessage::ViewLinesCount { count, max };
190 let message = serde_json::to_string(&message).unwrap();
191 websocket.send(Message::Text(message))?;
192 *ready_for_update = false;
193 Ok(())
194 }
195 loop {
196 match websocket.read()? {
197 Message::Frame(_frame) => {}
198 Message::Text(text) => {
199 let message: ClientToServerMessage = serde_json::from_str(&text)?;
200 let mut state = state.lock().unwrap();
201 match message {
202 ClientToServerMessage::CheckForMoreData => {
203 send_update(
204 &mut websocket,
205 &mut state,
206 false,
207 &mut ready_for_update,
208 &mut update_skipped,
209 )?;
210 }
211 ClientToServerMessage::ViewRect { view_rect } => {
212 state.view_rect = view_rect;
213 send_update(
214 &mut websocket,
215 &mut state,
216 true,
217 &mut ready_for_update,
218 &mut update_skipped,
219 )?;
220 }
221 ClientToServerMessage::ViewMode { id, mode, inherit } => {
222 let (mode, sorted) = if let Some(mode) = mode.strip_suffix("-sorted") {
223 (mode, true)
224 } else {
225 (mode.as_str(), false)
226 };
227 match mode {
228 "raw-spans" => {
229 state.viewer.set_view_mode(
230 id,
231 Some((ViewMode::RawSpans { sorted }, inherit)),
232 );
233 }
234 "aggregated" => {
235 state.viewer.set_view_mode(
236 id,
237 Some((ViewMode::Aggregated { sorted }, inherit)),
238 );
239 }
240 "bottom-up" => {
241 state.viewer.set_view_mode(
242 id,
243 Some((ViewMode::BottomUp { sorted }, inherit)),
244 );
245 }
246 "aggregated-bottom-up" => {
247 state.viewer.set_view_mode(
248 id,
249 Some((ViewMode::AggregatedBottomUp { sorted }, inherit)),
250 );
251 }
252 _ => {
253 bail!("unknown view mode: {}", mode)
254 }
255 }
256 send_update(
257 &mut websocket,
258 &mut state,
259 true,
260 &mut ready_for_update,
261 &mut update_skipped,
262 )?;
263 }
264 ClientToServerMessage::ResetViewMode { id } => {
265 state.viewer.set_view_mode(id, None);
266 send_update(
267 &mut websocket,
268 &mut state,
269 true,
270 &mut ready_for_update,
271 &mut update_skipped,
272 )?;
273 }
274 ClientToServerMessage::Query { id } => {
275 let message = {
276 let store = state.store.read();
277 if let Some((span, is_graph)) = store.span(id) {
278 let root_start = store.root_span().start();
279 let span_start = span.start() - root_start;
280 let span_end = span.end() - root_start;
281 let duration = span.corrected_total_time();
282 let cpu = span.total_time();
283 let allocations = span.total_allocations();
284 let deallocations = span.total_deallocations();
285 let allocation_count = span.total_allocation_count();
286 let persistent_allocations = span.total_persistent_allocations();
287 let args = span
288 .args()
289 .map(|(k, v)| (k.to_string(), v.to_string()))
290 .collect();
291 let mut path = Vec::new();
292 let mut current = span;
293 while let Some(parent) = current.parent() {
294 path.push(parent.nice_name().1.to_string());
295 current = parent;
296 }
297 path.reverse();
298 ServerToClientMessage::QueryResult {
299 id,
300 is_graph,
301 start: span_start,
302 end: span_end,
303 duration,
304 cpu,
305 allocations,
306 deallocations,
307 allocation_count,
308 persistent_allocations,
309 args,
310 path,
311 }
312 } else {
313 ServerToClientMessage::QueryResult {
314 id,
315 is_graph: false,
316 start: Timestamp::ZERO,
317 end: Timestamp::ZERO,
318 duration: Timestamp::ZERO,
319 cpu: Timestamp::ZERO,
320 allocations: 0,
321 deallocations: 0,
322 allocation_count: 0,
323 persistent_allocations: 0,
324 args: Vec::new(),
325 path: Vec::new(),
326 }
327 }
328 };
329 let message = serde_json::to_string(&message).unwrap();
330 websocket.send(Message::Text(message))?;
331 send_update(
332 &mut websocket,
333 &mut state,
334 true,
335 &mut ready_for_update,
336 &mut update_skipped,
337 )?;
338
339 continue;
340 }
341 ClientToServerMessage::Ack => {
342 ready_for_update = true;
343 if update_skipped {
344 update_skipped = false;
345 send_update(
346 &mut websocket,
347 &mut state,
348 true,
349 &mut ready_for_update,
350 &mut update_skipped,
351 )?;
352 }
353 }
354 }
355 }
356 Message::Binary(_) => {
357 }
359 Message::Close(_) => {
360 return Ok(());
361 }
362 Message::Ping(d) => {
363 websocket.send(Message::Pong(d))?;
364 }
365 Message::Pong(_) => {
366 }
368 }
369 }
370}