turbo_trace_server/
server.rs

1use std::{
2    net::{SocketAddr, SocketAddrV4, TcpListener, TcpStream},
3    sync::{Arc, Mutex},
4    thread::spawn,
5};
6
7use anyhow::{Result, bail};
8use serde::{Deserialize, Serialize};
9use tungstenite::{Message, accept};
10
11use crate::{
12    store::SpanId,
13    store_container::StoreContainer,
14    timestamp::Timestamp,
15    u64_string,
16    viewer::{Update, ViewLineUpdate, ViewMode, Viewer},
17};
18
19#[derive(Serialize, Deserialize, Debug)]
20#[serde(tag = "type")]
21#[serde(rename_all = "kebab-case")]
22pub enum ServerToClientMessage {
23    ViewLine {
24        #[serde(flatten)]
25        update: ViewLineUpdate,
26    },
27    ViewLinesCount {
28        count: usize,
29        max: u64,
30    },
31    #[serde(rename_all = "camelCase")]
32    QueryResult {
33        #[serde(with = "u64_string")]
34        id: SpanId,
35        is_graph: bool,
36        start: Timestamp,
37        end: Timestamp,
38        duration: Timestamp,
39        cpu: Timestamp,
40        allocations: u64,
41        deallocations: u64,
42        allocation_count: u64,
43        persistent_allocations: u64,
44        args: Vec<(String, String)>,
45        path: Vec<String>,
46    },
47}
48
49#[derive(Serialize, Deserialize, Debug)]
50#[serde(tag = "type")]
51#[serde(rename_all = "kebab-case")]
52pub enum ClientToServerMessage {
53    #[serde(rename_all = "camelCase")]
54    ViewRect {
55        view_rect: ViewRect,
56    },
57    ViewMode {
58        #[serde(with = "u64_string")]
59        id: SpanId,
60        mode: String,
61        inherit: bool,
62    },
63    ResetViewMode {
64        #[serde(with = "u64_string")]
65        id: SpanId,
66    },
67    Query {
68        #[serde(with = "u64_string")]
69        id: SpanId,
70    },
71    Ack,
72    CheckForMoreData,
73}
74
75#[derive(Serialize, Deserialize, Debug)]
76pub struct Filter {
77    pub op: Op,
78    pub value: u64,
79}
80
81#[derive(Serialize, Deserialize, Debug)]
82#[serde(rename_all = "snake_case")]
83pub enum Op {
84    Gt,
85    Lt,
86}
87
88#[derive(Serialize, Deserialize, Debug)]
89#[serde(rename_all = "camelCase")]
90pub struct ViewRect {
91    pub x: u64,
92    pub y: u64,
93    pub width: u64,
94    pub height: u64,
95    pub horizontal_pixels: u64,
96    pub query: String,
97    pub view_mode: String,
98    pub value_mode: String,
99    pub value_filter: Option<Filter>,
100    pub count_filter: Option<Filter>,
101}
102
103struct ConnectionState {
104    store: Arc<StoreContainer>,
105    viewer: Viewer,
106    view_rect: ViewRect,
107    last_update_generation: usize,
108}
109
110pub fn serve(store: Arc<StoreContainer>, port: u16) {
111    let server = TcpListener::bind(SocketAddr::V4(SocketAddrV4::new(
112        std::net::Ipv4Addr::new(127, 0, 0, 1),
113        port,
114    )))
115    .unwrap();
116    for stream in server.incoming() {
117        let store = store.clone();
118
119        spawn(move || {
120            let websocket = accept(stream.unwrap()).unwrap();
121            if let Err(err) = handle_connection(websocket, store) {
122                eprintln!("Error: {err:?}");
123            }
124        });
125    }
126}
127
128fn handle_connection(
129    mut websocket: tungstenite::WebSocket<TcpStream>,
130    store: Arc<StoreContainer>,
131) -> Result<()> {
132    let state = Arc::new(Mutex::new(ConnectionState {
133        store,
134        viewer: Viewer::new(),
135        view_rect: ViewRect {
136            x: 0,
137            y: 0,
138            width: 1,
139            height: 1,
140            horizontal_pixels: 1,
141            query: String::new(),
142            view_mode: "aggregated".to_string(),
143            value_mode: "duration".to_string(),
144            count_filter: None,
145            value_filter: None,
146        },
147        last_update_generation: 0,
148    }));
149    let mut update_skipped = false;
150    let mut ready_for_update = true;
151
152    fn send_update(
153        websocket: &mut tungstenite::WebSocket<TcpStream>,
154        state: &mut ConnectionState,
155        force_send: bool,
156        ready_for_update: &mut bool,
157        update_skipped: &mut bool,
158    ) -> Result<()> {
159        if !*ready_for_update {
160            if force_send {
161                *update_skipped = true;
162            }
163            return Ok(());
164        }
165        let store = state.store.read();
166        if !force_send && state.last_update_generation == store.generation() {
167            return Ok(());
168        }
169        state.last_update_generation = store.generation();
170        let Update {
171            lines: updates,
172            max,
173        } = state.viewer.compute_update(&store, &state.view_rect);
174        let count = updates.len();
175        for update in updates {
176            let message = ServerToClientMessage::ViewLine { update };
177            let message = serde_json::to_string(&message).unwrap();
178            websocket.send(Message::Text(message))?;
179        }
180        let message = ServerToClientMessage::ViewLinesCount { count, max };
181        let message = serde_json::to_string(&message).unwrap();
182        websocket.send(Message::Text(message))?;
183        *ready_for_update = false;
184        Ok(())
185    }
186    loop {
187        match websocket.read()? {
188            Message::Frame(_frame) => {}
189            Message::Text(text) => {
190                let message: ClientToServerMessage = serde_json::from_str(&text)?;
191                let mut state = state.lock().unwrap();
192                match message {
193                    ClientToServerMessage::CheckForMoreData => {
194                        send_update(
195                            &mut websocket,
196                            &mut state,
197                            false,
198                            &mut ready_for_update,
199                            &mut update_skipped,
200                        )?;
201                    }
202                    ClientToServerMessage::ViewRect { view_rect } => {
203                        state.view_rect = view_rect;
204                        send_update(
205                            &mut websocket,
206                            &mut state,
207                            true,
208                            &mut ready_for_update,
209                            &mut update_skipped,
210                        )?;
211                    }
212                    ClientToServerMessage::ViewMode { id, mode, inherit } => {
213                        let (mode, sorted) = if let Some(mode) = mode.strip_suffix("-sorted") {
214                            (mode, true)
215                        } else {
216                            (mode.as_str(), false)
217                        };
218                        match mode {
219                            "raw-spans" => {
220                                state.viewer.set_view_mode(
221                                    id,
222                                    Some((ViewMode::RawSpans { sorted }, inherit)),
223                                );
224                            }
225                            "aggregated" => {
226                                state.viewer.set_view_mode(
227                                    id,
228                                    Some((ViewMode::Aggregated { sorted }, inherit)),
229                                );
230                            }
231                            "bottom-up" => {
232                                state.viewer.set_view_mode(
233                                    id,
234                                    Some((ViewMode::BottomUp { sorted }, inherit)),
235                                );
236                            }
237                            "aggregated-bottom-up" => {
238                                state.viewer.set_view_mode(
239                                    id,
240                                    Some((ViewMode::AggregatedBottomUp { sorted }, inherit)),
241                                );
242                            }
243                            _ => {
244                                bail!("unknown view mode: {}", mode)
245                            }
246                        }
247                        send_update(
248                            &mut websocket,
249                            &mut state,
250                            true,
251                            &mut ready_for_update,
252                            &mut update_skipped,
253                        )?;
254                    }
255                    ClientToServerMessage::ResetViewMode { id } => {
256                        state.viewer.set_view_mode(id, None);
257                        send_update(
258                            &mut websocket,
259                            &mut state,
260                            true,
261                            &mut ready_for_update,
262                            &mut update_skipped,
263                        )?;
264                    }
265                    ClientToServerMessage::Query { id } => {
266                        let message = {
267                            let store = state.store.read();
268                            if let Some((span, is_graph)) = store.span(id) {
269                                let root_start = store.root_span().start();
270                                let span_start = span.start() - root_start;
271                                let span_end = span.end() - root_start;
272                                let duration = span.corrected_total_time();
273                                let cpu = span.total_time();
274                                let allocations = span.total_allocations();
275                                let deallocations = span.total_deallocations();
276                                let allocation_count = span.total_allocation_count();
277                                let persistent_allocations = span.total_persistent_allocations();
278                                let args = span
279                                    .args()
280                                    .map(|(k, v)| (k.to_string(), v.to_string()))
281                                    .collect();
282                                let mut path = Vec::new();
283                                let mut current = span;
284                                while let Some(parent) = current.parent() {
285                                    path.push(parent.nice_name().1.to_string());
286                                    current = parent;
287                                }
288                                path.reverse();
289                                ServerToClientMessage::QueryResult {
290                                    id,
291                                    is_graph,
292                                    start: span_start,
293                                    end: span_end,
294                                    duration,
295                                    cpu,
296                                    allocations,
297                                    deallocations,
298                                    allocation_count,
299                                    persistent_allocations,
300                                    args,
301                                    path,
302                                }
303                            } else {
304                                ServerToClientMessage::QueryResult {
305                                    id,
306                                    is_graph: false,
307                                    start: Timestamp::ZERO,
308                                    end: Timestamp::ZERO,
309                                    duration: Timestamp::ZERO,
310                                    cpu: Timestamp::ZERO,
311                                    allocations: 0,
312                                    deallocations: 0,
313                                    allocation_count: 0,
314                                    persistent_allocations: 0,
315                                    args: Vec::new(),
316                                    path: Vec::new(),
317                                }
318                            }
319                        };
320                        let message = serde_json::to_string(&message).unwrap();
321                        websocket.send(Message::Text(message))?;
322                        send_update(
323                            &mut websocket,
324                            &mut state,
325                            true,
326                            &mut ready_for_update,
327                            &mut update_skipped,
328                        )?;
329
330                        continue;
331                    }
332                    ClientToServerMessage::Ack => {
333                        ready_for_update = true;
334                        if update_skipped {
335                            update_skipped = false;
336                            send_update(
337                                &mut websocket,
338                                &mut state,
339                                true,
340                                &mut ready_for_update,
341                                &mut update_skipped,
342                            )?;
343                        }
344                    }
345                }
346            }
347            Message::Binary(_) => {
348                // This doesn't happen
349            }
350            Message::Close(_) => {
351                return Ok(());
352            }
353            Message::Ping(d) => {
354                websocket.send(Message::Pong(d))?;
355            }
356            Message::Pong(_) => {
357                // thanks for the fish
358            }
359        }
360    }
361}