1#![feature(min_specialization)]
2#![feature(trait_alias)]
3#![feature(iter_intersperse)]
4#![feature(str_split_remainder)]
5#![feature(arbitrary_self_types)]
6#![feature(arbitrary_self_types_pointers)]
7
8pub mod html;
9mod http;
10pub mod introspect;
11mod invalidation;
12pub mod source;
13pub mod update;
14
15use std::{
16 collections::VecDeque,
17 future::Future,
18 net::{SocketAddr, TcpListener},
19 pin::Pin,
20 sync::Arc,
21 time::{Duration, Instant},
22};
23
24use anyhow::{Context, Result};
25use hyper::{
26 Request, Response, Server,
27 server::{Builder, conn::AddrIncoming},
28 service::{make_service_fn, service_fn},
29};
30use parking_lot::Mutex;
31use socket2::{Domain, Protocol, Socket, Type};
32use tokio::task::JoinHandle;
33use tracing::{Instrument, Level, Span, event, info_span};
34use turbo_tasks::{
35 NonLocalValue, OperationVc, TurboTasksApi, Vc, apply_effects, run_once_with_reason,
36 trace::TraceRawVcs, util::FormatDuration,
37};
38use turbopack_core::{
39 error::PrettyPrintError,
40 issue::{IssueReporter, IssueSeverity, handle_issues},
41};
42
43use self::{source::ContentSource, update::UpdateServer};
44use crate::{
45 invalidation::{ServerRequest, ServerRequestSideEffects},
46 source::ContentSourceSideEffect,
47};
48
49pub trait SourceProvider: Send + Clone + 'static {
50 fn get_source(&self) -> OperationVc<Box<dyn ContentSource>>;
52}
53
54impl<T> SourceProvider for T
55where
56 T: Fn() -> OperationVc<Box<dyn ContentSource>> + Send + Clone + 'static,
57{
58 fn get_source(&self) -> OperationVc<Box<dyn ContentSource>> {
59 self()
60 }
61}
62
63#[derive(TraceRawVcs, Debug, NonLocalValue)]
64pub struct DevServerBuilder {
65 #[turbo_tasks(trace_ignore)]
66 pub addr: SocketAddr,
67 #[turbo_tasks(trace_ignore)]
68 server: Builder<AddrIncoming>,
69}
70
71#[derive(TraceRawVcs, NonLocalValue)]
72pub struct DevServer {
73 #[turbo_tasks(trace_ignore)]
74 pub addr: SocketAddr,
75 #[turbo_tasks(trace_ignore)]
76 pub future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
77}
78
79impl DevServer {
80 pub fn listen(addr: SocketAddr) -> Result<DevServerBuilder, anyhow::Error> {
81 let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))
88 .context("unable to create socket")?;
89 #[cfg(not(windows))]
94 let _ = socket.set_reuse_address(true);
95 if matches!(addr, SocketAddr::V6(_)) {
96 let _ = socket.set_only_v6(false);
98 }
99 let sock_addr = addr.into();
100 socket
101 .bind(&sock_addr)
102 .context("not able to bind address")?;
103 socket.listen(128).context("not able to listen on socket")?;
104
105 let listener: TcpListener = socket.into();
106 let addr = listener
107 .local_addr()
108 .context("not able to get bound address")?;
109 let server = Server::from_tcp(listener).context("Not able to start server")?;
110 Ok(DevServerBuilder { addr, server })
111 }
112}
113
114impl DevServerBuilder {
115 pub fn serve(
116 self,
117 turbo_tasks: Arc<dyn TurboTasksApi>,
118 source_provider: impl SourceProvider + NonLocalValue + TraceRawVcs + Sync,
119 get_issue_reporter: Arc<dyn Fn() -> Vc<Box<dyn IssueReporter>> + Send + Sync>,
120 ) -> DevServer {
121 let ongoing_side_effects = Arc::new(Mutex::new(VecDeque::<
122 Arc<tokio::sync::Mutex<Option<JoinHandle<Result<()>>>>>,
123 >::with_capacity(16)));
124 let make_svc = make_service_fn(move |_| {
125 let tt = turbo_tasks.clone();
126 let source_provider = source_provider.clone();
127 let get_issue_reporter = get_issue_reporter.clone();
128 let ongoing_side_effects = ongoing_side_effects.clone();
129 async move {
130 let handler = move |request: Request<hyper::Body>| {
131 let request_span = info_span!(parent: None, "request", name = ?request.uri());
132 let start = Instant::now();
133 let tt = tt.clone();
134 let get_issue_reporter = get_issue_reporter.clone();
135 let ongoing_side_effects = ongoing_side_effects.clone();
136 let source_provider = source_provider.clone();
137 let future = async move {
138 event!(parent: Span::current(), Level::DEBUG, "request start");
139 let current_ongoing_side_effects = {
143 let mut guard = ongoing_side_effects.lock();
145 while let Some(front) = guard.front() {
146 let Ok(front_guard) = front.try_lock() else {
147 break;
148 };
149 if front_guard.is_some() {
150 break;
151 }
152 drop(front_guard);
153 guard.pop_front();
154 }
155 (*guard).clone()
157 };
158 for side_effect_mutex in current_ongoing_side_effects {
160 let mut guard = side_effect_mutex.lock().await;
161 if let Some(join_handle) = guard.take() {
162 join_handle.await??;
163 }
164 drop(guard);
165 }
166 let reason = ServerRequest {
167 method: request.method().clone(),
168 uri: request.uri().clone(),
169 };
170 let side_effects_reason = ServerRequestSideEffects {
171 method: request.method().clone(),
172 uri: request.uri().clone(),
173 };
174 run_once_with_reason(tt.clone(), reason, async move {
175 let issue_reporter = get_issue_reporter();
178
179 if hyper_tungstenite::is_upgrade_request(&request) {
180 let uri = request.uri();
181 let path = uri.path();
182
183 if path == "/turbopack-hmr" {
184 let (response, websocket) =
185 hyper_tungstenite::upgrade(request, None)?;
186 let update_server =
187 UpdateServer::new(source_provider, issue_reporter);
188 update_server.run(&*tt, websocket);
189 return Ok(response);
190 }
191
192 println!("[404] {path} (WebSocket)");
193 if path == "/_next/webpack-hmr" {
194 println!(
199 "A non-turbopack next.js client is trying to connect."
200 );
201 println!(
202 "Make sure to reload/close any browser window which has \
203 been opened without --turbo."
204 );
205 }
206
207 return Ok(Response::builder()
208 .status(404)
209 .body(hyper::Body::empty())?);
210 }
211
212 let uri = request.uri();
213 let path = uri.path().to_string();
214 let source_op = source_provider.get_source();
215 let _ = source_op.resolve_strongly_consistent().await?;
217 apply_effects(source_op).await?;
218 handle_issues(
219 source_op,
220 issue_reporter,
221 IssueSeverity::Fatal,
222 Some(&path),
223 Some("get source"),
224 )
225 .await?;
226 let (response, side_effects) =
227 http::process_request_with_content_source(
228 source_op,
236 request,
237 issue_reporter,
238 )
239 .await?;
240 let status = response.status().as_u16();
241 let is_error = response.status().is_client_error()
242 || response.status().is_server_error();
243 let elapsed = start.elapsed();
244 if is_error
245 || (cfg!(feature = "log_request_stats")
246 && elapsed > Duration::from_secs(1))
247 {
248 println!(
249 "[{status}] {path} ({duration})",
250 duration = FormatDuration(elapsed)
251 );
252 }
253 if !side_effects.is_empty() {
254 let join_handle = tokio::spawn(run_once_with_reason(
255 tt.clone(),
256 side_effects_reason,
257 async move {
258 for side_effect in side_effects {
259 side_effect.apply().await?;
260 }
261 Ok(())
262 },
263 ));
264 ongoing_side_effects.lock().push_back(Arc::new(
265 tokio::sync::Mutex::new(Some(join_handle)),
266 ));
267 }
268 Ok(response)
269 })
270 .await
271 };
272 async move {
273 match future.await {
274 Ok(r) => Ok::<_, hyper::http::Error>(r),
275 Err(e) => {
276 println!(
277 "[500] error ({}): {}",
278 FormatDuration(start.elapsed()),
279 PrettyPrintError(&e),
280 );
281 Ok(Response::builder()
282 .status(500)
283 .body(hyper::Body::from(format!("{}", PrettyPrintError(&e))))?)
284 }
285 }
286 }
287 .instrument(request_span)
288 };
289 anyhow::Ok(service_fn(handler))
290 }
291 });
292 let server = self.server.serve(make_svc);
293
294 DevServer {
295 addr: self.addr,
296 future: Box::pin(async move {
297 server.await?;
298 Ok(())
299 }),
300 }
301 }
302}