1#![feature(min_specialization)]
2#![feature(str_split_remainder)]
3#![feature(arbitrary_self_types)]
4#![feature(arbitrary_self_types_pointers)]
5
6pub mod html;
7mod http;
8pub mod introspect;
9mod invalidation;
10pub mod source;
11pub mod update;
12
13use std::{
14 collections::VecDeque,
15 future::Future,
16 net::{SocketAddr, TcpListener},
17 pin::Pin,
18 sync::Arc,
19 time::{Duration, Instant},
20};
21
22use anyhow::{Context, Result};
23use hyper::{
24 Request, Response, Server,
25 server::{Builder, conn::AddrIncoming},
26 service::{make_service_fn, service_fn},
27};
28use parking_lot::Mutex;
29use socket2::{Domain, Protocol, Socket, Type};
30use tokio::task::JoinHandle;
31use tracing::{Instrument, Level, Span, event, info_span};
32use turbo_tasks::{
33 Effects, NonLocalValue, OperationVc, PrettyPrintError, TurboTasksApi, Vc,
34 read_strongly_consistent_and_apply_effects, run_once_with_reason, take_effects,
35 trace::TraceRawVcs, util::FormatDuration,
36};
37use turbopack_core::issue::{IssueReporter, IssueSeverity, handle_issues};
38
39use self::{source::ContentSource, update::UpdateServer};
40use crate::{
41 invalidation::{ServerRequest, ServerRequestSideEffects},
42 source::ContentSourceSideEffect,
43};
44
45pub trait SourceProvider: Send + Clone + 'static {
46 fn get_source(&self) -> OperationVc<Box<dyn ContentSource>>;
48}
49
50impl<T> SourceProvider for T
51where
52 T: Fn() -> OperationVc<Box<dyn ContentSource>> + Send + Clone + 'static,
53{
54 fn get_source(&self) -> OperationVc<Box<dyn ContentSource>> {
55 self()
56 }
57}
58
59#[turbo_tasks::value(serialization = "skip")]
60struct ContentSourceWithIssues {
61 source_op: OperationVc<Box<dyn ContentSource>>,
62 effects: Effects,
63}
64
65#[turbo_tasks::function(operation, root)]
66async fn get_source_with_issues_operation(
67 source_op: OperationVc<Box<dyn ContentSource>>,
68) -> Result<Vc<ContentSourceWithIssues>> {
69 let _ = source_op.resolve().strongly_consistent().await?;
70 let effects = take_effects(source_op).await?;
71 Ok(ContentSourceWithIssues { source_op, effects }.cell())
72}
73
74#[derive(TraceRawVcs, Debug, NonLocalValue)]
75pub struct DevServerBuilder {
76 #[turbo_tasks(trace_ignore)]
77 pub addr: SocketAddr,
78 #[turbo_tasks(trace_ignore)]
79 server: Builder<AddrIncoming>,
80}
81
82#[derive(TraceRawVcs, NonLocalValue)]
83pub struct DevServer {
84 #[turbo_tasks(trace_ignore)]
85 pub addr: SocketAddr,
86 #[turbo_tasks(trace_ignore)]
87 pub future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
88}
89
90impl DevServer {
91 pub fn listen(addr: SocketAddr) -> Result<DevServerBuilder, anyhow::Error> {
92 let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))
99 .context("unable to create socket")?;
100 #[cfg(not(windows))]
105 let _ = socket.set_reuse_address(true);
106 if matches!(addr, SocketAddr::V6(_)) {
107 let _ = socket.set_only_v6(false);
109 }
110 let sock_addr = addr.into();
111 socket
112 .bind(&sock_addr)
113 .context("not able to bind address")?;
114 socket.listen(128).context("not able to listen on socket")?;
115
116 let listener: TcpListener = socket.into();
117 let addr = listener
118 .local_addr()
119 .context("not able to get bound address")?;
120 let server = Server::from_tcp(listener).context("Not able to start server")?;
121 Ok(DevServerBuilder { addr, server })
122 }
123}
124
125impl DevServerBuilder {
126 pub fn serve(
127 self,
128 turbo_tasks: Arc<dyn TurboTasksApi>,
129 source_provider: impl SourceProvider + NonLocalValue + TraceRawVcs + Sync,
130 get_issue_reporter: Arc<dyn Fn() -> Vc<Box<dyn IssueReporter>> + Send + Sync>,
131 ) -> DevServer {
132 let ongoing_side_effects = Arc::new(Mutex::new(VecDeque::<
133 Arc<tokio::sync::Mutex<Option<JoinHandle<Result<()>>>>>,
134 >::with_capacity(16)));
135 let make_svc = make_service_fn(move |_| {
136 let tt = turbo_tasks.clone();
137 let source_provider = source_provider.clone();
138 let get_issue_reporter = get_issue_reporter.clone();
139 let ongoing_side_effects = ongoing_side_effects.clone();
140 async move {
141 let handler = move |request: Request<hyper::Body>| {
142 let request_span = info_span!(parent: None, "request", name = ?request.uri());
143 let start = Instant::now();
144 let tt = tt.clone();
145 let get_issue_reporter = get_issue_reporter.clone();
146 let ongoing_side_effects = ongoing_side_effects.clone();
147 let source_provider = source_provider.clone();
148 let future = async move {
149 event!(parent: Span::current(), Level::DEBUG, "request start");
150 let current_ongoing_side_effects = {
154 let mut guard = ongoing_side_effects.lock();
156 while let Some(front) = guard.front() {
157 let Ok(front_guard) = front.try_lock() else {
158 break;
159 };
160 if front_guard.is_some() {
161 break;
162 }
163 drop(front_guard);
164 guard.pop_front();
165 }
166 (*guard).clone()
168 };
169 for side_effect_mutex in current_ongoing_side_effects {
171 let mut guard = side_effect_mutex.lock().await;
172 if let Some(join_handle) = guard.take() {
173 join_handle.await??;
174 }
175 drop(guard);
176 }
177 let reason = ServerRequest {
178 method: request.method().clone(),
179 uri: request.uri().clone(),
180 };
181 let side_effects_reason = ServerRequestSideEffects {
182 method: request.method().clone(),
183 uri: request.uri().clone(),
184 };
185 run_once_with_reason(tt.clone(), reason, async move {
186 let issue_reporter = get_issue_reporter();
189
190 if hyper_tungstenite::is_upgrade_request(&request) {
191 let uri = request.uri();
192 let path = uri.path();
193
194 if path == "/turbopack-hmr" {
195 let (response, websocket) =
196 hyper_tungstenite::upgrade(request, None)?;
197 let update_server =
198 UpdateServer::new(source_provider, issue_reporter);
199 update_server.run(&*tt, websocket);
200 return Ok(response);
201 }
202
203 println!("[404] {path} (WebSocket)");
204 if path == "/_next/hmr" {
205 println!(
210 "A non-turbopack next.js client is trying to connect."
211 );
212 println!(
213 "Make sure to reload/close any browser window which has \
214 been opened without --turbo."
215 );
216 }
217
218 return Ok(Response::builder()
219 .status(404)
220 .body(hyper::Body::empty())?);
221 }
222
223 let uri = request.uri();
224 let path = uri.path().to_string();
225 let source_with_issues_op =
226 get_source_with_issues_operation(source_provider.get_source());
227 let read = read_strongly_consistent_and_apply_effects(
228 source_with_issues_op,
229 |v| &v.effects,
230 )
231 .await?;
232 let ContentSourceWithIssues { source_op, .. } = &*read;
233 handle_issues(
234 source_with_issues_op,
235 issue_reporter,
236 IssueSeverity::Fatal,
237 Some(&path),
238 Some("get source"),
239 )
240 .await?;
241 let (response, side_effects) =
242 http::process_request_with_content_source(
243 *source_op,
251 request,
252 issue_reporter,
253 )
254 .await?;
255 let status = response.status().as_u16();
256 let is_error = response.status().is_client_error()
257 || response.status().is_server_error();
258 let elapsed = start.elapsed();
259 if is_error
260 || (cfg!(feature = "log_request_stats")
261 && elapsed > Duration::from_secs(1))
262 {
263 println!(
264 "[{status}] {path} ({duration})",
265 duration = FormatDuration(elapsed)
266 );
267 }
268 if !side_effects.is_empty() {
269 let join_handle = tokio::spawn(run_once_with_reason(
270 tt.clone(),
271 side_effects_reason,
272 async move {
273 for side_effect in side_effects {
274 side_effect.apply().await?;
275 }
276 Ok(())
277 },
278 ));
279 ongoing_side_effects.lock().push_back(Arc::new(
280 tokio::sync::Mutex::new(Some(join_handle)),
281 ));
282 }
283 Ok(response)
284 })
285 .await
286 };
287 async move {
288 match future.await {
289 Ok(r) => Ok::<_, hyper::http::Error>(r),
290 Err(e) => {
291 println!(
292 "[500] error ({}): {}",
293 FormatDuration(start.elapsed()),
294 PrettyPrintError(&e),
295 );
296 Ok(Response::builder()
297 .status(500)
298 .body(hyper::Body::from(format!("{}", PrettyPrintError(&e))))?)
299 }
300 }
301 }
302 .instrument(request_span)
303 };
304 anyhow::Ok(service_fn(handler))
305 }
306 });
307 let server = self.server.serve(make_svc);
308
309 DevServer {
310 addr: self.addr,
311 future: Box::pin(async move {
312 server.await?;
313 Ok(())
314 }),
315 }
316 }
317}