turbo_trace_server/
server.rs

1use std::{
2    net::{SocketAddr, SocketAddrV4, TcpListener, TcpStream},
3    sync::{Arc, Mutex},
4    thread::spawn,
5};
6
7use anyhow::{Result, bail};
8use serde::{Deserialize, Serialize};
9use tungstenite::{Message, accept};
10
11use crate::{
12    store::SpanId,
13    store_container::StoreContainer,
14    timestamp::Timestamp,
15    u64_string,
16    viewer::{Update, ViewLineUpdate, ViewMode, Viewer},
17};
18
19#[derive(Serialize, Deserialize, Debug)]
20#[serde(tag = "type")]
21#[serde(rename_all = "kebab-case")]
22pub enum ServerToClientMessage {
23    ViewLine {
24        #[serde(flatten)]
25        update: ViewLineUpdate,
26    },
27    ViewLinesCount {
28        count: usize,
29        max: u64,
30    },
31    #[serde(rename_all = "camelCase")]
32    QueryResult {
33        #[serde(with = "u64_string")]
34        id: SpanId,
35        is_graph: bool,
36        start: Timestamp,
37        end: Timestamp,
38        duration: Timestamp,
39        cpu: Timestamp,
40        allocations: u64,
41        deallocations: u64,
42        allocation_count: u64,
43        persistent_allocations: u64,
44        args: Vec<(String, String)>,
45        path: Vec<String>,
46    },
47}
48
49#[derive(Serialize, Deserialize, Debug)]
50#[serde(tag = "type")]
51#[serde(rename_all = "kebab-case")]
52pub enum ClientToServerMessage {
53    #[serde(rename_all = "camelCase")]
54    ViewRect {
55        view_rect: ViewRect,
56    },
57    ViewMode {
58        #[serde(with = "u64_string")]
59        id: SpanId,
60        mode: String,
61        inherit: bool,
62    },
63    ResetViewMode {
64        #[serde(with = "u64_string")]
65        id: SpanId,
66    },
67    Query {
68        #[serde(with = "u64_string")]
69        id: SpanId,
70    },
71    Ack,
72    CheckForMoreData,
73}
74
75#[derive(Serialize, Deserialize, Debug)]
76#[serde(rename_all = "camelCase")]
77pub struct SpanViewEvent {
78    pub start: Timestamp,
79    pub duration: Timestamp,
80    pub name: String,
81    pub id: Option<SpanId>,
82}
83
84#[derive(Serialize, Deserialize, Debug)]
85pub struct Filter {
86    pub op: Op,
87    pub value: u64,
88}
89
90#[derive(Serialize, Deserialize, Debug)]
91#[serde(rename_all = "snake_case")]
92pub enum Op {
93    Gt,
94    Lt,
95}
96
97#[derive(Serialize, Deserialize, Debug)]
98#[serde(rename_all = "camelCase")]
99pub struct ViewRect {
100    pub x: u64,
101    pub y: u64,
102    pub width: u64,
103    pub height: u64,
104    pub horizontal_pixels: u64,
105    pub query: String,
106    pub view_mode: String,
107    pub value_mode: String,
108    pub value_filter: Option<Filter>,
109    pub count_filter: Option<Filter>,
110}
111
112struct ConnectionState {
113    store: Arc<StoreContainer>,
114    viewer: Viewer,
115    view_rect: ViewRect,
116    last_update_generation: usize,
117}
118
119pub fn serve(store: Arc<StoreContainer>, port: u16) {
120    let server = TcpListener::bind(SocketAddr::V4(SocketAddrV4::new(
121        std::net::Ipv4Addr::new(127, 0, 0, 1),
122        port,
123    )))
124    .unwrap();
125    for stream in server.incoming() {
126        let store = store.clone();
127
128        spawn(move || {
129            let websocket = accept(stream.unwrap()).unwrap();
130            if let Err(err) = handle_connection(websocket, store) {
131                eprintln!("Error: {err:?}");
132            }
133        });
134    }
135}
136
137fn handle_connection(
138    mut websocket: tungstenite::WebSocket<TcpStream>,
139    store: Arc<StoreContainer>,
140) -> Result<()> {
141    let state = Arc::new(Mutex::new(ConnectionState {
142        store,
143        viewer: Viewer::new(),
144        view_rect: ViewRect {
145            x: 0,
146            y: 0,
147            width: 1,
148            height: 1,
149            horizontal_pixels: 1,
150            query: String::new(),
151            view_mode: "aggregated".to_string(),
152            value_mode: "duration".to_string(),
153            count_filter: None,
154            value_filter: None,
155        },
156        last_update_generation: 0,
157    }));
158    let mut update_skipped = false;
159    let mut ready_for_update = true;
160
161    fn send_update(
162        websocket: &mut tungstenite::WebSocket<TcpStream>,
163        state: &mut ConnectionState,
164        force_send: bool,
165        ready_for_update: &mut bool,
166        update_skipped: &mut bool,
167    ) -> Result<()> {
168        if !*ready_for_update {
169            if force_send {
170                *update_skipped = true;
171            }
172            return Ok(());
173        }
174        let store = state.store.read();
175        if !force_send && state.last_update_generation == store.generation() {
176            return Ok(());
177        }
178        state.last_update_generation = store.generation();
179        let Update {
180            lines: updates,
181            max,
182        } = state.viewer.compute_update(&store, &state.view_rect);
183        let count = updates.len();
184        for update in updates {
185            let message = ServerToClientMessage::ViewLine { update };
186            let message = serde_json::to_string(&message).unwrap();
187            websocket.send(Message::Text(message))?;
188        }
189        let message = ServerToClientMessage::ViewLinesCount { count, max };
190        let message = serde_json::to_string(&message).unwrap();
191        websocket.send(Message::Text(message))?;
192        *ready_for_update = false;
193        Ok(())
194    }
195    loop {
196        match websocket.read()? {
197            Message::Frame(_frame) => {}
198            Message::Text(text) => {
199                let message: ClientToServerMessage = serde_json::from_str(&text)?;
200                let mut state = state.lock().unwrap();
201                match message {
202                    ClientToServerMessage::CheckForMoreData => {
203                        send_update(
204                            &mut websocket,
205                            &mut state,
206                            false,
207                            &mut ready_for_update,
208                            &mut update_skipped,
209                        )?;
210                    }
211                    ClientToServerMessage::ViewRect { view_rect } => {
212                        state.view_rect = view_rect;
213                        send_update(
214                            &mut websocket,
215                            &mut state,
216                            true,
217                            &mut ready_for_update,
218                            &mut update_skipped,
219                        )?;
220                    }
221                    ClientToServerMessage::ViewMode { id, mode, inherit } => {
222                        let (mode, sorted) = if let Some(mode) = mode.strip_suffix("-sorted") {
223                            (mode, true)
224                        } else {
225                            (mode.as_str(), false)
226                        };
227                        match mode {
228                            "raw-spans" => {
229                                state.viewer.set_view_mode(
230                                    id,
231                                    Some((ViewMode::RawSpans { sorted }, inherit)),
232                                );
233                            }
234                            "aggregated" => {
235                                state.viewer.set_view_mode(
236                                    id,
237                                    Some((ViewMode::Aggregated { sorted }, inherit)),
238                                );
239                            }
240                            "bottom-up" => {
241                                state.viewer.set_view_mode(
242                                    id,
243                                    Some((ViewMode::BottomUp { sorted }, inherit)),
244                                );
245                            }
246                            "aggregated-bottom-up" => {
247                                state.viewer.set_view_mode(
248                                    id,
249                                    Some((ViewMode::AggregatedBottomUp { sorted }, inherit)),
250                                );
251                            }
252                            _ => {
253                                bail!("unknown view mode: {}", mode)
254                            }
255                        }
256                        send_update(
257                            &mut websocket,
258                            &mut state,
259                            true,
260                            &mut ready_for_update,
261                            &mut update_skipped,
262                        )?;
263                    }
264                    ClientToServerMessage::ResetViewMode { id } => {
265                        state.viewer.set_view_mode(id, None);
266                        send_update(
267                            &mut websocket,
268                            &mut state,
269                            true,
270                            &mut ready_for_update,
271                            &mut update_skipped,
272                        )?;
273                    }
274                    ClientToServerMessage::Query { id } => {
275                        let message = {
276                            let store = state.store.read();
277                            if let Some((span, is_graph)) = store.span(id) {
278                                let root_start = store.root_span().start();
279                                let span_start = span.start() - root_start;
280                                let span_end = span.end() - root_start;
281                                let duration = span.corrected_total_time();
282                                let cpu = span.total_time();
283                                let allocations = span.total_allocations();
284                                let deallocations = span.total_deallocations();
285                                let allocation_count = span.total_allocation_count();
286                                let persistent_allocations = span.total_persistent_allocations();
287                                let args = span
288                                    .args()
289                                    .map(|(k, v)| (k.to_string(), v.to_string()))
290                                    .collect();
291                                let mut path = Vec::new();
292                                let mut current = span;
293                                while let Some(parent) = current.parent() {
294                                    path.push(parent.nice_name().1.to_string());
295                                    current = parent;
296                                }
297                                path.reverse();
298                                ServerToClientMessage::QueryResult {
299                                    id,
300                                    is_graph,
301                                    start: span_start,
302                                    end: span_end,
303                                    duration,
304                                    cpu,
305                                    allocations,
306                                    deallocations,
307                                    allocation_count,
308                                    persistent_allocations,
309                                    args,
310                                    path,
311                                }
312                            } else {
313                                ServerToClientMessage::QueryResult {
314                                    id,
315                                    is_graph: false,
316                                    start: Timestamp::ZERO,
317                                    end: Timestamp::ZERO,
318                                    duration: Timestamp::ZERO,
319                                    cpu: Timestamp::ZERO,
320                                    allocations: 0,
321                                    deallocations: 0,
322                                    allocation_count: 0,
323                                    persistent_allocations: 0,
324                                    args: Vec::new(),
325                                    path: Vec::new(),
326                                }
327                            }
328                        };
329                        let message = serde_json::to_string(&message).unwrap();
330                        websocket.send(Message::Text(message))?;
331                        send_update(
332                            &mut websocket,
333                            &mut state,
334                            true,
335                            &mut ready_for_update,
336                            &mut update_skipped,
337                        )?;
338
339                        continue;
340                    }
341                    ClientToServerMessage::Ack => {
342                        ready_for_update = true;
343                        if update_skipped {
344                            update_skipped = false;
345                            send_update(
346                                &mut websocket,
347                                &mut state,
348                                true,
349                                &mut ready_for_update,
350                                &mut update_skipped,
351                            )?;
352                        }
353                    }
354                }
355            }
356            Message::Binary(_) => {
357                // This doesn't happen
358            }
359            Message::Close(_) => {
360                return Ok(());
361            }
362            Message::Ping(d) => {
363                websocket.send(Message::Pong(d))?;
364            }
365            Message::Pong(_) => {
366                // thanks for the fish
367            }
368        }
369    }
370}