Skip to main content

turbopack_dev_server/
lib.rs

1#![feature(min_specialization)]
2#![feature(str_split_remainder)]
3#![feature(arbitrary_self_types)]
4#![feature(arbitrary_self_types_pointers)]
5
6pub mod html;
7mod http;
8pub mod introspect;
9mod invalidation;
10pub mod source;
11pub mod update;
12
13use std::{
14    collections::VecDeque,
15    future::Future,
16    net::{SocketAddr, TcpListener},
17    pin::Pin,
18    sync::Arc,
19    time::{Duration, Instant},
20};
21
22use anyhow::{Context, Result};
23use hyper::{
24    Request, Response, Server,
25    server::{Builder, conn::AddrIncoming},
26    service::{make_service_fn, service_fn},
27};
28use parking_lot::Mutex;
29use socket2::{Domain, Protocol, Socket, Type};
30use tokio::task::JoinHandle;
31use tracing::{Instrument, Level, Span, event, info_span};
32use turbo_tasks::{
33    Effects, NonLocalValue, OperationVc, PrettyPrintError, TurboTasksApi, Vc,
34    read_strongly_consistent_and_apply_effects, run_once_with_reason, take_effects,
35    trace::TraceRawVcs, util::FormatDuration,
36};
37use turbopack_core::issue::{IssueReporter, IssueSeverity, handle_issues};
38
39use self::{source::ContentSource, update::UpdateServer};
40use crate::{
41    invalidation::{ServerRequest, ServerRequestSideEffects},
42    source::ContentSourceSideEffect,
43};
44
45pub trait SourceProvider: Send + Clone + 'static {
46    /// must call a turbo-tasks function internally
47    fn get_source(&self) -> OperationVc<Box<dyn ContentSource>>;
48}
49
50impl<T> SourceProvider for T
51where
52    T: Fn() -> OperationVc<Box<dyn ContentSource>> + Send + Clone + 'static,
53{
54    fn get_source(&self) -> OperationVc<Box<dyn ContentSource>> {
55        self()
56    }
57}
58
59#[turbo_tasks::value(serialization = "skip")]
60struct ContentSourceWithIssues {
61    source_op: OperationVc<Box<dyn ContentSource>>,
62    effects: Effects,
63}
64
65#[turbo_tasks::function(operation, root)]
66async fn get_source_with_issues_operation(
67    source_op: OperationVc<Box<dyn ContentSource>>,
68) -> Result<Vc<ContentSourceWithIssues>> {
69    let _ = source_op.resolve().strongly_consistent().await?;
70    let effects = take_effects(source_op).await?;
71    Ok(ContentSourceWithIssues { source_op, effects }.cell())
72}
73
74#[derive(TraceRawVcs, Debug, NonLocalValue)]
75pub struct DevServerBuilder {
76    #[turbo_tasks(trace_ignore)]
77    pub addr: SocketAddr,
78    #[turbo_tasks(trace_ignore)]
79    server: Builder<AddrIncoming>,
80}
81
82#[derive(TraceRawVcs, NonLocalValue)]
83pub struct DevServer {
84    #[turbo_tasks(trace_ignore)]
85    pub addr: SocketAddr,
86    #[turbo_tasks(trace_ignore)]
87    pub future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
88}
89
90impl DevServer {
91    pub fn listen(addr: SocketAddr) -> Result<DevServerBuilder, anyhow::Error> {
92        // This is annoying. The hyper::Server doesn't allow us to know which port was
93        // bound (until we build it with a request handler) when using the standard
94        // `server::try_bind` approach. This is important when binding the `0` port,
95        // because the OS will remap that to an actual free port, and we need to know
96        // that port before we build the request handler. So we need to construct a
97        // real TCP listener, see if it bound, and get its bound address.
98        let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))
99            .context("unable to create socket")?;
100        // Allow the socket to be reused immediately after closing. This ensures that
101        // the dev server can be restarted on the same address without a buffer time for
102        // the OS to release the socket.
103        // https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
104        #[cfg(not(windows))]
105        let _ = socket.set_reuse_address(true);
106        if matches!(addr, SocketAddr::V6(_)) {
107            // When possible bind to v4 and v6, otherwise ignore the error
108            let _ = socket.set_only_v6(false);
109        }
110        let sock_addr = addr.into();
111        socket
112            .bind(&sock_addr)
113            .context("not able to bind address")?;
114        socket.listen(128).context("not able to listen on socket")?;
115
116        let listener: TcpListener = socket.into();
117        let addr = listener
118            .local_addr()
119            .context("not able to get bound address")?;
120        let server = Server::from_tcp(listener).context("Not able to start server")?;
121        Ok(DevServerBuilder { addr, server })
122    }
123}
124
125impl DevServerBuilder {
126    pub fn serve(
127        self,
128        turbo_tasks: Arc<dyn TurboTasksApi>,
129        source_provider: impl SourceProvider + NonLocalValue + TraceRawVcs + Sync,
130        get_issue_reporter: Arc<dyn Fn() -> Vc<Box<dyn IssueReporter>> + Send + Sync>,
131    ) -> DevServer {
132        let ongoing_side_effects = Arc::new(Mutex::new(VecDeque::<
133            Arc<tokio::sync::Mutex<Option<JoinHandle<Result<()>>>>>,
134        >::with_capacity(16)));
135        let make_svc = make_service_fn(move |_| {
136            let tt = turbo_tasks.clone();
137            let source_provider = source_provider.clone();
138            let get_issue_reporter = get_issue_reporter.clone();
139            let ongoing_side_effects = ongoing_side_effects.clone();
140            async move {
141                let handler = move |request: Request<hyper::Body>| {
142                    let request_span = info_span!(parent: None, "request", name = ?request.uri());
143                    let start = Instant::now();
144                    let tt = tt.clone();
145                    let get_issue_reporter = get_issue_reporter.clone();
146                    let ongoing_side_effects = ongoing_side_effects.clone();
147                    let source_provider = source_provider.clone();
148                    let future = async move {
149                        event!(parent: Span::current(), Level::DEBUG, "request start");
150                        // Wait until all ongoing side effects are completed
151                        // We only need to wait for the ongoing side effects that were started
152                        // before this request. Later added side effects are not relevant for this.
153                        let current_ongoing_side_effects = {
154                            // Cleanup the ongoing_side_effects list
155                            let mut guard = ongoing_side_effects.lock();
156                            while let Some(front) = guard.front() {
157                                let Ok(front_guard) = front.try_lock() else {
158                                    break;
159                                };
160                                if front_guard.is_some() {
161                                    break;
162                                }
163                                drop(front_guard);
164                                guard.pop_front();
165                            }
166                            // Get a clone of the remaining list
167                            (*guard).clone()
168                        };
169                        // Wait for the side effects to complete
170                        for side_effect_mutex in current_ongoing_side_effects {
171                            let mut guard = side_effect_mutex.lock().await;
172                            if let Some(join_handle) = guard.take() {
173                                join_handle.await??;
174                            }
175                            drop(guard);
176                        }
177                        let reason = ServerRequest {
178                            method: request.method().clone(),
179                            uri: request.uri().clone(),
180                        };
181                        let side_effects_reason = ServerRequestSideEffects {
182                            method: request.method().clone(),
183                            uri: request.uri().clone(),
184                        };
185                        run_once_with_reason(tt.clone(), reason, async move {
186                            // TODO: `get_issue_reporter` should be an `OperationVc`, as there's a
187                            // risk it could be a task-local Vc, which is not safe for us to await.
188                            let issue_reporter = get_issue_reporter();
189
190                            if hyper_tungstenite::is_upgrade_request(&request) {
191                                let uri = request.uri();
192                                let path = uri.path();
193
194                                if path == "/turbopack-hmr" {
195                                    let (response, websocket) =
196                                        hyper_tungstenite::upgrade(request, None)?;
197                                    let update_server =
198                                        UpdateServer::new(source_provider, issue_reporter);
199                                    update_server.run(&*tt, websocket);
200                                    return Ok(response);
201                                }
202
203                                println!("[404] {path} (WebSocket)");
204                                if path == "/_next/hmr" {
205                                    // Special-case requests to hmr as these are made by
206                                    // Next.js clients built
207                                    // without turbopack, which may be making requests in
208                                    // development.
209                                    println!(
210                                        "A non-turbopack next.js client is trying to connect."
211                                    );
212                                    println!(
213                                        "Make sure to reload/close any browser window which has \
214                                         been opened without --turbo."
215                                    );
216                                }
217
218                                return Ok(Response::builder()
219                                    .status(404)
220                                    .body(hyper::Body::empty())?);
221                            }
222
223                            let uri = request.uri();
224                            let path = uri.path().to_string();
225                            let source_with_issues_op =
226                                get_source_with_issues_operation(source_provider.get_source());
227                            let read = read_strongly_consistent_and_apply_effects(
228                                source_with_issues_op,
229                                |v| &v.effects,
230                            )
231                            .await?;
232                            let ContentSourceWithIssues { source_op, .. } = &*read;
233                            handle_issues(
234                                source_with_issues_op,
235                                issue_reporter,
236                                IssueSeverity::Fatal,
237                                Some(&path),
238                                Some("get source"),
239                            )
240                            .await?;
241                            let (response, side_effects) =
242                                http::process_request_with_content_source(
243                                    // HACK: pass `source` here (instead of `resolved_source`
244                                    // because the underlying API wants to do it's own
245                                    // `.resolve().strongly_consistent()` call.
246                                    //
247                                    // It's unlikely (the calls happen one-after-another), but this
248                                    // could cause inconsistency between the reported issues and
249                                    // the generated HTTP response.
250                                    *source_op,
251                                    request,
252                                    issue_reporter,
253                                )
254                                .await?;
255                            let status = response.status().as_u16();
256                            let is_error = response.status().is_client_error()
257                                || response.status().is_server_error();
258                            let elapsed = start.elapsed();
259                            if is_error
260                                || (cfg!(feature = "log_request_stats")
261                                    && elapsed > Duration::from_secs(1))
262                            {
263                                println!(
264                                    "[{status}] {path} ({duration})",
265                                    duration = FormatDuration(elapsed)
266                                );
267                            }
268                            if !side_effects.is_empty() {
269                                let join_handle = tokio::spawn(run_once_with_reason(
270                                    tt.clone(),
271                                    side_effects_reason,
272                                    async move {
273                                        for side_effect in side_effects {
274                                            side_effect.apply().await?;
275                                        }
276                                        Ok(())
277                                    },
278                                ));
279                                ongoing_side_effects.lock().push_back(Arc::new(
280                                    tokio::sync::Mutex::new(Some(join_handle)),
281                                ));
282                            }
283                            Ok(response)
284                        })
285                        .await
286                    };
287                    async move {
288                        match future.await {
289                            Ok(r) => Ok::<_, hyper::http::Error>(r),
290                            Err(e) => {
291                                println!(
292                                    "[500] error ({}): {}",
293                                    FormatDuration(start.elapsed()),
294                                    PrettyPrintError(&e),
295                                );
296                                Ok(Response::builder()
297                                    .status(500)
298                                    .body(hyper::Body::from(format!("{}", PrettyPrintError(&e))))?)
299                            }
300                        }
301                    }
302                    .instrument(request_span)
303                };
304                anyhow::Ok(service_fn(handler))
305            }
306        });
307        let server = self.server.serve(make_svc);
308
309        DevServer {
310            addr: self.addr,
311            future: Box::pin(async move {
312                server.await?;
313                Ok(())
314            }),
315        }
316    }
317}