turbopack_dev_server/
lib.rs

1#![feature(min_specialization)]
2#![feature(trait_alias)]
3#![feature(array_chunks)]
4#![feature(iter_intersperse)]
5#![feature(str_split_remainder)]
6#![feature(arbitrary_self_types)]
7#![feature(arbitrary_self_types_pointers)]
8
9pub mod html;
10mod http;
11pub mod introspect;
12mod invalidation;
13pub mod source;
14pub mod update;
15
16use std::{
17    collections::VecDeque,
18    future::Future,
19    net::{SocketAddr, TcpListener},
20    pin::Pin,
21    sync::Arc,
22    time::{Duration, Instant},
23};
24
25use anyhow::{Context, Result};
26use hyper::{
27    Request, Response, Server,
28    server::{Builder, conn::AddrIncoming},
29    service::{make_service_fn, service_fn},
30};
31use parking_lot::Mutex;
32use socket2::{Domain, Protocol, Socket, Type};
33use tokio::task::JoinHandle;
34use tracing::{Instrument, Level, Span, event, info_span};
35use turbo_tasks::{
36    NonLocalValue, OperationVc, TurboTasksApi, Vc, apply_effects, run_once_with_reason,
37    trace::TraceRawVcs, util::FormatDuration,
38};
39use turbopack_core::{
40    error::PrettyPrintError,
41    issue::{IssueReporter, IssueSeverity, handle_issues},
42};
43
44use self::{source::ContentSource, update::UpdateServer};
45use crate::{
46    invalidation::{ServerRequest, ServerRequestSideEffects},
47    source::ContentSourceSideEffect,
48};
49
50pub trait SourceProvider: Send + Clone + 'static {
51    /// must call a turbo-tasks function internally
52    fn get_source(&self) -> OperationVc<Box<dyn ContentSource>>;
53}
54
55impl<T> SourceProvider for T
56where
57    T: Fn() -> OperationVc<Box<dyn ContentSource>> + Send + Clone + 'static,
58{
59    fn get_source(&self) -> OperationVc<Box<dyn ContentSource>> {
60        self()
61    }
62}
63
64#[derive(TraceRawVcs, Debug, NonLocalValue)]
65pub struct DevServerBuilder {
66    #[turbo_tasks(trace_ignore)]
67    pub addr: SocketAddr,
68    #[turbo_tasks(trace_ignore)]
69    server: Builder<AddrIncoming>,
70}
71
72#[derive(TraceRawVcs, NonLocalValue)]
73pub struct DevServer {
74    #[turbo_tasks(trace_ignore)]
75    pub addr: SocketAddr,
76    #[turbo_tasks(trace_ignore)]
77    pub future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
78}
79
80impl DevServer {
81    pub fn listen(addr: SocketAddr) -> Result<DevServerBuilder, anyhow::Error> {
82        // This is annoying. The hyper::Server doesn't allow us to know which port was
83        // bound (until we build it with a request handler) when using the standard
84        // `server::try_bind` approach. This is important when binding the `0` port,
85        // because the OS will remap that to an actual free port, and we need to know
86        // that port before we build the request handler. So we need to construct a
87        // real TCP listener, see if it bound, and get its bound address.
88        let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))
89            .context("unable to create socket")?;
90        // Allow the socket to be reused immediately after closing. This ensures that
91        // the dev server can be restarted on the same address without a buffer time for
92        // the OS to release the socket.
93        // https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
94        #[cfg(not(windows))]
95        let _ = socket.set_reuse_address(true);
96        if matches!(addr, SocketAddr::V6(_)) {
97            // When possible bind to v4 and v6, otherwise ignore the error
98            let _ = socket.set_only_v6(false);
99        }
100        let sock_addr = addr.into();
101        socket
102            .bind(&sock_addr)
103            .context("not able to bind address")?;
104        socket.listen(128).context("not able to listen on socket")?;
105
106        let listener: TcpListener = socket.into();
107        let addr = listener
108            .local_addr()
109            .context("not able to get bound address")?;
110        let server = Server::from_tcp(listener).context("Not able to start server")?;
111        Ok(DevServerBuilder { addr, server })
112    }
113}
114
115impl DevServerBuilder {
116    pub fn serve(
117        self,
118        turbo_tasks: Arc<dyn TurboTasksApi>,
119        source_provider: impl SourceProvider + NonLocalValue + TraceRawVcs + Sync,
120        get_issue_reporter: Arc<dyn Fn() -> Vc<Box<dyn IssueReporter>> + Send + Sync>,
121    ) -> DevServer {
122        let ongoing_side_effects = Arc::new(Mutex::new(VecDeque::<
123            Arc<tokio::sync::Mutex<Option<JoinHandle<Result<()>>>>>,
124        >::with_capacity(16)));
125        let make_svc = make_service_fn(move |_| {
126            let tt = turbo_tasks.clone();
127            let source_provider = source_provider.clone();
128            let get_issue_reporter = get_issue_reporter.clone();
129            let ongoing_side_effects = ongoing_side_effects.clone();
130            async move {
131                let handler = move |request: Request<hyper::Body>| {
132                    let request_span = info_span!(parent: None, "request", name = ?request.uri());
133                    let start = Instant::now();
134                    let tt = tt.clone();
135                    let get_issue_reporter = get_issue_reporter.clone();
136                    let ongoing_side_effects = ongoing_side_effects.clone();
137                    let source_provider = source_provider.clone();
138                    let future = async move {
139                        event!(parent: Span::current(), Level::DEBUG, "request start");
140                        // Wait until all ongoing side effects are completed
141                        // We only need to wait for the ongoing side effects that were started
142                        // before this request. Later added side effects are not relevant for this.
143                        let current_ongoing_side_effects = {
144                            // Cleanup the ongoing_side_effects list
145                            let mut guard = ongoing_side_effects.lock();
146                            while let Some(front) = guard.front() {
147                                let Ok(front_guard) = front.try_lock() else {
148                                    break;
149                                };
150                                if front_guard.is_some() {
151                                    break;
152                                }
153                                drop(front_guard);
154                                guard.pop_front();
155                            }
156                            // Get a clone of the remaining list
157                            (*guard).clone()
158                        };
159                        // Wait for the side effects to complete
160                        for side_effect_mutex in current_ongoing_side_effects {
161                            let mut guard = side_effect_mutex.lock().await;
162                            if let Some(join_handle) = guard.take() {
163                                join_handle.await??;
164                            }
165                            drop(guard);
166                        }
167                        let reason = ServerRequest {
168                            method: request.method().clone(),
169                            uri: request.uri().clone(),
170                        };
171                        let side_effects_reason = ServerRequestSideEffects {
172                            method: request.method().clone(),
173                            uri: request.uri().clone(),
174                        };
175                        run_once_with_reason(tt.clone(), reason, async move {
176                            // TODO: `get_issue_reporter` should be an `OperationVc`, as there's a
177                            // risk it could be a task-local Vc, which is not safe for us to await.
178                            let issue_reporter = get_issue_reporter();
179
180                            if hyper_tungstenite::is_upgrade_request(&request) {
181                                let uri = request.uri();
182                                let path = uri.path();
183
184                                if path == "/turbopack-hmr" {
185                                    let (response, websocket) =
186                                        hyper_tungstenite::upgrade(request, None)?;
187                                    let update_server =
188                                        UpdateServer::new(source_provider, issue_reporter);
189                                    update_server.run(&*tt, websocket);
190                                    return Ok(response);
191                                }
192
193                                println!("[404] {path} (WebSocket)");
194                                if path == "/_next/webpack-hmr" {
195                                    // Special-case requests to webpack-hmr as these are made by
196                                    // Next.js clients built
197                                    // without turbopack, which may be making requests in
198                                    // development.
199                                    println!(
200                                        "A non-turbopack next.js client is trying to connect."
201                                    );
202                                    println!(
203                                        "Make sure to reload/close any browser window which has \
204                                         been opened without --turbo."
205                                    );
206                                }
207
208                                return Ok(Response::builder()
209                                    .status(404)
210                                    .body(hyper::Body::empty())?);
211                            }
212
213                            let uri = request.uri();
214                            let path = uri.path().to_string();
215                            let source_op = source_provider.get_source();
216                            // HACK: Resolve `source` now so that we can get any issues on it
217                            let _ = source_op.resolve_strongly_consistent().await?;
218                            apply_effects(source_op).await?;
219                            handle_issues(
220                                source_op,
221                                issue_reporter,
222                                IssueSeverity::Fatal.cell(),
223                                Some(&path),
224                                Some("get source"),
225                            )
226                            .await?;
227                            let (response, side_effects) =
228                                http::process_request_with_content_source(
229                                    // HACK: pass `source` here (instead of `resolved_source`
230                                    // because the underlying API wants to do it's own
231                                    // `resolve_strongly_consistent` call.
232                                    //
233                                    // It's unlikely (the calls happen one-after-another), but this
234                                    // could cause inconsistency between the reported issues and
235                                    // the generated HTTP response.
236                                    source_op,
237                                    request,
238                                    issue_reporter,
239                                )
240                                .await?;
241                            let status = response.status().as_u16();
242                            let is_error = response.status().is_client_error()
243                                || response.status().is_server_error();
244                            let elapsed = start.elapsed();
245                            if is_error
246                                || (cfg!(feature = "log_request_stats")
247                                    && elapsed > Duration::from_secs(1))
248                            {
249                                println!(
250                                    "[{status}] {path} ({duration})",
251                                    duration = FormatDuration(elapsed)
252                                );
253                            }
254                            if !side_effects.is_empty() {
255                                let join_handle = tokio::spawn(run_once_with_reason(
256                                    tt.clone(),
257                                    side_effects_reason,
258                                    async move {
259                                        for side_effect in side_effects {
260                                            side_effect.apply().await?;
261                                        }
262                                        Ok(())
263                                    },
264                                ));
265                                ongoing_side_effects.lock().push_back(Arc::new(
266                                    tokio::sync::Mutex::new(Some(join_handle)),
267                                ));
268                            }
269                            Ok(response)
270                        })
271                        .await
272                    };
273                    async move {
274                        match future.await {
275                            Ok(r) => Ok::<_, hyper::http::Error>(r),
276                            Err(e) => {
277                                println!(
278                                    "[500] error ({}): {}",
279                                    FormatDuration(start.elapsed()),
280                                    PrettyPrintError(&e),
281                                );
282                                Ok(Response::builder()
283                                    .status(500)
284                                    .body(hyper::Body::from(format!("{}", PrettyPrintError(&e))))?)
285                            }
286                        }
287                    }
288                    .instrument(request_span)
289                };
290                anyhow::Ok(service_fn(handler))
291            }
292        });
293        let server = self.server.serve(make_svc);
294
295        DevServer {
296            addr: self.addr,
297            future: Box::pin(async move {
298                server.await?;
299                Ok(())
300            }),
301        }
302    }
303}
304
305pub fn register() {
306    turbo_tasks::register();
307    turbo_tasks_bytes::register();
308    turbo_tasks_fs::register();
309    turbopack_core::register();
310    turbopack_cli_utils::register();
311    turbopack_ecmascript::register();
312    include!(concat!(env!("OUT_DIR"), "/register.rs"));
313}