turbopack_dev_server/
lib.rs

1#![feature(min_specialization)]
2#![feature(trait_alias)]
3#![feature(iter_intersperse)]
4#![feature(str_split_remainder)]
5#![feature(arbitrary_self_types)]
6#![feature(arbitrary_self_types_pointers)]
7
8pub mod html;
9mod http;
10pub mod introspect;
11mod invalidation;
12pub mod source;
13pub mod update;
14
15use std::{
16    collections::VecDeque,
17    future::Future,
18    net::{SocketAddr, TcpListener},
19    pin::Pin,
20    sync::Arc,
21    time::{Duration, Instant},
22};
23
24use anyhow::{Context, Result};
25use hyper::{
26    Request, Response, Server,
27    server::{Builder, conn::AddrIncoming},
28    service::{make_service_fn, service_fn},
29};
30use parking_lot::Mutex;
31use socket2::{Domain, Protocol, Socket, Type};
32use tokio::task::JoinHandle;
33use tracing::{Instrument, Level, Span, event, info_span};
34use turbo_tasks::{
35    NonLocalValue, OperationVc, TurboTasksApi, Vc, apply_effects, run_once_with_reason,
36    trace::TraceRawVcs, util::FormatDuration,
37};
38use turbopack_core::{
39    error::PrettyPrintError,
40    issue::{IssueReporter, IssueSeverity, handle_issues},
41};
42
43use self::{source::ContentSource, update::UpdateServer};
44use crate::{
45    invalidation::{ServerRequest, ServerRequestSideEffects},
46    source::ContentSourceSideEffect,
47};
48
49pub trait SourceProvider: Send + Clone + 'static {
50    /// must call a turbo-tasks function internally
51    fn get_source(&self) -> OperationVc<Box<dyn ContentSource>>;
52}
53
54impl<T> SourceProvider for T
55where
56    T: Fn() -> OperationVc<Box<dyn ContentSource>> + Send + Clone + 'static,
57{
58    fn get_source(&self) -> OperationVc<Box<dyn ContentSource>> {
59        self()
60    }
61}
62
63#[derive(TraceRawVcs, Debug, NonLocalValue)]
64pub struct DevServerBuilder {
65    #[turbo_tasks(trace_ignore)]
66    pub addr: SocketAddr,
67    #[turbo_tasks(trace_ignore)]
68    server: Builder<AddrIncoming>,
69}
70
71#[derive(TraceRawVcs, NonLocalValue)]
72pub struct DevServer {
73    #[turbo_tasks(trace_ignore)]
74    pub addr: SocketAddr,
75    #[turbo_tasks(trace_ignore)]
76    pub future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
77}
78
79impl DevServer {
80    pub fn listen(addr: SocketAddr) -> Result<DevServerBuilder, anyhow::Error> {
81        // This is annoying. The hyper::Server doesn't allow us to know which port was
82        // bound (until we build it with a request handler) when using the standard
83        // `server::try_bind` approach. This is important when binding the `0` port,
84        // because the OS will remap that to an actual free port, and we need to know
85        // that port before we build the request handler. So we need to construct a
86        // real TCP listener, see if it bound, and get its bound address.
87        let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))
88            .context("unable to create socket")?;
89        // Allow the socket to be reused immediately after closing. This ensures that
90        // the dev server can be restarted on the same address without a buffer time for
91        // the OS to release the socket.
92        // https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
93        #[cfg(not(windows))]
94        let _ = socket.set_reuse_address(true);
95        if matches!(addr, SocketAddr::V6(_)) {
96            // When possible bind to v4 and v6, otherwise ignore the error
97            let _ = socket.set_only_v6(false);
98        }
99        let sock_addr = addr.into();
100        socket
101            .bind(&sock_addr)
102            .context("not able to bind address")?;
103        socket.listen(128).context("not able to listen on socket")?;
104
105        let listener: TcpListener = socket.into();
106        let addr = listener
107            .local_addr()
108            .context("not able to get bound address")?;
109        let server = Server::from_tcp(listener).context("Not able to start server")?;
110        Ok(DevServerBuilder { addr, server })
111    }
112}
113
114impl DevServerBuilder {
115    pub fn serve(
116        self,
117        turbo_tasks: Arc<dyn TurboTasksApi>,
118        source_provider: impl SourceProvider + NonLocalValue + TraceRawVcs + Sync,
119        get_issue_reporter: Arc<dyn Fn() -> Vc<Box<dyn IssueReporter>> + Send + Sync>,
120    ) -> DevServer {
121        let ongoing_side_effects = Arc::new(Mutex::new(VecDeque::<
122            Arc<tokio::sync::Mutex<Option<JoinHandle<Result<()>>>>>,
123        >::with_capacity(16)));
124        let make_svc = make_service_fn(move |_| {
125            let tt = turbo_tasks.clone();
126            let source_provider = source_provider.clone();
127            let get_issue_reporter = get_issue_reporter.clone();
128            let ongoing_side_effects = ongoing_side_effects.clone();
129            async move {
130                let handler = move |request: Request<hyper::Body>| {
131                    let request_span = info_span!(parent: None, "request", name = ?request.uri());
132                    let start = Instant::now();
133                    let tt = tt.clone();
134                    let get_issue_reporter = get_issue_reporter.clone();
135                    let ongoing_side_effects = ongoing_side_effects.clone();
136                    let source_provider = source_provider.clone();
137                    let future = async move {
138                        event!(parent: Span::current(), Level::DEBUG, "request start");
139                        // Wait until all ongoing side effects are completed
140                        // We only need to wait for the ongoing side effects that were started
141                        // before this request. Later added side effects are not relevant for this.
142                        let current_ongoing_side_effects = {
143                            // Cleanup the ongoing_side_effects list
144                            let mut guard = ongoing_side_effects.lock();
145                            while let Some(front) = guard.front() {
146                                let Ok(front_guard) = front.try_lock() else {
147                                    break;
148                                };
149                                if front_guard.is_some() {
150                                    break;
151                                }
152                                drop(front_guard);
153                                guard.pop_front();
154                            }
155                            // Get a clone of the remaining list
156                            (*guard).clone()
157                        };
158                        // Wait for the side effects to complete
159                        for side_effect_mutex in current_ongoing_side_effects {
160                            let mut guard = side_effect_mutex.lock().await;
161                            if let Some(join_handle) = guard.take() {
162                                join_handle.await??;
163                            }
164                            drop(guard);
165                        }
166                        let reason = ServerRequest {
167                            method: request.method().clone(),
168                            uri: request.uri().clone(),
169                        };
170                        let side_effects_reason = ServerRequestSideEffects {
171                            method: request.method().clone(),
172                            uri: request.uri().clone(),
173                        };
174                        run_once_with_reason(tt.clone(), reason, async move {
175                            // TODO: `get_issue_reporter` should be an `OperationVc`, as there's a
176                            // risk it could be a task-local Vc, which is not safe for us to await.
177                            let issue_reporter = get_issue_reporter();
178
179                            if hyper_tungstenite::is_upgrade_request(&request) {
180                                let uri = request.uri();
181                                let path = uri.path();
182
183                                if path == "/turbopack-hmr" {
184                                    let (response, websocket) =
185                                        hyper_tungstenite::upgrade(request, None)?;
186                                    let update_server =
187                                        UpdateServer::new(source_provider, issue_reporter);
188                                    update_server.run(&*tt, websocket);
189                                    return Ok(response);
190                                }
191
192                                println!("[404] {path} (WebSocket)");
193                                if path == "/_next/webpack-hmr" {
194                                    // Special-case requests to webpack-hmr as these are made by
195                                    // Next.js clients built
196                                    // without turbopack, which may be making requests in
197                                    // development.
198                                    println!(
199                                        "A non-turbopack next.js client is trying to connect."
200                                    );
201                                    println!(
202                                        "Make sure to reload/close any browser window which has \
203                                         been opened without --turbo."
204                                    );
205                                }
206
207                                return Ok(Response::builder()
208                                    .status(404)
209                                    .body(hyper::Body::empty())?);
210                            }
211
212                            let uri = request.uri();
213                            let path = uri.path().to_string();
214                            let source_op = source_provider.get_source();
215                            // HACK: Resolve `source` now so that we can get any issues on it
216                            let _ = source_op.resolve_strongly_consistent().await?;
217                            apply_effects(source_op).await?;
218                            handle_issues(
219                                source_op,
220                                issue_reporter,
221                                IssueSeverity::Fatal,
222                                Some(&path),
223                                Some("get source"),
224                            )
225                            .await?;
226                            let (response, side_effects) =
227                                http::process_request_with_content_source(
228                                    // HACK: pass `source` here (instead of `resolved_source`
229                                    // because the underlying API wants to do it's own
230                                    // `resolve_strongly_consistent` call.
231                                    //
232                                    // It's unlikely (the calls happen one-after-another), but this
233                                    // could cause inconsistency between the reported issues and
234                                    // the generated HTTP response.
235                                    source_op,
236                                    request,
237                                    issue_reporter,
238                                )
239                                .await?;
240                            let status = response.status().as_u16();
241                            let is_error = response.status().is_client_error()
242                                || response.status().is_server_error();
243                            let elapsed = start.elapsed();
244                            if is_error
245                                || (cfg!(feature = "log_request_stats")
246                                    && elapsed > Duration::from_secs(1))
247                            {
248                                println!(
249                                    "[{status}] {path} ({duration})",
250                                    duration = FormatDuration(elapsed)
251                                );
252                            }
253                            if !side_effects.is_empty() {
254                                let join_handle = tokio::spawn(run_once_with_reason(
255                                    tt.clone(),
256                                    side_effects_reason,
257                                    async move {
258                                        for side_effect in side_effects {
259                                            side_effect.apply().await?;
260                                        }
261                                        Ok(())
262                                    },
263                                ));
264                                ongoing_side_effects.lock().push_back(Arc::new(
265                                    tokio::sync::Mutex::new(Some(join_handle)),
266                                ));
267                            }
268                            Ok(response)
269                        })
270                        .await
271                    };
272                    async move {
273                        match future.await {
274                            Ok(r) => Ok::<_, hyper::http::Error>(r),
275                            Err(e) => {
276                                println!(
277                                    "[500] error ({}): {}",
278                                    FormatDuration(start.elapsed()),
279                                    PrettyPrintError(&e),
280                                );
281                                Ok(Response::builder()
282                                    .status(500)
283                                    .body(hyper::Body::from(format!("{}", PrettyPrintError(&e))))?)
284                            }
285                        }
286                    }
287                    .instrument(request_span)
288                };
289                anyhow::Ok(service_fn(handler))
290            }
291        });
292        let server = self.server.serve(make_svc);
293
294        DevServer {
295            addr: self.addr,
296            future: Box::pin(async move {
297                server.await?;
298                Ok(())
299            }),
300        }
301    }
302}