1#![feature(min_specialization)]
2#![feature(str_split_remainder)]
3#![feature(arbitrary_self_types)]
4#![feature(arbitrary_self_types_pointers)]
5
6pub mod html;
7mod http;
8pub mod introspect;
9mod invalidation;
10pub mod source;
11pub mod update;
12
13use std::{
14 collections::VecDeque,
15 future::Future,
16 net::{SocketAddr, TcpListener},
17 pin::Pin,
18 sync::Arc,
19 time::{Duration, Instant},
20};
21
22use anyhow::{Context, Result};
23use hyper::{
24 Request, Response, Server,
25 server::{Builder, conn::AddrIncoming},
26 service::{make_service_fn, service_fn},
27};
28use parking_lot::Mutex;
29use socket2::{Domain, Protocol, Socket, Type};
30use tokio::task::JoinHandle;
31use tracing::{Instrument, Level, Span, event, info_span};
32use turbo_tasks::{
33 Effects, NonLocalValue, OperationVc, PrettyPrintError, TurboTasksApi, Vc, run_once_with_reason,
34 take_effects, trace::TraceRawVcs, util::FormatDuration,
35};
36use turbopack_core::issue::{IssueReporter, IssueSeverity, handle_issues};
37
38use self::{source::ContentSource, update::UpdateServer};
39use crate::{
40 invalidation::{ServerRequest, ServerRequestSideEffects},
41 source::ContentSourceSideEffect,
42};
43
44pub trait SourceProvider: Send + Clone + 'static {
45 fn get_source(&self) -> OperationVc<Box<dyn ContentSource>>;
47}
48
49impl<T> SourceProvider for T
50where
51 T: Fn() -> OperationVc<Box<dyn ContentSource>> + Send + Clone + 'static,
52{
53 fn get_source(&self) -> OperationVc<Box<dyn ContentSource>> {
54 self()
55 }
56}
57
58#[turbo_tasks::value(serialization = "skip")]
59struct ContentSourceWithIssues {
60 source_op: OperationVc<Box<dyn ContentSource>>,
61 effects: Effects,
62}
63
64#[turbo_tasks::function(operation, root)]
65async fn get_source_with_issues_operation(
66 source_op: OperationVc<Box<dyn ContentSource>>,
67) -> Result<Vc<ContentSourceWithIssues>> {
68 let _ = source_op.resolve().strongly_consistent().await?;
69 let effects = take_effects(source_op).await?;
70 Ok(ContentSourceWithIssues { source_op, effects }.cell())
71}
72
73#[derive(TraceRawVcs, Debug, NonLocalValue)]
74pub struct DevServerBuilder {
75 #[turbo_tasks(trace_ignore)]
76 pub addr: SocketAddr,
77 #[turbo_tasks(trace_ignore)]
78 server: Builder<AddrIncoming>,
79}
80
81#[derive(TraceRawVcs, NonLocalValue)]
82pub struct DevServer {
83 #[turbo_tasks(trace_ignore)]
84 pub addr: SocketAddr,
85 #[turbo_tasks(trace_ignore)]
86 pub future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
87}
88
89impl DevServer {
90 pub fn listen(addr: SocketAddr) -> Result<DevServerBuilder, anyhow::Error> {
91 let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))
98 .context("unable to create socket")?;
99 #[cfg(not(windows))]
104 let _ = socket.set_reuse_address(true);
105 if matches!(addr, SocketAddr::V6(_)) {
106 let _ = socket.set_only_v6(false);
108 }
109 let sock_addr = addr.into();
110 socket
111 .bind(&sock_addr)
112 .context("not able to bind address")?;
113 socket.listen(128).context("not able to listen on socket")?;
114
115 let listener: TcpListener = socket.into();
116 let addr = listener
117 .local_addr()
118 .context("not able to get bound address")?;
119 let server = Server::from_tcp(listener).context("Not able to start server")?;
120 Ok(DevServerBuilder { addr, server })
121 }
122}
123
124impl DevServerBuilder {
125 pub fn serve(
126 self,
127 turbo_tasks: Arc<dyn TurboTasksApi>,
128 source_provider: impl SourceProvider + NonLocalValue + TraceRawVcs + Sync,
129 get_issue_reporter: Arc<dyn Fn() -> Vc<Box<dyn IssueReporter>> + Send + Sync>,
130 ) -> DevServer {
131 let ongoing_side_effects = Arc::new(Mutex::new(VecDeque::<
132 Arc<tokio::sync::Mutex<Option<JoinHandle<Result<()>>>>>,
133 >::with_capacity(16)));
134 let make_svc = make_service_fn(move |_| {
135 let tt = turbo_tasks.clone();
136 let source_provider = source_provider.clone();
137 let get_issue_reporter = get_issue_reporter.clone();
138 let ongoing_side_effects = ongoing_side_effects.clone();
139 async move {
140 let handler = move |request: Request<hyper::Body>| {
141 let request_span = info_span!(parent: None, "request", name = ?request.uri());
142 let start = Instant::now();
143 let tt = tt.clone();
144 let get_issue_reporter = get_issue_reporter.clone();
145 let ongoing_side_effects = ongoing_side_effects.clone();
146 let source_provider = source_provider.clone();
147 let future = async move {
148 event!(parent: Span::current(), Level::DEBUG, "request start");
149 let current_ongoing_side_effects = {
153 let mut guard = ongoing_side_effects.lock();
155 while let Some(front) = guard.front() {
156 let Ok(front_guard) = front.try_lock() else {
157 break;
158 };
159 if front_guard.is_some() {
160 break;
161 }
162 drop(front_guard);
163 guard.pop_front();
164 }
165 (*guard).clone()
167 };
168 for side_effect_mutex in current_ongoing_side_effects {
170 let mut guard = side_effect_mutex.lock().await;
171 if let Some(join_handle) = guard.take() {
172 join_handle.await??;
173 }
174 drop(guard);
175 }
176 let reason = ServerRequest {
177 method: request.method().clone(),
178 uri: request.uri().clone(),
179 };
180 let side_effects_reason = ServerRequestSideEffects {
181 method: request.method().clone(),
182 uri: request.uri().clone(),
183 };
184 run_once_with_reason(tt.clone(), reason, async move {
185 let issue_reporter = get_issue_reporter();
188
189 if hyper_tungstenite::is_upgrade_request(&request) {
190 let uri = request.uri();
191 let path = uri.path();
192
193 if path == "/turbopack-hmr" {
194 let (response, websocket) =
195 hyper_tungstenite::upgrade(request, None)?;
196 let update_server =
197 UpdateServer::new(source_provider, issue_reporter);
198 update_server.run(&*tt, websocket);
199 return Ok(response);
200 }
201
202 println!("[404] {path} (WebSocket)");
203 if path == "/_next/hmr" {
204 println!(
209 "A non-turbopack next.js client is trying to connect."
210 );
211 println!(
212 "Make sure to reload/close any browser window which has \
213 been opened without --turbo."
214 );
215 }
216
217 return Ok(Response::builder()
218 .status(404)
219 .body(hyper::Body::empty())?);
220 }
221
222 let uri = request.uri();
223 let path = uri.path().to_string();
224 let source_with_issues_op =
225 get_source_with_issues_operation(source_provider.get_source());
226 let ContentSourceWithIssues { source_op, effects } =
227 &*source_with_issues_op.read_strongly_consistent().await?;
228 effects.apply().await?;
229 handle_issues(
230 source_with_issues_op,
231 issue_reporter,
232 IssueSeverity::Fatal,
233 Some(&path),
234 Some("get source"),
235 )
236 .await?;
237 let (response, side_effects) =
238 http::process_request_with_content_source(
239 *source_op,
247 request,
248 issue_reporter,
249 )
250 .await?;
251 let status = response.status().as_u16();
252 let is_error = response.status().is_client_error()
253 || response.status().is_server_error();
254 let elapsed = start.elapsed();
255 if is_error
256 || (cfg!(feature = "log_request_stats")
257 && elapsed > Duration::from_secs(1))
258 {
259 println!(
260 "[{status}] {path} ({duration})",
261 duration = FormatDuration(elapsed)
262 );
263 }
264 if !side_effects.is_empty() {
265 let join_handle = tokio::spawn(run_once_with_reason(
266 tt.clone(),
267 side_effects_reason,
268 async move {
269 for side_effect in side_effects {
270 side_effect.apply().await?;
271 }
272 Ok(())
273 },
274 ));
275 ongoing_side_effects.lock().push_back(Arc::new(
276 tokio::sync::Mutex::new(Some(join_handle)),
277 ));
278 }
279 Ok(response)
280 })
281 .await
282 };
283 async move {
284 match future.await {
285 Ok(r) => Ok::<_, hyper::http::Error>(r),
286 Err(e) => {
287 println!(
288 "[500] error ({}): {}",
289 FormatDuration(start.elapsed()),
290 PrettyPrintError(&e),
291 );
292 Ok(Response::builder()
293 .status(500)
294 .body(hyper::Body::from(format!("{}", PrettyPrintError(&e))))?)
295 }
296 }
297 }
298 .instrument(request_span)
299 };
300 anyhow::Ok(service_fn(handler))
301 }
302 });
303 let server = self.server.serve(make_svc);
304
305 DevServer {
306 addr: self.addr,
307 future: Box::pin(async move {
308 server.await?;
309 Ok(())
310 }),
311 }
312 }
313}