1use std::{
2 net::{SocketAddr, SocketAddrV4, TcpListener, TcpStream},
3 sync::{Arc, Mutex},
4 thread::spawn,
5};
6
7use anyhow::{Result, bail};
8use serde::{Deserialize, Serialize};
9use tungstenite::{Message, accept};
10
11use crate::{
12 store::SpanId,
13 store_container::StoreContainer,
14 timestamp::Timestamp,
15 u64_string,
16 viewer::{Update, ViewLineUpdate, ViewMode, Viewer},
17};
18
19#[derive(Serialize, Deserialize, Debug)]
20#[serde(tag = "type")]
21#[serde(rename_all = "kebab-case")]
22pub enum ServerToClientMessage {
23 ViewLine {
24 #[serde(flatten)]
25 update: ViewLineUpdate,
26 },
27 ViewLinesCount {
28 count: usize,
29 max: u64,
30 },
31 #[serde(rename_all = "camelCase")]
32 QueryResult {
33 #[serde(with = "u64_string")]
34 id: SpanId,
35 is_graph: bool,
36 start: Timestamp,
37 end: Timestamp,
38 duration: Timestamp,
39 cpu: Timestamp,
40 allocations: u64,
41 deallocations: u64,
42 allocation_count: u64,
43 persistent_allocations: u64,
44 args: Vec<(String, String)>,
45 path: Vec<String>,
46 },
47}
48
49#[derive(Serialize, Deserialize, Debug)]
50#[serde(tag = "type")]
51#[serde(rename_all = "kebab-case")]
52pub enum ClientToServerMessage {
53 #[serde(rename_all = "camelCase")]
54 ViewRect {
55 view_rect: ViewRect,
56 },
57 ViewMode {
58 #[serde(with = "u64_string")]
59 id: SpanId,
60 mode: String,
61 inherit: bool,
62 },
63 ResetViewMode {
64 #[serde(with = "u64_string")]
65 id: SpanId,
66 },
67 Query {
68 #[serde(with = "u64_string")]
69 id: SpanId,
70 },
71 Ack,
72 CheckForMoreData,
73}
74
75#[derive(Serialize, Deserialize, Debug)]
76pub struct Filter {
77 pub op: Op,
78 pub value: u64,
79}
80
81#[derive(Serialize, Deserialize, Debug)]
82#[serde(rename_all = "snake_case")]
83pub enum Op {
84 Gt,
85 Lt,
86}
87
88#[derive(Serialize, Deserialize, Debug)]
89#[serde(rename_all = "camelCase")]
90pub struct ViewRect {
91 pub x: u64,
92 pub y: u64,
93 pub width: u64,
94 pub height: u64,
95 pub horizontal_pixels: u64,
96 pub query: String,
97 pub view_mode: String,
98 pub value_mode: String,
99 pub value_filter: Option<Filter>,
100 pub count_filter: Option<Filter>,
101}
102
103struct ConnectionState {
104 store: Arc<StoreContainer>,
105 viewer: Viewer,
106 view_rect: ViewRect,
107 last_update_generation: usize,
108}
109
110pub fn serve(store: Arc<StoreContainer>, port: u16) {
111 let server = TcpListener::bind(SocketAddr::V4(SocketAddrV4::new(
112 std::net::Ipv4Addr::new(127, 0, 0, 1),
113 port,
114 )))
115 .unwrap();
116 for stream in server.incoming() {
117 let store = store.clone();
118
119 spawn(move || {
120 let websocket = accept(stream.unwrap()).unwrap();
121 if let Err(err) = handle_connection(websocket, store) {
122 eprintln!("Error: {err:?}");
123 }
124 });
125 }
126}
127
128fn handle_connection(
129 mut websocket: tungstenite::WebSocket<TcpStream>,
130 store: Arc<StoreContainer>,
131) -> Result<()> {
132 let state = Arc::new(Mutex::new(ConnectionState {
133 store,
134 viewer: Viewer::new(),
135 view_rect: ViewRect {
136 x: 0,
137 y: 0,
138 width: 1,
139 height: 1,
140 horizontal_pixels: 1,
141 query: String::new(),
142 view_mode: "aggregated".to_string(),
143 value_mode: "duration".to_string(),
144 count_filter: None,
145 value_filter: None,
146 },
147 last_update_generation: 0,
148 }));
149 let mut update_skipped = false;
150 let mut ready_for_update = true;
151
152 fn send_update(
153 websocket: &mut tungstenite::WebSocket<TcpStream>,
154 state: &mut ConnectionState,
155 force_send: bool,
156 ready_for_update: &mut bool,
157 update_skipped: &mut bool,
158 ) -> Result<()> {
159 if !*ready_for_update {
160 if force_send {
161 *update_skipped = true;
162 }
163 return Ok(());
164 }
165 let store = state.store.read();
166 if !force_send && state.last_update_generation == store.generation() {
167 return Ok(());
168 }
169 state.last_update_generation = store.generation();
170 let Update {
171 lines: updates,
172 max,
173 } = state.viewer.compute_update(&store, &state.view_rect);
174 let count = updates.len();
175 for update in updates {
176 let message = ServerToClientMessage::ViewLine { update };
177 let message = serde_json::to_string(&message).unwrap();
178 websocket.send(Message::Text(message))?;
179 }
180 let message = ServerToClientMessage::ViewLinesCount { count, max };
181 let message = serde_json::to_string(&message).unwrap();
182 websocket.send(Message::Text(message))?;
183 *ready_for_update = false;
184 Ok(())
185 }
186 loop {
187 match websocket.read()? {
188 Message::Frame(_frame) => {}
189 Message::Text(text) => {
190 let message: ClientToServerMessage = serde_json::from_str(&text)?;
191 let mut state = state.lock().unwrap();
192 match message {
193 ClientToServerMessage::CheckForMoreData => {
194 send_update(
195 &mut websocket,
196 &mut state,
197 false,
198 &mut ready_for_update,
199 &mut update_skipped,
200 )?;
201 }
202 ClientToServerMessage::ViewRect { view_rect } => {
203 state.view_rect = view_rect;
204 send_update(
205 &mut websocket,
206 &mut state,
207 true,
208 &mut ready_for_update,
209 &mut update_skipped,
210 )?;
211 }
212 ClientToServerMessage::ViewMode { id, mode, inherit } => {
213 let (mode, sorted) = if let Some(mode) = mode.strip_suffix("-sorted") {
214 (mode, true)
215 } else {
216 (mode.as_str(), false)
217 };
218 match mode {
219 "raw-spans" => {
220 state.viewer.set_view_mode(
221 id,
222 Some((ViewMode::RawSpans { sorted }, inherit)),
223 );
224 }
225 "aggregated" => {
226 state.viewer.set_view_mode(
227 id,
228 Some((ViewMode::Aggregated { sorted }, inherit)),
229 );
230 }
231 "bottom-up" => {
232 state.viewer.set_view_mode(
233 id,
234 Some((ViewMode::BottomUp { sorted }, inherit)),
235 );
236 }
237 "aggregated-bottom-up" => {
238 state.viewer.set_view_mode(
239 id,
240 Some((ViewMode::AggregatedBottomUp { sorted }, inherit)),
241 );
242 }
243 _ => {
244 bail!("unknown view mode: {}", mode)
245 }
246 }
247 send_update(
248 &mut websocket,
249 &mut state,
250 true,
251 &mut ready_for_update,
252 &mut update_skipped,
253 )?;
254 }
255 ClientToServerMessage::ResetViewMode { id } => {
256 state.viewer.set_view_mode(id, None);
257 send_update(
258 &mut websocket,
259 &mut state,
260 true,
261 &mut ready_for_update,
262 &mut update_skipped,
263 )?;
264 }
265 ClientToServerMessage::Query { id } => {
266 let message = {
267 let store = state.store.read();
268 if let Some((span, is_graph)) = store.span(id) {
269 let root_start = store.root_span().start();
270 let span_start = span.start() - root_start;
271 let span_end = span.end() - root_start;
272 let duration = span.corrected_total_time();
273 let cpu = span.total_time();
274 let allocations = span.total_allocations();
275 let deallocations = span.total_deallocations();
276 let allocation_count = span.total_allocation_count();
277 let persistent_allocations = span.total_persistent_allocations();
278 let args = span
279 .args()
280 .map(|(k, v)| (k.to_string(), v.to_string()))
281 .collect();
282 let mut path = Vec::new();
283 let mut current = span;
284 while let Some(parent) = current.parent() {
285 path.push(parent.nice_name().1.to_string());
286 current = parent;
287 }
288 path.reverse();
289 ServerToClientMessage::QueryResult {
290 id,
291 is_graph,
292 start: span_start,
293 end: span_end,
294 duration,
295 cpu,
296 allocations,
297 deallocations,
298 allocation_count,
299 persistent_allocations,
300 args,
301 path,
302 }
303 } else {
304 ServerToClientMessage::QueryResult {
305 id,
306 is_graph: false,
307 start: Timestamp::ZERO,
308 end: Timestamp::ZERO,
309 duration: Timestamp::ZERO,
310 cpu: Timestamp::ZERO,
311 allocations: 0,
312 deallocations: 0,
313 allocation_count: 0,
314 persistent_allocations: 0,
315 args: Vec::new(),
316 path: Vec::new(),
317 }
318 }
319 };
320 let message = serde_json::to_string(&message).unwrap();
321 websocket.send(Message::Text(message))?;
322 send_update(
323 &mut websocket,
324 &mut state,
325 true,
326 &mut ready_for_update,
327 &mut update_skipped,
328 )?;
329
330 continue;
331 }
332 ClientToServerMessage::Ack => {
333 ready_for_update = true;
334 if update_skipped {
335 update_skipped = false;
336 send_update(
337 &mut websocket,
338 &mut state,
339 true,
340 &mut ready_for_update,
341 &mut update_skipped,
342 )?;
343 }
344 }
345 }
346 }
347 Message::Binary(_) => {
348 }
350 Message::Close(_) => {
351 return Ok(());
352 }
353 Message::Ping(d) => {
354 websocket.send(Message::Pong(d))?;
355 }
356 Message::Pong(_) => {
357 }
359 }
360 }
361}