Skip to main content

turbo_trace_server/
server.rs

1use std::{
2    net::{SocketAddr, SocketAddrV4, TcpListener, TcpStream},
3    sync::{Arc, Mutex},
4    thread::spawn,
5};
6
7use anyhow::{Result, bail};
8use serde::{Deserialize, Serialize};
9use tungstenite::{Message, accept};
10
11use crate::{
12    store::SpanId,
13    store_container::StoreContainer,
14    timestamp::Timestamp,
15    u64_string,
16    viewer::{Update, ViewLineUpdate, ViewMode, Viewer},
17};
18
19#[derive(Serialize, Debug)]
20#[serde(tag = "type")]
21#[serde(rename_all = "kebab-case")]
22pub enum ServerToClientMessage {
23    ViewLine {
24        #[serde(flatten)]
25        update: ViewLineUpdate,
26    },
27    ViewLinesCount {
28        count: usize,
29        max: u64,
30    },
31    #[serde(rename_all = "camelCase")]
32    QueryResult {
33        #[serde(with = "u64_string")]
34        id: SpanId,
35        is_graph: bool,
36        start: Timestamp,
37        end: Timestamp,
38        duration: Timestamp,
39        cpu: Timestamp,
40        allocations: u64,
41        deallocations: u64,
42        allocation_count: u64,
43        persistent_allocations: u64,
44        args: Vec<(String, String)>,
45        path: Vec<String>,
46        memory_samples: Vec<u64>,
47    },
48}
49
50#[derive(Deserialize, Debug)]
51#[serde(tag = "type")]
52#[serde(rename_all = "kebab-case")]
53pub enum ClientToServerMessage {
54    #[serde(rename_all = "camelCase")]
55    ViewRect {
56        view_rect: ViewRect,
57    },
58    ViewMode {
59        #[serde(with = "u64_string")]
60        id: SpanId,
61        mode: String,
62        inherit: bool,
63    },
64    ResetViewMode {
65        #[serde(with = "u64_string")]
66        id: SpanId,
67    },
68    Query {
69        #[serde(with = "u64_string")]
70        id: SpanId,
71    },
72    Ack,
73    CheckForMoreData,
74}
75
76#[derive(Deserialize, Debug)]
77pub struct Filter {
78    pub op: Op,
79    pub value: u64,
80}
81
82#[derive(Deserialize, Debug)]
83#[serde(rename_all = "snake_case")]
84pub enum Op {
85    Gt,
86    Lt,
87}
88
89#[derive(Deserialize, Debug)]
90#[serde(rename_all = "camelCase")]
91pub struct ViewRect {
92    pub x: u64,
93    pub y: u64,
94    pub width: u64,
95    pub height: u64,
96    pub horizontal_pixels: u64,
97    pub query: String,
98    pub view_mode: String,
99    pub value_mode: String,
100    pub value_filter: Option<Filter>,
101    pub count_filter: Option<Filter>,
102}
103
104struct ConnectionState {
105    store: Arc<StoreContainer>,
106    viewer: Viewer,
107    view_rect: ViewRect,
108    last_update_generation: usize,
109}
110
111pub fn serve(store: Arc<StoreContainer>, port: u16) {
112    let server = TcpListener::bind(SocketAddr::V4(SocketAddrV4::new(
113        std::net::Ipv4Addr::new(127, 0, 0, 1),
114        port,
115    )))
116    .unwrap();
117    for stream in server.incoming() {
118        let store = store.clone();
119
120        spawn(move || {
121            let websocket = accept(stream.unwrap()).unwrap();
122            if let Err(err) = handle_connection(websocket, store) {
123                eprintln!("Error: {err:?}");
124            }
125        });
126    }
127}
128
129fn handle_connection(
130    mut websocket: tungstenite::WebSocket<TcpStream>,
131    store: Arc<StoreContainer>,
132) -> Result<()> {
133    let state = Arc::new(Mutex::new(ConnectionState {
134        store,
135        viewer: Viewer::new(),
136        view_rect: ViewRect {
137            x: 0,
138            y: 0,
139            width: 1,
140            height: 1,
141            horizontal_pixels: 1,
142            query: String::new(),
143            view_mode: "aggregated".to_string(),
144            value_mode: "duration".to_string(),
145            count_filter: None,
146            value_filter: None,
147        },
148        last_update_generation: 0,
149    }));
150    let mut update_skipped = false;
151    let mut ready_for_update = true;
152
153    fn send_update(
154        websocket: &mut tungstenite::WebSocket<TcpStream>,
155        state: &mut ConnectionState,
156        force_send: bool,
157        ready_for_update: &mut bool,
158        update_skipped: &mut bool,
159    ) -> Result<()> {
160        if !*ready_for_update {
161            if force_send {
162                *update_skipped = true;
163            }
164            return Ok(());
165        }
166        let store = state.store.read();
167        if !force_send && state.last_update_generation == store.generation() {
168            return Ok(());
169        }
170        state.last_update_generation = store.generation();
171        let Update {
172            lines: updates,
173            max,
174        } = state.viewer.compute_update(&store, &state.view_rect);
175        let count = updates.len();
176        for update in updates {
177            let message = ServerToClientMessage::ViewLine { update };
178            let message = serde_json::to_string(&message).unwrap();
179            websocket.send(Message::Text(message))?;
180        }
181        let message = ServerToClientMessage::ViewLinesCount { count, max };
182        let message = serde_json::to_string(&message).unwrap();
183        websocket.send(Message::Text(message))?;
184        *ready_for_update = false;
185        Ok(())
186    }
187    loop {
188        match websocket.read()? {
189            Message::Frame(_frame) => {}
190            Message::Text(text) => {
191                let message: ClientToServerMessage = serde_json::from_str(&text)?;
192                let mut state = state.lock().unwrap();
193                match message {
194                    ClientToServerMessage::CheckForMoreData => {
195                        send_update(
196                            &mut websocket,
197                            &mut state,
198                            false,
199                            &mut ready_for_update,
200                            &mut update_skipped,
201                        )?;
202                    }
203                    ClientToServerMessage::ViewRect { view_rect } => {
204                        state.view_rect = view_rect;
205                        send_update(
206                            &mut websocket,
207                            &mut state,
208                            true,
209                            &mut ready_for_update,
210                            &mut update_skipped,
211                        )?;
212                    }
213                    ClientToServerMessage::ViewMode { id, mode, inherit } => {
214                        let (mode, sorted) = if let Some(mode) = mode.strip_suffix("-sorted") {
215                            (mode, true)
216                        } else {
217                            (mode.as_str(), false)
218                        };
219                        match mode {
220                            "raw-spans" => {
221                                state.viewer.set_view_mode(
222                                    id,
223                                    Some((ViewMode::RawSpans { sorted }, inherit)),
224                                );
225                            }
226                            "aggregated" => {
227                                state.viewer.set_view_mode(
228                                    id,
229                                    Some((ViewMode::Aggregated { sorted }, inherit)),
230                                );
231                            }
232                            "bottom-up" => {
233                                state.viewer.set_view_mode(
234                                    id,
235                                    Some((ViewMode::BottomUp { sorted }, inherit)),
236                                );
237                            }
238                            "aggregated-bottom-up" => {
239                                state.viewer.set_view_mode(
240                                    id,
241                                    Some((ViewMode::AggregatedBottomUp { sorted }, inherit)),
242                                );
243                            }
244                            _ => {
245                                bail!("unknown view mode: {}", mode)
246                            }
247                        }
248                        send_update(
249                            &mut websocket,
250                            &mut state,
251                            true,
252                            &mut ready_for_update,
253                            &mut update_skipped,
254                        )?;
255                    }
256                    ClientToServerMessage::ResetViewMode { id } => {
257                        state.viewer.set_view_mode(id, None);
258                        send_update(
259                            &mut websocket,
260                            &mut state,
261                            true,
262                            &mut ready_for_update,
263                            &mut update_skipped,
264                        )?;
265                    }
266                    ClientToServerMessage::Query { id } => {
267                        let message = {
268                            let store = state.store.read();
269                            if let Some((span, is_graph)) = store.span(id) {
270                                let root_start = store.root_span().start();
271                                let span_start = span.start() - root_start;
272                                let span_end = span.end() - root_start;
273                                let duration = span.corrected_total_time();
274                                let cpu = span.total_time();
275                                let allocations = span.total_allocations();
276                                let deallocations = span.total_deallocations();
277                                let allocation_count = span.total_allocation_count();
278                                let persistent_allocations = span.total_persistent_allocations();
279                                let args = span
280                                    .args()
281                                    .map(|(k, v)| (k.to_string(), v.to_string()))
282                                    .collect();
283                                let mut path = Vec::new();
284                                let mut current = span;
285                                while let Some(parent) = current.parent() {
286                                    path.push(parent.nice_name().1.to_string());
287                                    current = parent;
288                                }
289                                path.reverse();
290                                let memory_samples =
291                                    store.memory_samples_for_range(span.start(), span.end());
292                                ServerToClientMessage::QueryResult {
293                                    id,
294                                    is_graph,
295                                    start: span_start,
296                                    end: span_end,
297                                    duration,
298                                    cpu,
299                                    allocations,
300                                    deallocations,
301                                    allocation_count,
302                                    persistent_allocations,
303                                    args,
304                                    path,
305                                    memory_samples,
306                                }
307                            } else {
308                                ServerToClientMessage::QueryResult {
309                                    id,
310                                    is_graph: false,
311                                    start: Timestamp::ZERO,
312                                    end: Timestamp::ZERO,
313                                    duration: Timestamp::ZERO,
314                                    cpu: Timestamp::ZERO,
315                                    allocations: 0,
316                                    deallocations: 0,
317                                    allocation_count: 0,
318                                    persistent_allocations: 0,
319                                    args: Vec::new(),
320                                    path: Vec::new(),
321                                    memory_samples: Vec::new(),
322                                }
323                            }
324                        };
325                        let message = serde_json::to_string(&message).unwrap();
326                        websocket.send(Message::Text(message))?;
327                        send_update(
328                            &mut websocket,
329                            &mut state,
330                            true,
331                            &mut ready_for_update,
332                            &mut update_skipped,
333                        )?;
334
335                        continue;
336                    }
337                    ClientToServerMessage::Ack => {
338                        ready_for_update = true;
339                        if update_skipped {
340                            update_skipped = false;
341                            send_update(
342                                &mut websocket,
343                                &mut state,
344                                true,
345                                &mut ready_for_update,
346                                &mut update_skipped,
347                            )?;
348                        }
349                    }
350                }
351            }
352            Message::Binary(_) => {
353                // This doesn't happen
354            }
355            Message::Close(_) => {
356                return Ok(());
357            }
358            Message::Ping(d) => {
359                websocket.send(Message::Pong(d))?;
360            }
361            Message::Pong(_) => {
362                // thanks for the fish
363            }
364        }
365    }
366}