1#![feature(min_specialization)]
2#![feature(trait_alias)]
3#![feature(iter_intersperse)]
4#![feature(str_split_remainder)]
5#![feature(arbitrary_self_types)]
6#![feature(arbitrary_self_types_pointers)]
7
8pub mod html;
9mod http;
10pub mod introspect;
11mod invalidation;
12pub mod source;
13pub mod update;
14
15use std::{
16 collections::VecDeque,
17 future::Future,
18 net::{SocketAddr, TcpListener},
19 pin::Pin,
20 sync::Arc,
21 time::{Duration, Instant},
22};
23
24use anyhow::{Context, Result};
25use hyper::{
26 Request, Response, Server,
27 server::{Builder, conn::AddrIncoming},
28 service::{make_service_fn, service_fn},
29};
30use parking_lot::Mutex;
31use socket2::{Domain, Protocol, Socket, Type};
32use tokio::task::JoinHandle;
33use tracing::{Instrument, Level, Span, event, info_span};
34use turbo_tasks::{
35 Effects, NonLocalValue, OperationVc, PrettyPrintError, TurboTasksApi, Vc, get_effects,
36 run_once_with_reason, trace::TraceRawVcs, util::FormatDuration,
37};
38use turbopack_core::issue::{IssueReporter, IssueSeverity, handle_issues};
39
40use self::{source::ContentSource, update::UpdateServer};
41use crate::{
42 invalidation::{ServerRequest, ServerRequestSideEffects},
43 source::ContentSourceSideEffect,
44};
45
46pub trait SourceProvider: Send + Clone + 'static {
47 fn get_source(&self) -> OperationVc<Box<dyn ContentSource>>;
49}
50
51impl<T> SourceProvider for T
52where
53 T: Fn() -> OperationVc<Box<dyn ContentSource>> + Send + Clone + 'static,
54{
55 fn get_source(&self) -> OperationVc<Box<dyn ContentSource>> {
56 self()
57 }
58}
59
60#[turbo_tasks::value(serialization = "none")]
61struct ContentSourceWithIssues {
62 source_op: OperationVc<Box<dyn ContentSource>>,
63 effects: Effects,
64}
65
66#[turbo_tasks::function(operation)]
67async fn get_source_with_issues_operation(
68 source_op: OperationVc<Box<dyn ContentSource>>,
69) -> Result<Vc<ContentSourceWithIssues>> {
70 let _ = source_op.resolve_strongly_consistent().await?;
71 let effects = get_effects(source_op).await?;
72 Ok(ContentSourceWithIssues { source_op, effects }.cell())
73}
74
75#[derive(TraceRawVcs, Debug, NonLocalValue)]
76pub struct DevServerBuilder {
77 #[turbo_tasks(trace_ignore)]
78 pub addr: SocketAddr,
79 #[turbo_tasks(trace_ignore)]
80 server: Builder<AddrIncoming>,
81}
82
83#[derive(TraceRawVcs, NonLocalValue)]
84pub struct DevServer {
85 #[turbo_tasks(trace_ignore)]
86 pub addr: SocketAddr,
87 #[turbo_tasks(trace_ignore)]
88 pub future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
89}
90
91impl DevServer {
92 pub fn listen(addr: SocketAddr) -> Result<DevServerBuilder, anyhow::Error> {
93 let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))
100 .context("unable to create socket")?;
101 #[cfg(not(windows))]
106 let _ = socket.set_reuse_address(true);
107 if matches!(addr, SocketAddr::V6(_)) {
108 let _ = socket.set_only_v6(false);
110 }
111 let sock_addr = addr.into();
112 socket
113 .bind(&sock_addr)
114 .context("not able to bind address")?;
115 socket.listen(128).context("not able to listen on socket")?;
116
117 let listener: TcpListener = socket.into();
118 let addr = listener
119 .local_addr()
120 .context("not able to get bound address")?;
121 let server = Server::from_tcp(listener).context("Not able to start server")?;
122 Ok(DevServerBuilder { addr, server })
123 }
124}
125
126impl DevServerBuilder {
127 pub fn serve(
128 self,
129 turbo_tasks: Arc<dyn TurboTasksApi>,
130 source_provider: impl SourceProvider + NonLocalValue + TraceRawVcs + Sync,
131 get_issue_reporter: Arc<dyn Fn() -> Vc<Box<dyn IssueReporter>> + Send + Sync>,
132 ) -> DevServer {
133 let ongoing_side_effects = Arc::new(Mutex::new(VecDeque::<
134 Arc<tokio::sync::Mutex<Option<JoinHandle<Result<()>>>>>,
135 >::with_capacity(16)));
136 let make_svc = make_service_fn(move |_| {
137 let tt = turbo_tasks.clone();
138 let source_provider = source_provider.clone();
139 let get_issue_reporter = get_issue_reporter.clone();
140 let ongoing_side_effects = ongoing_side_effects.clone();
141 async move {
142 let handler = move |request: Request<hyper::Body>| {
143 let request_span = info_span!(parent: None, "request", name = ?request.uri());
144 let start = Instant::now();
145 let tt = tt.clone();
146 let get_issue_reporter = get_issue_reporter.clone();
147 let ongoing_side_effects = ongoing_side_effects.clone();
148 let source_provider = source_provider.clone();
149 let future = async move {
150 event!(parent: Span::current(), Level::DEBUG, "request start");
151 let current_ongoing_side_effects = {
155 let mut guard = ongoing_side_effects.lock();
157 while let Some(front) = guard.front() {
158 let Ok(front_guard) = front.try_lock() else {
159 break;
160 };
161 if front_guard.is_some() {
162 break;
163 }
164 drop(front_guard);
165 guard.pop_front();
166 }
167 (*guard).clone()
169 };
170 for side_effect_mutex in current_ongoing_side_effects {
172 let mut guard = side_effect_mutex.lock().await;
173 if let Some(join_handle) = guard.take() {
174 join_handle.await??;
175 }
176 drop(guard);
177 }
178 let reason = ServerRequest {
179 method: request.method().clone(),
180 uri: request.uri().clone(),
181 };
182 let side_effects_reason = ServerRequestSideEffects {
183 method: request.method().clone(),
184 uri: request.uri().clone(),
185 };
186 run_once_with_reason(tt.clone(), reason, async move {
187 let issue_reporter = get_issue_reporter();
190
191 if hyper_tungstenite::is_upgrade_request(&request) {
192 let uri = request.uri();
193 let path = uri.path();
194
195 if path == "/turbopack-hmr" {
196 let (response, websocket) =
197 hyper_tungstenite::upgrade(request, None)?;
198 let update_server =
199 UpdateServer::new(source_provider, issue_reporter);
200 update_server.run(&*tt, websocket);
201 return Ok(response);
202 }
203
204 println!("[404] {path} (WebSocket)");
205 if path == "/_next/hmr" {
206 println!(
211 "A non-turbopack next.js client is trying to connect."
212 );
213 println!(
214 "Make sure to reload/close any browser window which has \
215 been opened without --turbo."
216 );
217 }
218
219 return Ok(Response::builder()
220 .status(404)
221 .body(hyper::Body::empty())?);
222 }
223
224 let uri = request.uri();
225 let path = uri.path().to_string();
226 let source_with_issues_op =
227 get_source_with_issues_operation(source_provider.get_source());
228 let ContentSourceWithIssues { source_op, effects } =
229 &*source_with_issues_op.read_strongly_consistent().await?;
230 effects.apply().await?;
231 handle_issues(
232 source_with_issues_op,
233 issue_reporter,
234 IssueSeverity::Fatal,
235 Some(&path),
236 Some("get source"),
237 )
238 .await?;
239 let (response, side_effects) =
240 http::process_request_with_content_source(
241 *source_op,
249 request,
250 issue_reporter,
251 )
252 .await?;
253 let status = response.status().as_u16();
254 let is_error = response.status().is_client_error()
255 || response.status().is_server_error();
256 let elapsed = start.elapsed();
257 if is_error
258 || (cfg!(feature = "log_request_stats")
259 && elapsed > Duration::from_secs(1))
260 {
261 println!(
262 "[{status}] {path} ({duration})",
263 duration = FormatDuration(elapsed)
264 );
265 }
266 if !side_effects.is_empty() {
267 let join_handle = tokio::spawn(run_once_with_reason(
268 tt.clone(),
269 side_effects_reason,
270 async move {
271 for side_effect in side_effects {
272 side_effect.apply().await?;
273 }
274 Ok(())
275 },
276 ));
277 ongoing_side_effects.lock().push_back(Arc::new(
278 tokio::sync::Mutex::new(Some(join_handle)),
279 ));
280 }
281 Ok(response)
282 })
283 .await
284 };
285 async move {
286 match future.await {
287 Ok(r) => Ok::<_, hyper::http::Error>(r),
288 Err(e) => {
289 println!(
290 "[500] error ({}): {}",
291 FormatDuration(start.elapsed()),
292 PrettyPrintError(&e),
293 );
294 Ok(Response::builder()
295 .status(500)
296 .body(hyper::Body::from(format!("{}", PrettyPrintError(&e))))?)
297 }
298 }
299 }
300 .instrument(request_span)
301 };
302 anyhow::Ok(service_fn(handler))
303 }
304 });
305 let server = self.server.serve(make_svc);
306
307 DevServer {
308 addr: self.addr,
309 future: Box::pin(async move {
310 server.await?;
311 Ok(())
312 }),
313 }
314 }
315}