1use std::{
2 net::{SocketAddr, SocketAddrV4, TcpListener, TcpStream},
3 sync::{Arc, Mutex},
4 thread::spawn,
5};
6
7use anyhow::{Result, bail};
8use serde::{Deserialize, Serialize};
9use tungstenite::{Message, accept};
10
11use crate::{
12 store::SpanId,
13 store_container::StoreContainer,
14 timestamp::Timestamp,
15 u64_string,
16 viewer::{Update, ViewLineUpdate, ViewMode, Viewer},
17};
18
19#[derive(Serialize, Debug)]
20#[serde(tag = "type")]
21#[serde(rename_all = "kebab-case")]
22pub enum ServerToClientMessage {
23 ViewLine {
24 #[serde(flatten)]
25 update: ViewLineUpdate,
26 },
27 ViewLinesCount {
28 count: usize,
29 max: u64,
30 },
31 #[serde(rename_all = "camelCase")]
32 QueryResult {
33 #[serde(with = "u64_string")]
34 id: SpanId,
35 is_graph: bool,
36 start: Timestamp,
37 end: Timestamp,
38 duration: Timestamp,
39 cpu: Timestamp,
40 allocations: u64,
41 deallocations: u64,
42 allocation_count: u64,
43 persistent_allocations: u64,
44 args: Vec<(String, String)>,
45 path: Vec<String>,
46 memory_samples: Vec<u64>,
47 },
48}
49
50#[derive(Deserialize, Debug)]
51#[serde(tag = "type")]
52#[serde(rename_all = "kebab-case")]
53pub enum ClientToServerMessage {
54 #[serde(rename_all = "camelCase")]
55 ViewRect {
56 view_rect: ViewRect,
57 },
58 ViewMode {
59 #[serde(with = "u64_string")]
60 id: SpanId,
61 mode: String,
62 inherit: bool,
63 },
64 ResetViewMode {
65 #[serde(with = "u64_string")]
66 id: SpanId,
67 },
68 Query {
69 #[serde(with = "u64_string")]
70 id: SpanId,
71 },
72 Ack,
73 CheckForMoreData,
74}
75
76#[derive(Deserialize, Debug)]
77pub struct Filter {
78 pub op: Op,
79 pub value: u64,
80}
81
82#[derive(Deserialize, Debug)]
83#[serde(rename_all = "snake_case")]
84pub enum Op {
85 Gt,
86 Lt,
87}
88
89#[derive(Deserialize, Debug)]
90#[serde(rename_all = "camelCase")]
91pub struct ViewRect {
92 pub x: u64,
93 pub y: u64,
94 pub width: u64,
95 pub height: u64,
96 pub horizontal_pixels: u64,
97 pub query: String,
98 pub view_mode: String,
99 pub value_mode: String,
100 pub value_filter: Option<Filter>,
101 pub count_filter: Option<Filter>,
102}
103
104struct ConnectionState {
105 store: Arc<StoreContainer>,
106 viewer: Viewer,
107 view_rect: ViewRect,
108 last_update_generation: usize,
109}
110
111pub fn serve(store: Arc<StoreContainer>, port: u16) {
112 let server = TcpListener::bind(SocketAddr::V4(SocketAddrV4::new(
113 std::net::Ipv4Addr::new(127, 0, 0, 1),
114 port,
115 )))
116 .unwrap();
117 for stream in server.incoming() {
118 let store = store.clone();
119
120 spawn(move || {
121 let websocket = accept(stream.unwrap()).unwrap();
122 if let Err(err) = handle_connection(websocket, store) {
123 eprintln!("Error: {err:?}");
124 }
125 });
126 }
127}
128
129fn handle_connection(
130 mut websocket: tungstenite::WebSocket<TcpStream>,
131 store: Arc<StoreContainer>,
132) -> Result<()> {
133 let state = Arc::new(Mutex::new(ConnectionState {
134 store,
135 viewer: Viewer::new(),
136 view_rect: ViewRect {
137 x: 0,
138 y: 0,
139 width: 1,
140 height: 1,
141 horizontal_pixels: 1,
142 query: String::new(),
143 view_mode: "aggregated".to_string(),
144 value_mode: "duration".to_string(),
145 count_filter: None,
146 value_filter: None,
147 },
148 last_update_generation: 0,
149 }));
150 let mut update_skipped = false;
151 let mut ready_for_update = true;
152
153 fn send_update(
154 websocket: &mut tungstenite::WebSocket<TcpStream>,
155 state: &mut ConnectionState,
156 force_send: bool,
157 ready_for_update: &mut bool,
158 update_skipped: &mut bool,
159 ) -> Result<()> {
160 if !*ready_for_update {
161 if force_send {
162 *update_skipped = true;
163 }
164 return Ok(());
165 }
166 let store = state.store.read();
167 if !force_send && state.last_update_generation == store.generation() {
168 return Ok(());
169 }
170 state.last_update_generation = store.generation();
171 let Update {
172 lines: updates,
173 max,
174 } = state.viewer.compute_update(&store, &state.view_rect);
175 let count = updates.len();
176 for update in updates {
177 let message = ServerToClientMessage::ViewLine { update };
178 let message = serde_json::to_string(&message).unwrap();
179 websocket.send(Message::Text(message))?;
180 }
181 let message = ServerToClientMessage::ViewLinesCount { count, max };
182 let message = serde_json::to_string(&message).unwrap();
183 websocket.send(Message::Text(message))?;
184 *ready_for_update = false;
185 Ok(())
186 }
187 loop {
188 match websocket.read()? {
189 Message::Frame(_frame) => {}
190 Message::Text(text) => {
191 let message: ClientToServerMessage = serde_json::from_str(&text)?;
192 let mut state = state.lock().unwrap();
193 match message {
194 ClientToServerMessage::CheckForMoreData => {
195 send_update(
196 &mut websocket,
197 &mut state,
198 false,
199 &mut ready_for_update,
200 &mut update_skipped,
201 )?;
202 }
203 ClientToServerMessage::ViewRect { view_rect } => {
204 state.view_rect = view_rect;
205 send_update(
206 &mut websocket,
207 &mut state,
208 true,
209 &mut ready_for_update,
210 &mut update_skipped,
211 )?;
212 }
213 ClientToServerMessage::ViewMode { id, mode, inherit } => {
214 let (mode, sorted) = if let Some(mode) = mode.strip_suffix("-sorted") {
215 (mode, true)
216 } else {
217 (mode.as_str(), false)
218 };
219 match mode {
220 "raw-spans" => {
221 state.viewer.set_view_mode(
222 id,
223 Some((ViewMode::RawSpans { sorted }, inherit)),
224 );
225 }
226 "aggregated" => {
227 state.viewer.set_view_mode(
228 id,
229 Some((ViewMode::Aggregated { sorted }, inherit)),
230 );
231 }
232 "bottom-up" => {
233 state.viewer.set_view_mode(
234 id,
235 Some((ViewMode::BottomUp { sorted }, inherit)),
236 );
237 }
238 "aggregated-bottom-up" => {
239 state.viewer.set_view_mode(
240 id,
241 Some((ViewMode::AggregatedBottomUp { sorted }, inherit)),
242 );
243 }
244 _ => {
245 bail!("unknown view mode: {}", mode)
246 }
247 }
248 send_update(
249 &mut websocket,
250 &mut state,
251 true,
252 &mut ready_for_update,
253 &mut update_skipped,
254 )?;
255 }
256 ClientToServerMessage::ResetViewMode { id } => {
257 state.viewer.set_view_mode(id, None);
258 send_update(
259 &mut websocket,
260 &mut state,
261 true,
262 &mut ready_for_update,
263 &mut update_skipped,
264 )?;
265 }
266 ClientToServerMessage::Query { id } => {
267 let message = {
268 let store = state.store.read();
269 if let Some((span, is_graph)) = store.span(id) {
270 let root_start = store.root_span().start();
271 let span_start = span.start() - root_start;
272 let span_end = span.end() - root_start;
273 let duration = span.corrected_total_time();
274 let cpu = span.total_time();
275 let allocations = span.total_allocations();
276 let deallocations = span.total_deallocations();
277 let allocation_count = span.total_allocation_count();
278 let persistent_allocations = span.total_persistent_allocations();
279 let args = span
280 .args()
281 .map(|(k, v)| (k.to_string(), v.to_string()))
282 .collect();
283 let mut path = Vec::new();
284 let mut current = span;
285 while let Some(parent) = current.parent() {
286 path.push(parent.nice_name().1.to_string());
287 current = parent;
288 }
289 path.reverse();
290 let memory_samples =
291 store.memory_samples_for_range(span.start(), span.end());
292 ServerToClientMessage::QueryResult {
293 id,
294 is_graph,
295 start: span_start,
296 end: span_end,
297 duration,
298 cpu,
299 allocations,
300 deallocations,
301 allocation_count,
302 persistent_allocations,
303 args,
304 path,
305 memory_samples,
306 }
307 } else {
308 ServerToClientMessage::QueryResult {
309 id,
310 is_graph: false,
311 start: Timestamp::ZERO,
312 end: Timestamp::ZERO,
313 duration: Timestamp::ZERO,
314 cpu: Timestamp::ZERO,
315 allocations: 0,
316 deallocations: 0,
317 allocation_count: 0,
318 persistent_allocations: 0,
319 args: Vec::new(),
320 path: Vec::new(),
321 memory_samples: Vec::new(),
322 }
323 }
324 };
325 let message = serde_json::to_string(&message).unwrap();
326 websocket.send(Message::Text(message))?;
327 send_update(
328 &mut websocket,
329 &mut state,
330 true,
331 &mut ready_for_update,
332 &mut update_skipped,
333 )?;
334
335 continue;
336 }
337 ClientToServerMessage::Ack => {
338 ready_for_update = true;
339 if update_skipped {
340 update_skipped = false;
341 send_update(
342 &mut websocket,
343 &mut state,
344 true,
345 &mut ready_for_update,
346 &mut update_skipped,
347 )?;
348 }
349 }
350 }
351 }
352 Message::Binary(_) => {
353 }
355 Message::Close(_) => {
356 return Ok(());
357 }
358 Message::Ping(d) => {
359 websocket.send(Message::Pong(d))?;
360 }
361 Message::Pong(_) => {
362 }
364 }
365 }
366}