1#![feature(min_specialization)]
2#![feature(trait_alias)]
3#![feature(array_chunks)]
4#![feature(iter_intersperse)]
5#![feature(str_split_remainder)]
6#![feature(arbitrary_self_types)]
7#![feature(arbitrary_self_types_pointers)]
8
9pub mod html;
10mod http;
11pub mod introspect;
12mod invalidation;
13pub mod source;
14pub mod update;
15
16use std::{
17 collections::VecDeque,
18 future::Future,
19 net::{SocketAddr, TcpListener},
20 pin::Pin,
21 sync::Arc,
22 time::{Duration, Instant},
23};
24
25use anyhow::{Context, Result};
26use hyper::{
27 Request, Response, Server,
28 server::{Builder, conn::AddrIncoming},
29 service::{make_service_fn, service_fn},
30};
31use parking_lot::Mutex;
32use socket2::{Domain, Protocol, Socket, Type};
33use tokio::task::JoinHandle;
34use tracing::{Instrument, Level, Span, event, info_span};
35use turbo_tasks::{
36 NonLocalValue, OperationVc, TurboTasksApi, Vc, apply_effects, run_once_with_reason,
37 trace::TraceRawVcs, util::FormatDuration,
38};
39use turbopack_core::{
40 error::PrettyPrintError,
41 issue::{IssueReporter, IssueSeverity, handle_issues},
42};
43
44use self::{source::ContentSource, update::UpdateServer};
45use crate::{
46 invalidation::{ServerRequest, ServerRequestSideEffects},
47 source::ContentSourceSideEffect,
48};
49
/// A cloneable, sendable factory for the content source the dev server serves from.
///
/// [`DevServerBuilder::serve`] clones the provider into every request handler and
/// calls [`SourceProvider::get_source`] for each regular (non-upgrade) HTTP request;
/// for `/turbopack-hmr` WebSocket upgrades the provider itself is handed to the
/// update server.
pub trait SourceProvider: Send + Clone + 'static {
    /// Returns the operation that yields the boxed [`ContentSource`] to serve.
    fn get_source(&self) -> OperationVc<Box<dyn ContentSource>>;
}
54
55impl<T> SourceProvider for T
56where
57 T: Fn() -> OperationVc<Box<dyn ContentSource>> + Send + Clone + 'static,
58{
59 fn get_source(&self) -> OperationVc<Box<dyn ContentSource>> {
60 self()
61 }
62}
63
/// A dev server whose socket is already bound and listening but which does not
/// yet have a request handler attached.
///
/// Produced by [`DevServer::listen`]; turned into a [`DevServer`] by
/// [`DevServerBuilder::serve`].
#[derive(TraceRawVcs, Debug, NonLocalValue)]
pub struct DevServerBuilder {
    /// The address the listening socket is bound to, as reported by the listener's
    /// `local_addr()` in [`DevServer::listen`] (not necessarily the address that was
    /// requested).
    // NOTE(review): `trace_ignore` presumably excludes the field from `TraceRawVcs`
    // tracing — confirm against the turbo_tasks derive documentation.
    #[turbo_tasks(trace_ignore)]
    pub addr: SocketAddr,
    /// The hyper server builder created from the bound listener.
    #[turbo_tasks(trace_ignore)]
    server: Builder<AddrIncoming>,
}
71
/// A dev server with its request handler attached, as returned by
/// [`DevServerBuilder::serve`].
#[derive(TraceRawVcs, NonLocalValue)]
pub struct DevServer {
    /// The address the server is bound to (copied from the [`DevServerBuilder`]).
    // NOTE(review): `trace_ignore` presumably excludes the field from `TraceRawVcs`
    // tracing — confirm against the turbo_tasks derive documentation.
    #[turbo_tasks(trace_ignore)]
    pub addr: SocketAddr,
    /// Future that awaits the underlying hyper server. It resolves to `Ok(())` when
    /// the server future completes successfully and to an error if the server
    /// fails. The caller is expected to await/poll it.
    #[turbo_tasks(trace_ignore)]
    pub future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
}
79
80impl DevServer {
81 pub fn listen(addr: SocketAddr) -> Result<DevServerBuilder, anyhow::Error> {
82 let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))
89 .context("unable to create socket")?;
90 #[cfg(not(windows))]
95 let _ = socket.set_reuse_address(true);
96 if matches!(addr, SocketAddr::V6(_)) {
97 let _ = socket.set_only_v6(false);
99 }
100 let sock_addr = addr.into();
101 socket
102 .bind(&sock_addr)
103 .context("not able to bind address")?;
104 socket.listen(128).context("not able to listen on socket")?;
105
106 let listener: TcpListener = socket.into();
107 let addr = listener
108 .local_addr()
109 .context("not able to get bound address")?;
110 let server = Server::from_tcp(listener).context("Not able to start server")?;
111 Ok(DevServerBuilder { addr, server })
112 }
113}
114
impl DevServerBuilder {
    /// Attaches the request handler to the bound server and returns the resulting
    /// [`DevServer`].
    ///
    /// For every incoming request the handler:
    ///
    /// 1. Waits for the side effects that earlier requests spawned and that were
    ///    still queued when this request started.
    /// 2. Runs the request inside `run_once_with_reason` with a [`ServerRequest`]
    ///    reason built from the request's method and URI.
    /// 3. For WebSocket upgrade requests: upgrades `/turbopack-hmr` and hands the
    ///    socket to an [`UpdateServer`]; every other path is answered with `404`.
    /// 4. For all other requests: obtains the content source from
    ///    `source_provider`, applies its effects, reports its issues and lets
    ///    `http::process_request_with_content_source` produce the response.
    /// 5. Spawns any side effects returned with the response as a background task
    ///    and appends its join handle to the shared queue.
    ///
    /// Any error raised while handling a request is printed to stdout and turned
    /// into a `500` response whose body is the pretty-printed error.
    ///
    /// # Parameters
    ///
    /// * `turbo_tasks` - the turbo-tasks API handle passed to `run_once_with_reason`
    ///   and to the update server.
    /// * `source_provider` - factory for the content source; cloned per handler.
    /// * `get_issue_reporter` - called once per request to obtain the issue reporter.
    ///
    /// # Returns
    ///
    /// A [`DevServer`] carrying the bound address and a future that awaits the
    /// hyper server.
    pub fn serve(
        self,
        turbo_tasks: Arc<dyn TurboTasksApi>,
        source_provider: impl SourceProvider + NonLocalValue + TraceRawVcs + Sync,
        get_issue_reporter: Arc<dyn Fn() -> Vc<Box<dyn IssueReporter>> + Send + Sync>,
    ) -> DevServer {
        // Queue of side-effect tasks spawned by previous requests, shared by all
        // handlers. The outer (parking_lot) mutex protects the queue itself; each
        // entry has its own async mutex around the `Option<JoinHandle>` so that a
        // handle can be taken and awaited without holding the queue lock.
        let ongoing_side_effects = Arc::new(Mutex::new(VecDeque::<
            Arc<tokio::sync::Mutex<Option<JoinHandle<Result<()>>>>>,
        >::with_capacity(16)));
        let make_svc = make_service_fn(move |_| {
            // Clone the shared handles for the service created by this invocation.
            let tt = turbo_tasks.clone();
            let source_provider = source_provider.clone();
            let get_issue_reporter = get_issue_reporter.clone();
            let ongoing_side_effects = ongoing_side_effects.clone();
            async move {
                let handler = move |request: Request<hyper::Body>| {
                    // `parent: None` makes this a root span rather than a child of
                    // whatever span happens to be current.
                    let request_span = info_span!(parent: None, "request", name = ?request.uri());
                    let start = Instant::now();
                    // Clone again: the handler closure is called once per request and
                    // each request future needs its own owned handles.
                    let tt = tt.clone();
                    let get_issue_reporter = get_issue_reporter.clone();
                    let ongoing_side_effects = ongoing_side_effects.clone();
                    let source_provider = source_provider.clone();
                    let future = async move {
                        event!(parent: Span::current(), Level::DEBUG, "request start");
                        // Snapshot the side effects that are queued right now. Only
                        // these are awaited below; side effects queued later by other
                        // requests are not part of the snapshot.
                        let current_ongoing_side_effects = {
                            let mut guard = ongoing_side_effects.lock();
                            // Prune entries from the front whose handle has already been
                            // taken (slot is `None`). Stop at the first entry that is
                            // currently locked by someone else or still holds a handle.
                            while let Some(front) = guard.front() {
                                let Ok(front_guard) = front.try_lock() else {
                                    break;
                                };
                                if front_guard.is_some() {
                                    break;
                                }
                                drop(front_guard);
                                guard.pop_front();
                            }
                            // Clone the remaining queue (cheap `Arc` clones) so the queue
                            // lock is released at the end of this block, before any
                            // `.await`.
                            (*guard).clone()
                        };
                        for side_effect_mutex in current_ongoing_side_effects {
                            // The per-entry lock is held while the handle is awaited, so
                            // other requests waiting on the same entry block here until
                            // the side effect has finished and then find `None`.
                            let mut guard = side_effect_mutex.lock().await;
                            if let Some(join_handle) = guard.take() {
                                // First `?`: the task's join error; second `?`: the error
                                // returned by the side effect itself. Either one fails
                                // *this* request.
                                join_handle.await??;
                            }
                            drop(guard);
                        }
                        // Two reason values are built up front because `request` is
                        // moved into the async block below.
                        let reason = ServerRequest {
                            method: request.method().clone(),
                            uri: request.uri().clone(),
                        };
                        let side_effects_reason = ServerRequestSideEffects {
                            method: request.method().clone(),
                            uri: request.uri().clone(),
                        };
                        run_once_with_reason(tt.clone(), reason, async move {
                            let issue_reporter = get_issue_reporter();

                            // WebSocket upgrade requests are handled separately from
                            // regular HTTP requests.
                            if hyper_tungstenite::is_upgrade_request(&request) {
                                let uri = request.uri();
                                let path = uri.path();

                                if path == "/turbopack-hmr" {
                                    let (response, websocket) =
                                        hyper_tungstenite::upgrade(request, None)?;
                                    let update_server =
                                        UpdateServer::new(source_provider, issue_reporter);
                                    update_server.run(&*tt, websocket);
                                    return Ok(response);
                                }

                                // Any other upgrade path is unknown: log and answer 404.
                                println!("[404] {path} (WebSocket)");
                                if path == "/_next/webpack-hmr" {
                                    // Extra hint for this specific path, which indicates
                                    // a client that was not started with `--turbo`.
                                    println!(
                                        "A non-turbopack next.js client is trying to connect."
                                    );
                                    println!(
                                        "Make sure to reload/close any browser window which has \
                                         been opened without --turbo."
                                    );
                                }

                                return Ok(Response::builder()
                                    .status(404)
                                    .body(hyper::Body::empty())?);
                            }

                            let uri = request.uri();
                            // Owned copy: `request` is moved into the content-source call
                            // below, but the path is still needed for logging afterwards.
                            let path = uri.path().to_string();
                            let source_op = source_provider.get_source();
                            // The resolved value is discarded; only errors are propagated.
                            // NOTE(review): presumably this forces the source operation to
                            // complete so its effects and issues can be collected below —
                            // confirm.
                            let _ = source_op.resolve_strongly_consistent().await?;
                            apply_effects(source_op).await?;
                            // NOTE(review): presumably fails the request when the source
                            // has issues of `Fatal` severity — confirm against
                            // `handle_issues`.
                            handle_issues(
                                source_op,
                                issue_reporter,
                                IssueSeverity::Fatal.cell(),
                                Some(&path),
                                Some("get source"),
                            )
                            .await?;
                            let (response, side_effects) =
                                http::process_request_with_content_source(
                                    source_op,
                                    request,
                                    issue_reporter,
                                )
                                .await?;
                            let status = response.status().as_u16();
                            let is_error = response.status().is_client_error()
                                || response.status().is_server_error();
                            let elapsed = start.elapsed();
                            // Log every 4xx/5xx response; with the `log_request_stats`
                            // feature enabled, also log requests slower than one second.
                            if is_error
                                || (cfg!(feature = "log_request_stats")
                                    && elapsed > Duration::from_secs(1))
                            {
                                println!(
                                    "[{status}] {path} ({duration})",
                                    duration = FormatDuration(elapsed)
                                );
                            }
                            if !side_effects.is_empty() {
                                // Apply the side effects in a background task so the
                                // response is not delayed; later requests wait for the
                                // task via the shared queue.
                                let join_handle = tokio::spawn(run_once_with_reason(
                                    tt.clone(),
                                    side_effects_reason,
                                    async move {
                                        for side_effect in side_effects {
                                            side_effect.apply().await?;
                                        }
                                        Ok(())
                                    },
                                ));
                                ongoing_side_effects.lock().push_back(Arc::new(
                                    tokio::sync::Mutex::new(Some(join_handle)),
                                ));
                            }
                            Ok(response)
                        })
                        .await
                    };
                    // Convert any handling error into a 500 response so the service
                    // itself only fails if building that response fails.
                    async move {
                        match future.await {
                            Ok(r) => Ok::<_, hyper::http::Error>(r),
                            Err(e) => {
                                println!(
                                    "[500] error ({}): {}",
                                    FormatDuration(start.elapsed()),
                                    PrettyPrintError(&e),
                                );
                                Ok(Response::builder()
                                    .status(500)
                                    .body(hyper::Body::from(format!("{}", PrettyPrintError(&e))))?)
                            }
                        }
                    }
                    .instrument(request_span)
                };
                anyhow::Ok(service_fn(handler))
            }
        });
        let server = self.server.serve(make_svc);

        DevServer {
            addr: self.addr,
            // Wrap the hyper server future so its error type is converted into
            // `anyhow::Error` and its success value into `()`.
            future: Box::pin(async move {
                server.await?;
                Ok(())
            }),
        }
    }
}
304
/// Runs the `register()` function of each crate listed below and then includes
/// this crate's own `register.rs` from `OUT_DIR`.
// NOTE(review): `register.rs` is presumably generated by this crate's build
// script, and the call order may be significant — confirm before reordering.
pub fn register() {
    turbo_tasks::register();
    turbo_tasks_bytes::register();
    turbo_tasks_fs::register();
    turbopack_core::register();
    turbopack_cli_utils::register();
    turbopack_ecmascript::register();
    include!(concat!(env!("OUT_DIR"), "/register.rs"));
}