Skip to main content

turbopack_dev_server/
lib.rs

1#![feature(min_specialization)]
2#![feature(trait_alias)]
3#![feature(iter_intersperse)]
4#![feature(str_split_remainder)]
5#![feature(arbitrary_self_types)]
6#![feature(arbitrary_self_types_pointers)]
7
8pub mod html;
9mod http;
10pub mod introspect;
11mod invalidation;
12pub mod source;
13pub mod update;
14
15use std::{
16    collections::VecDeque,
17    future::Future,
18    net::{SocketAddr, TcpListener},
19    pin::Pin,
20    sync::Arc,
21    time::{Duration, Instant},
22};
23
24use anyhow::{Context, Result};
25use hyper::{
26    Request, Response, Server,
27    server::{Builder, conn::AddrIncoming},
28    service::{make_service_fn, service_fn},
29};
30use parking_lot::Mutex;
31use socket2::{Domain, Protocol, Socket, Type};
32use tokio::task::JoinHandle;
33use tracing::{Instrument, Level, Span, event, info_span};
34use turbo_tasks::{
35    Effects, NonLocalValue, OperationVc, PrettyPrintError, TurboTasksApi, Vc, get_effects,
36    run_once_with_reason, trace::TraceRawVcs, util::FormatDuration,
37};
38use turbopack_core::issue::{IssueReporter, IssueSeverity, handle_issues};
39
40use self::{source::ContentSource, update::UpdateServer};
41use crate::{
42    invalidation::{ServerRequest, ServerRequestSideEffects},
43    source::ContentSourceSideEffect,
44};
45
46pub trait SourceProvider: Send + Clone + 'static {
47    /// must call a turbo-tasks function internally
48    fn get_source(&self) -> OperationVc<Box<dyn ContentSource>>;
49}
50
51impl<T> SourceProvider for T
52where
53    T: Fn() -> OperationVc<Box<dyn ContentSource>> + Send + Clone + 'static,
54{
55    fn get_source(&self) -> OperationVc<Box<dyn ContentSource>> {
56        self()
57    }
58}
59
60#[turbo_tasks::value(serialization = "none")]
61struct ContentSourceWithIssues {
62    source_op: OperationVc<Box<dyn ContentSource>>,
63    effects: Effects,
64}
65
66#[turbo_tasks::function(operation)]
67async fn get_source_with_issues_operation(
68    source_op: OperationVc<Box<dyn ContentSource>>,
69) -> Result<Vc<ContentSourceWithIssues>> {
70    let _ = source_op.resolve_strongly_consistent().await?;
71    let effects = get_effects(source_op).await?;
72    Ok(ContentSourceWithIssues { source_op, effects }.cell())
73}
74
75#[derive(TraceRawVcs, Debug, NonLocalValue)]
76pub struct DevServerBuilder {
77    #[turbo_tasks(trace_ignore)]
78    pub addr: SocketAddr,
79    #[turbo_tasks(trace_ignore)]
80    server: Builder<AddrIncoming>,
81}
82
83#[derive(TraceRawVcs, NonLocalValue)]
84pub struct DevServer {
85    #[turbo_tasks(trace_ignore)]
86    pub addr: SocketAddr,
87    #[turbo_tasks(trace_ignore)]
88    pub future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
89}
90
91impl DevServer {
92    pub fn listen(addr: SocketAddr) -> Result<DevServerBuilder, anyhow::Error> {
93        // This is annoying. The hyper::Server doesn't allow us to know which port was
94        // bound (until we build it with a request handler) when using the standard
95        // `server::try_bind` approach. This is important when binding the `0` port,
96        // because the OS will remap that to an actual free port, and we need to know
97        // that port before we build the request handler. So we need to construct a
98        // real TCP listener, see if it bound, and get its bound address.
99        let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))
100            .context("unable to create socket")?;
101        // Allow the socket to be reused immediately after closing. This ensures that
102        // the dev server can be restarted on the same address without a buffer time for
103        // the OS to release the socket.
104        // https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
105        #[cfg(not(windows))]
106        let _ = socket.set_reuse_address(true);
107        if matches!(addr, SocketAddr::V6(_)) {
108            // When possible bind to v4 and v6, otherwise ignore the error
109            let _ = socket.set_only_v6(false);
110        }
111        let sock_addr = addr.into();
112        socket
113            .bind(&sock_addr)
114            .context("not able to bind address")?;
115        socket.listen(128).context("not able to listen on socket")?;
116
117        let listener: TcpListener = socket.into();
118        let addr = listener
119            .local_addr()
120            .context("not able to get bound address")?;
121        let server = Server::from_tcp(listener).context("Not able to start server")?;
122        Ok(DevServerBuilder { addr, server })
123    }
124}
125
126impl DevServerBuilder {
127    pub fn serve(
128        self,
129        turbo_tasks: Arc<dyn TurboTasksApi>,
130        source_provider: impl SourceProvider + NonLocalValue + TraceRawVcs + Sync,
131        get_issue_reporter: Arc<dyn Fn() -> Vc<Box<dyn IssueReporter>> + Send + Sync>,
132    ) -> DevServer {
133        let ongoing_side_effects = Arc::new(Mutex::new(VecDeque::<
134            Arc<tokio::sync::Mutex<Option<JoinHandle<Result<()>>>>>,
135        >::with_capacity(16)));
136        let make_svc = make_service_fn(move |_| {
137            let tt = turbo_tasks.clone();
138            let source_provider = source_provider.clone();
139            let get_issue_reporter = get_issue_reporter.clone();
140            let ongoing_side_effects = ongoing_side_effects.clone();
141            async move {
142                let handler = move |request: Request<hyper::Body>| {
143                    let request_span = info_span!(parent: None, "request", name = ?request.uri());
144                    let start = Instant::now();
145                    let tt = tt.clone();
146                    let get_issue_reporter = get_issue_reporter.clone();
147                    let ongoing_side_effects = ongoing_side_effects.clone();
148                    let source_provider = source_provider.clone();
149                    let future = async move {
150                        event!(parent: Span::current(), Level::DEBUG, "request start");
151                        // Wait until all ongoing side effects are completed
152                        // We only need to wait for the ongoing side effects that were started
153                        // before this request. Later added side effects are not relevant for this.
154                        let current_ongoing_side_effects = {
155                            // Cleanup the ongoing_side_effects list
156                            let mut guard = ongoing_side_effects.lock();
157                            while let Some(front) = guard.front() {
158                                let Ok(front_guard) = front.try_lock() else {
159                                    break;
160                                };
161                                if front_guard.is_some() {
162                                    break;
163                                }
164                                drop(front_guard);
165                                guard.pop_front();
166                            }
167                            // Get a clone of the remaining list
168                            (*guard).clone()
169                        };
170                        // Wait for the side effects to complete
171                        for side_effect_mutex in current_ongoing_side_effects {
172                            let mut guard = side_effect_mutex.lock().await;
173                            if let Some(join_handle) = guard.take() {
174                                join_handle.await??;
175                            }
176                            drop(guard);
177                        }
178                        let reason = ServerRequest {
179                            method: request.method().clone(),
180                            uri: request.uri().clone(),
181                        };
182                        let side_effects_reason = ServerRequestSideEffects {
183                            method: request.method().clone(),
184                            uri: request.uri().clone(),
185                        };
186                        run_once_with_reason(tt.clone(), reason, async move {
187                            // TODO: `get_issue_reporter` should be an `OperationVc`, as there's a
188                            // risk it could be a task-local Vc, which is not safe for us to await.
189                            let issue_reporter = get_issue_reporter();
190
191                            if hyper_tungstenite::is_upgrade_request(&request) {
192                                let uri = request.uri();
193                                let path = uri.path();
194
195                                if path == "/turbopack-hmr" {
196                                    let (response, websocket) =
197                                        hyper_tungstenite::upgrade(request, None)?;
198                                    let update_server =
199                                        UpdateServer::new(source_provider, issue_reporter);
200                                    update_server.run(&*tt, websocket);
201                                    return Ok(response);
202                                }
203
204                                println!("[404] {path} (WebSocket)");
205                                if path == "/_next/hmr" {
206                                    // Special-case requests to hmr as these are made by
207                                    // Next.js clients built
208                                    // without turbopack, which may be making requests in
209                                    // development.
210                                    println!(
211                                        "A non-turbopack next.js client is trying to connect."
212                                    );
213                                    println!(
214                                        "Make sure to reload/close any browser window which has \
215                                         been opened without --turbo."
216                                    );
217                                }
218
219                                return Ok(Response::builder()
220                                    .status(404)
221                                    .body(hyper::Body::empty())?);
222                            }
223
224                            let uri = request.uri();
225                            let path = uri.path().to_string();
226                            let source_with_issues_op =
227                                get_source_with_issues_operation(source_provider.get_source());
228                            let ContentSourceWithIssues { source_op, effects } =
229                                &*source_with_issues_op.read_strongly_consistent().await?;
230                            effects.apply().await?;
231                            handle_issues(
232                                source_with_issues_op,
233                                issue_reporter,
234                                IssueSeverity::Fatal,
235                                Some(&path),
236                                Some("get source"),
237                            )
238                            .await?;
239                            let (response, side_effects) =
240                                http::process_request_with_content_source(
241                                    // HACK: pass `source` here (instead of `resolved_source`
242                                    // because the underlying API wants to do it's own
243                                    // `resolve_strongly_consistent` call.
244                                    //
245                                    // It's unlikely (the calls happen one-after-another), but this
246                                    // could cause inconsistency between the reported issues and
247                                    // the generated HTTP response.
248                                    *source_op,
249                                    request,
250                                    issue_reporter,
251                                )
252                                .await?;
253                            let status = response.status().as_u16();
254                            let is_error = response.status().is_client_error()
255                                || response.status().is_server_error();
256                            let elapsed = start.elapsed();
257                            if is_error
258                                || (cfg!(feature = "log_request_stats")
259                                    && elapsed > Duration::from_secs(1))
260                            {
261                                println!(
262                                    "[{status}] {path} ({duration})",
263                                    duration = FormatDuration(elapsed)
264                                );
265                            }
266                            if !side_effects.is_empty() {
267                                let join_handle = tokio::spawn(run_once_with_reason(
268                                    tt.clone(),
269                                    side_effects_reason,
270                                    async move {
271                                        for side_effect in side_effects {
272                                            side_effect.apply().await?;
273                                        }
274                                        Ok(())
275                                    },
276                                ));
277                                ongoing_side_effects.lock().push_back(Arc::new(
278                                    tokio::sync::Mutex::new(Some(join_handle)),
279                                ));
280                            }
281                            Ok(response)
282                        })
283                        .await
284                    };
285                    async move {
286                        match future.await {
287                            Ok(r) => Ok::<_, hyper::http::Error>(r),
288                            Err(e) => {
289                                println!(
290                                    "[500] error ({}): {}",
291                                    FormatDuration(start.elapsed()),
292                                    PrettyPrintError(&e),
293                                );
294                                Ok(Response::builder()
295                                    .status(500)
296                                    .body(hyper::Body::from(format!("{}", PrettyPrintError(&e))))?)
297                            }
298                        }
299                    }
300                    .instrument(request_span)
301                };
302                anyhow::Ok(service_fn(handler))
303            }
304        });
305        let server = self.server.serve(make_svc);
306
307        DevServer {
308            addr: self.addr,
309            future: Box::pin(async move {
310                server.await?;
311                Ok(())
312            }),
313        }
314    }
315}