Skip to main content

turbopack_node/process_pool/mod.rs

1use std::{
2    collections::VecDeque,
3    future::Future,
4    mem::take,
5    path::{Path, PathBuf},
6    process::{ExitStatus, Stdio},
7    sync::{Arc, LazyLock},
8    time::{Duration, Instant},
9};
10
11use anyhow::{Context, Result, bail};
12use bytes::Bytes;
13use futures::join;
14use owo_colors::OwoColorize;
15use parking_lot::Mutex;
16use rustc_hash::FxHashMap;
17use tokio::{
18    io::{
19        AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, Stderr,
20        Stdout, stderr, stdout,
21    },
22    net::{TcpListener, TcpStream},
23    process::{Child, ChildStderr, ChildStdout, Command},
24    select,
25    sync::Semaphore,
26    time::{sleep, timeout},
27};
28use turbo_rcstr::{RcStr, rcstr};
29use turbo_tasks::{FxIndexSet, ResolvedVc, Vc, duration_span};
30use turbo_tasks_fs::FileSystemPath;
31use turbopack_ecmascript::magic_identifier::unmangle_identifiers;
32
33use crate::{
34    AssetsForSourceMapping,
35    backend::{CreatePoolFuture, CreatePoolOptions, NodeBackend},
36    evaluate::{EvaluateOperation, EvaluatePool, Operation},
37    format::FormattingMode,
38    pool_stats::{AcquiredPermits, NodeJsPoolStats, PoolStatsSnapshot},
39    source_map::apply_source_mapping,
40};
41
42mod heap_queue;
43use heap_queue::HeapQueue;
44
/// A single pooled Node.js child process: the TCP connection used to exchange
/// length-prefixed packets with it, plus the handlers that forward its
/// stdout/stderr to this process' stdout/stderr.
struct NodeJsPoolProcess {
    /// The spawned `node` process. `diagnostic_recv_error` treats `None` as
    /// "exit status unavailable".
    child: Option<Child>,
    /// Connection the child opened back to the loopback listener created in
    /// [`NodeJsPoolProcess::new`].
    connection: TcpStream,
    /// Forwards (and deduplicates) the child's stdout to our stdout.
    stdout_handler: OutputStreamHandler<ChildStdout, Stdout>,
    /// Forwards (and deduplicates) the child's stderr to our stderr.
    stderr_handler: OutputStreamHandler<ChildStderr, Stderr>,
    /// Shared ring buffer of recent stdout lines; lives independently of
    /// `stdout_handler` so the post-mortem on a recv failure can still read
    /// it after the handler future has returned an error.
    recent_stdout: RecentLines,
    /// Same as `recent_stdout`, for the child's stderr.
    recent_stderr: RecentLines,
    /// The child was started with `--inspect-brk`; receive timeouts are
    /// disabled in this mode.
    debug: bool,
    /// Primary sort key of the [`Ord`] impl used by the idle-process heap.
    /// Starts at zero; presumably accumulated by the operations that use this
    /// process (not visible here) — TODO confirm.
    cpu_time_invested: Duration,
}
58
59impl Ord for NodeJsPoolProcess {
60    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
61        self.cpu_time_invested
62            .cmp(&other.cpu_time_invested)
63            .then_with(|| {
64                self.child
65                    .as_ref()
66                    .map(|c| c.id())
67                    .cmp(&other.child.as_ref().map(|c| c.id()))
68            })
69    }
70}
71
72impl PartialOrd for NodeJsPoolProcess {
73    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
74        Some(self.cmp(other))
75    }
76}
77
78impl Eq for NodeJsPoolProcess {}
79
80impl PartialEq for NodeJsPoolProcess {
81    fn eq(&self, other: &Self) -> bool {
82        self.cmp(other) == std::cmp::Ordering::Equal
83    }
84}
85
/// How long [`NodeJsPoolProcess::new`] waits for a freshly spawned Node.js
/// process to connect back to the loopback listener. Not applied in debug
/// mode, where the wait is effectively unbounded.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
87
/// A block of child output delimited by begin/end marker lines, used as the
/// deduplication key in [`SharedOutputSet`].
#[derive(Clone, PartialEq, Eq, Hash)]
struct OutputEntry {
    /// The block's bytes, with the stack-trace portion removed.
    data: Arc<[u8]>,
    /// The stack-trace portion of the block (the lines after an `S` marker),
    /// if it had one.
    stack_trace: Option<Arc<[u8]>>,
}
93
/// Marked output blocks that have already been printed, shared by the output
/// handlers of all processes in one pool (one set per stream). Each entry
/// pairs an [`OutputEntry`] with its occurrence number within a single
/// operation. A block is therefore printed once per occurrence index, not
/// once per process.
type SharedOutputSet = Arc<Mutex<FxIndexSet<(OutputEntry, u32)>>>;
95
/// Held for the duration of each write of forwarded child output, so a chunk
/// written by one handler is not interleaved with a chunk from another.
static GLOBAL_OUTPUT_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
/// Prefix of the protocol marker lines the child writes to stdout/stderr. A
/// marker line is this prefix, one type byte (`B`, `E`, `S` or `D`) and `\n`.
static MARKER: &[u8] = b"TURBOPACK_OUTPUT_";
/// [`MARKER`] as a string slice; the two values must stay in sync.
static MARKER_STR: &str = "TURBOPACK_OUTPUT_";
99
/// Maximum number of recent output lines retained per stream for diagnostic
/// purposes. They are included in the error message when a recv fails, for
/// example because the subprocess crashed before sending its response.
/// When the limit is reached, the oldest line is evicted first.
const RECENT_OUTPUT_LINES: usize = 100;
104
/// A bounded ring buffer of recent lines from a child stream. Shared with the
/// owning [`NodeJsPoolProcess`] so that the post-mortem code on a recv failure
/// can read what the child wrote even if the handler future has already
/// returned an error (and dropped its local state).
///
/// Each entry is one raw line as read, including its trailing newline (if
/// any). Protocol marker lines are not recorded.
type RecentLines = Arc<Mutex<VecDeque<Vec<u8>>>>;
110
/// Forwards one output stream of a child process to `final_stream`. It
/// interprets the child's marker lines and deduplicates marked blocks against
/// the other processes of the same pool.
struct OutputStreamHandler<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> {
    /// Buffered reader over the child's output pipe.
    stream: BufReader<R>,
    /// Set of already printed blocks, shared within the pool.
    shared: SharedOutputSet,
    /// Passed to `apply_source_mapping` when rewriting forwarded text.
    assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
    /// Passed to `apply_source_mapping` when rewriting forwarded text.
    root: FileSystemPath,
    /// Passed to `apply_source_mapping` when rewriting forwarded text.
    project_dir: FileSystemPath,
    /// Destination of the processed output (our own stdout or stderr).
    final_stream: W,
    /// Ring buffer that receives a copy of every non-marker line read.
    recent_lines: RecentLines,
}
120
121impl<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> OutputStreamHandler<R, W> {
122    /// Pipes the `stream` from `final_stream`, but uses `shared` to deduplicate
123    /// lines that has beem emitted by other [OutputStreamHandler] instances
124    /// with the same `shared` before.
125    /// Returns when one operation is done.
126    pub async fn handle_operation(&mut self) -> Result<()> {
127        let Self {
128            stream,
129            shared,
130            assets_for_source_mapping,
131            root,
132            project_dir,
133            final_stream,
134            recent_lines,
135        } = self;
136
137        async fn write_final<W: AsyncWrite + Unpin>(
138            mut bytes: &[u8],
139            final_stream: &mut W,
140        ) -> Result<()> {
141            let _lock = GLOBAL_OUTPUT_LOCK.lock().await;
142            while !bytes.is_empty() {
143                let count = final_stream.write(bytes).await?;
144                if count == 0 {
145                    bail!("Failed to write to final stream as it was closed");
146                }
147                bytes = &bytes[count..];
148            }
149            Ok(())
150        }
151
152        async fn write_source_mapped_final<W: AsyncWrite + Unpin>(
153            bytes: &[u8],
154            assets_for_source_mapping: Vc<AssetsForSourceMapping>,
155            root: FileSystemPath,
156            project_dir: FileSystemPath,
157            final_stream: &mut W,
158        ) -> Result<()> {
159            if let Ok(text) = std::str::from_utf8(bytes) {
160                let text = unmangle_identifiers(text, |content| {
161                    format!("{{{content}}}").italic().to_string()
162                });
163                match apply_source_mapping(
164                    text.as_ref(),
165                    assets_for_source_mapping,
166                    root,
167                    project_dir,
168                    FormattingMode::AnsiColors,
169                )
170                .await
171                {
172                    Err(e) => {
173                        write_final(
174                            format!("Error applying source mapping: {e}\n").as_bytes(),
175                            final_stream,
176                        )
177                        .await?;
178                        write_final(text.as_bytes(), final_stream).await?;
179                    }
180                    Ok(text) => {
181                        write_final(text.as_bytes(), final_stream).await?;
182                    }
183                }
184            } else {
185                write_final(bytes, final_stream).await?;
186            }
187            Ok(())
188        }
189
190        let mut buffer = Vec::new();
191        let mut own_output = FxHashMap::default();
192        let mut nesting: u32 = 0;
193        let mut in_stack = None;
194        let mut stack_trace_buffer = Vec::new();
195        loop {
196            let start = buffer.len();
197            if stream
198                .read_until(b'\n', &mut buffer)
199                .await
200                .context("error reading from stream")?
201                == 0
202            {
203                bail!("stream closed unexpectedly")
204            }
205            // Mirror the just-read line into the recent-lines ring buffer for
206            // diagnostic capture on subprocess crash. Skip protocol markers.
207            let line_is_marker = buffer.len() - start == MARKER.len() + 2
208                && &buffer[start..buffer.len() - 2] == MARKER;
209            if !line_is_marker {
210                let mut lines = recent_lines.lock();
211                if lines.len() >= RECENT_OUTPUT_LINES {
212                    lines.pop_front();
213                }
214                lines.push_back(buffer[start..].to_vec());
215            } else {
216                // This is new line
217                buffer.pop();
218                // This is the type
219                match buffer.pop() {
220                    Some(b'B') => {
221                        stack_trace_buffer.clear();
222                        buffer.truncate(start);
223                        nesting += 1;
224                        in_stack = None;
225                        continue;
226                    }
227                    Some(b'E') => {
228                        buffer.truncate(start);
229                        if let Some(in_stack) = in_stack {
230                            if nesting != 0 {
231                                stack_trace_buffer = buffer[in_stack..].to_vec();
232                            }
233                            buffer.truncate(in_stack);
234                        }
235                        nesting = nesting.saturating_sub(1);
236                        in_stack = None;
237                        if nesting == 0 {
238                            let line = Arc::from(take(&mut buffer).into_boxed_slice());
239                            let stack_trace = if stack_trace_buffer.is_empty() {
240                                None
241                            } else {
242                                Some(Arc::from(take(&mut stack_trace_buffer).into_boxed_slice()))
243                            };
244                            let entry = OutputEntry {
245                                data: line,
246                                stack_trace,
247                            };
248                            let occurrence_number = *own_output
249                                .entry(entry.clone())
250                                .and_modify(|c| *c += 1)
251                                .or_insert(0);
252                            let new_entry = {
253                                let mut shared = shared.lock();
254                                shared.insert((entry.clone(), occurrence_number))
255                            };
256                            if !new_entry {
257                                // This line has been printed by another process, so we don't need
258                                // to print it again.
259                                continue;
260                            }
261                            write_source_mapped_final(
262                                &entry.data,
263                                **assets_for_source_mapping,
264                                root.clone(),
265                                project_dir.clone(),
266                                final_stream,
267                            )
268                            .await?;
269                        }
270                    }
271                    Some(b'S') => {
272                        buffer.truncate(start);
273                        in_stack = Some(start);
274                        continue;
275                    }
276                    Some(b'D') => {
277                        // operation done
278                        break;
279                    }
280                    _ => {}
281                }
282            }
283            if nesting != 0 {
284                // When inside of a marked output we want to aggregate until the end marker
285                continue;
286            }
287
288            write_source_mapped_final(
289                &buffer,
290                **assets_for_source_mapping,
291                root.clone(),
292                project_dir.clone(),
293                final_stream,
294            )
295            .await?;
296            buffer.clear();
297        }
298        Ok(())
299    }
300}
301
302impl NodeJsPoolProcess {
303    async fn new(
304        cwd: &Path,
305        env: &FxHashMap<RcStr, RcStr>,
306        entrypoint: &Path,
307        assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
308        assets_root: FileSystemPath,
309        project_dir: FileSystemPath,
310        shared_stdout: SharedOutputSet,
311        shared_stderr: SharedOutputSet,
312        debug: bool,
313    ) -> Result<Self> {
314        let guard = duration_span!("Node.js process startup");
315        let listener = TcpListener::bind("127.0.0.1:0")
316            .await
317            .context("binding to a port")?;
318        let port = listener.local_addr().context("getting port")?.port();
319        let mut cmd = Command::new("node");
320        cmd.current_dir(cwd);
321        if debug {
322            cmd.arg("--inspect-brk");
323        }
324        cmd.arg(entrypoint);
325        cmd.arg(port.to_string());
326        cmd.env_clear();
327        cmd.env(
328            "PATH",
329            std::env::var("PATH").expect("the PATH environment variable should always be set"),
330        );
331        #[cfg(target_family = "windows")]
332        cmd.env(
333            "SystemRoot",
334            std::env::var("SystemRoot")
335                .expect("the SystemRoot environment variable should always be set"),
336        );
337        cmd.envs(env);
338        cmd.stderr(Stdio::piped());
339        cmd.stdout(Stdio::piped());
340        cmd.kill_on_drop(true);
341
342        let mut child = cmd.spawn().context("spawning node pooled process")?;
343
344        let timeout = if debug {
345            Duration::MAX
346        } else {
347            CONNECT_TIMEOUT
348        };
349
350        async fn get_output(child: &mut Child) -> Result<(String, String)> {
351            let mut stdout = Vec::new();
352            let mut stderr = Vec::new();
353            child
354                .stdout
355                .take()
356                .unwrap()
357                .read_to_end(&mut stdout)
358                .await?;
359            child
360                .stderr
361                .take()
362                .unwrap()
363                .read_to_end(&mut stderr)
364                .await?;
365            fn clean(buffer: Vec<u8>) -> Result<String> {
366                Ok(String::from_utf8(buffer)?
367                    .lines()
368                    .filter(|line| {
369                        line.len() != MARKER_STR.len() + 1 && !line.starts_with(MARKER_STR)
370                    })
371                    .collect::<Vec<_>>()
372                    .join("\n"))
373            }
374            Ok((clean(stdout)?, clean(stderr)?))
375        }
376
377        let (connection, _) = select! {
378            connection = listener.accept() => connection.context("accepting connection")?,
379            status = child.wait() => {
380                match status {
381                    Ok(status) => {
382                        let (stdout, stderr) = get_output(&mut child).await?;
383                        bail!("node process exited before we could connect to it with {status}\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
384                    }
385                    Err(err) => {
386                        let _ = child.start_kill();
387                        let (stdout, stderr) = get_output(&mut child).await?;
388                        bail!("node process exited before we could connect to it: {err:?}\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
389                    },
390                }
391            },
392            _ = sleep(timeout) => {
393                let _ = child.start_kill();
394                let (stdout, stderr) = get_output(&mut child).await?;
395                bail!("timed out waiting for the Node.js process to connect ({timeout:?} timeout)\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
396            },
397        };
398        connection.set_nodelay(true)?;
399
400        let child_stdout = BufReader::new(child.stdout.take().unwrap());
401        let child_stderr = BufReader::new(child.stderr.take().unwrap());
402
403        let recent_stdout: RecentLines =
404            Arc::new(Mutex::new(VecDeque::with_capacity(RECENT_OUTPUT_LINES)));
405        let recent_stderr: RecentLines =
406            Arc::new(Mutex::new(VecDeque::with_capacity(RECENT_OUTPUT_LINES)));
407
408        let stdout_handler = OutputStreamHandler {
409            stream: child_stdout,
410            shared: shared_stdout,
411            assets_for_source_mapping,
412            root: assets_root.clone(),
413            project_dir: project_dir.clone(),
414            final_stream: stdout(),
415            recent_lines: recent_stdout.clone(),
416        };
417        let stderr_handler = OutputStreamHandler {
418            stream: child_stderr,
419            shared: shared_stderr,
420            assets_for_source_mapping,
421            root: assets_root.clone(),
422            project_dir: project_dir.clone(),
423            final_stream: stderr(),
424            recent_lines: recent_stderr.clone(),
425        };
426
427        let mut process = Self {
428            child: Some(child),
429            connection,
430            stdout_handler,
431            stderr_handler,
432            recent_stdout,
433            recent_stderr,
434            debug,
435            cpu_time_invested: Duration::ZERO,
436        };
437
438        drop(guard);
439
440        let guard = duration_span!("Node.js initialization");
441        let ready_signal = process.recv().await?;
442
443        if !ready_signal.is_empty() {
444            bail!(
445                "Node.js process didn't send the expected ready signal\nOutput:\n{}",
446                String::from_utf8_lossy(&ready_signal)
447            );
448        }
449
450        drop(guard);
451
452        Ok(process)
453    }
454
455    async fn recv(&mut self) -> Result<Bytes> {
456        let connection = &mut self.connection;
457        async fn with_timeout<T, E: Into<anyhow::Error>>(
458            debug: bool,
459            fast: bool,
460            future: impl Future<Output = Result<T, E>> + Send,
461        ) -> Result<T> {
462            if debug {
463                future.await.map_err(Into::into)
464            } else {
465                let time = if fast {
466                    Duration::from_secs(20)
467                } else {
468                    Duration::from_secs(5 * 60)
469                };
470                timeout(time, future)
471                    .await
472                    .context("timeout while receiving message from process")?
473                    .map_err(Into::into)
474            }
475        }
476        let debug = self.debug;
477        let recv_future = async move {
478            let packet_len = with_timeout(debug, false, connection.read_u32())
479                .await
480                .context("reading packet length")?
481                .try_into()
482                .context("storing packet length")?;
483            let mut packet_data = vec![0; packet_len];
484            with_timeout(debug, true, connection.read_exact(&mut packet_data))
485                .await
486                .context("reading packet data")?;
487            Ok::<_, anyhow::Error>(packet_data.into())
488        };
489        let (result, stdout, stderr) = join!(
490            recv_future,
491            self.stdout_handler.handle_operation(),
492            self.stderr_handler.handle_operation(),
493        );
494        let result = match result {
495            Err(err) => {
496                // The IPC read failed — the most common cause is the child
497                // process crashing or exiting before sending a response.
498                // Attach whatever output the child wrote (recent lines from
499                // both stream handlers + the child's exit status, if
500                // available) so the user sees something diagnostic instead
501                // of an opaque "unexpected end of file" cascade.
502                return Err(self.diagnostic_recv_error(err).await);
503            }
504            Ok(result) => result,
505        };
506        stdout.context("unable to handle stdout from the Node.js process in a structured way")?;
507        stderr.context("unable to handle stderr from the Node.js process in a structured way")?;
508        Ok(result)
509    }
510
511    /// Build a contextualized error for a failed [`Self::recv`]. Includes
512    /// the captured stdout/stderr ring buffers and, if the child has
513    /// terminated, its exit status.
514    async fn diagnostic_recv_error(&mut self, err: anyhow::Error) -> anyhow::Error {
515        let exit_status = match self.child.as_mut() {
516            Some(child) => match timeout(Duration::from_secs(2), child.wait()).await {
517                Ok(Ok(status)) => Some(status),
518                _ => None,
519            },
520            None => None,
521        };
522        let mut output = String::new();
523        let stdout_lines: Vec<Vec<u8>> = self.recent_stdout.lock().iter().cloned().collect();
524        let stderr_lines: Vec<Vec<u8>> = self.recent_stderr.lock().iter().cloned().collect();
525        if !stdout_lines.is_empty() {
526            output.push_str("\nRecent process stdout:\n");
527            for line in stdout_lines {
528                output.push_str(&String::from_utf8_lossy(&line));
529            }
530        }
531        if !stderr_lines.is_empty() {
532            output.push_str("\nRecent process stderr:\n");
533            for line in stderr_lines {
534                output.push_str(&String::from_utf8_lossy(&line));
535            }
536        }
537        let exit_note = match exit_status {
538            Some(status) => format!("Node.js process exited with {status}"),
539            None => "Node.js process is still running or its exit status is unavailable".into(),
540        };
541        // ast-grep-ignore: no-context-format
542        err.context(format!("{exit_note}{output}"))
543    }
544    async fn send(&mut self, packet_data: Bytes) -> Result<()> {
545        self.connection
546            .write_u32(
547                packet_data
548                    .len()
549                    .try_into()
550                    .context("packet length does not fit into u32")?,
551            )
552            .await
553            .context("writing packet length")?;
554        self.connection
555            .write_all(&packet_data)
556            .await
557            .context("writing packet data")?;
558        self.connection
559            .flush()
560            .await
561            .context("flushing packet data")?;
562        Ok(())
563    }
564}
565
/// Registry of idle-process queues, one per pool.
type IdleProcessQueues = Mutex<Vec<Arc<HeapQueue<NodeJsPoolProcess>>>>;

/// All non-empty idle-process queues of the whole application.
/// This is used to scale down processes globally (see
/// [`ChildProcessPool::scale_down`] and [`ChildProcessPool::scale_zero`]).
static ACTIVE_POOLS: LazyLock<IdleProcessQueues> = LazyLock::new(Default::default);
571
/// Arguments needed to spawn a new Node.js process, bundled into one owned
/// value (built by `ChildProcessPool::process_args`). This lets `pre_warm`
/// move everything its spawned task needs in one piece.
///
/// The fields mirror the parameters of [`NodeJsPoolProcess::new`].
struct ProcessArgs {
    cwd: PathBuf,
    env: FxHashMap<RcStr, RcStr>,
    entrypoint: PathBuf,
    assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
    assets_root: FileSystemPath,
    project_dir: FileSystemPath,
    shared_stdout: SharedOutputSet,
    shared_stderr: SharedOutputSet,
    debug: bool,
}
586
587impl ProcessArgs {
588    async fn create_process(self) -> Result<(NodeJsPoolProcess, Duration)> {
589        let start = Instant::now();
590        let process = NodeJsPoolProcess::new(
591            &self.cwd,
592            &self.env,
593            &self.entrypoint,
594            self.assets_for_source_mapping,
595            self.assets_root,
596            self.project_dir,
597            self.shared_stdout,
598            self.shared_stderr,
599            self.debug,
600        )
601        .await?;
602        Ok((process, start.elapsed()))
603    }
604}
605
/// A pool of Node.js workers operating on an `entrypoint` with specific `cwd` and `env`.
///
/// The pool will spawn processes when needed and reuse old ones. It will never spawn more than a
/// certain number of concurrent processes. This is specified with the `concurrency` argument in the
/// constructor.
///
/// The worker will *not* use the `env` of the parent process by default. All environment variables
/// need to be provided to make the execution as pure as possible.
#[turbo_tasks::value(
    cell = "new",
    serialization = "skip",
    evict = "last",
    eq = "manual",
    shared
)]
pub struct ChildProcessPool {
    /// Working directory of every spawned process.
    cwd: PathBuf,
    /// Script passed to `node` as its first argument.
    entrypoint: PathBuf,
    /// Environment of every spawned process (replaces the parent's, apart
    /// from `PATH`).
    env: FxHashMap<RcStr, RcStr>,
    pub assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
    pub assets_root: FileSystemPath,
    pub project_dir: FileSystemPath,
    /// Processes that are ready and waiting for their next operation.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    idle_processes: Arc<HeapQueue<NodeJsPoolProcess>>,
    /// Semaphore to limit the number of concurrent operations in general
    #[turbo_tasks(trace_ignore, debug_ignore)]
    concurrency_semaphore: Arc<Semaphore>,
    /// Semaphore to limit the number of concurrently booting up processes
    /// (excludes one-off processes). Starts with one permit; `acquire_process`
    /// adds a permit after each successful bootup.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    bootup_semaphore: Arc<Semaphore>,
    /// Deduplication set for stdout output, shared by all processes of the pool.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    shared_stdout: SharedOutputSet,
    /// Deduplication set for stderr output, shared by all processes of the pool.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    shared_stderr: SharedOutputSet,
    /// Start processes with `--inspect-brk` and disable timeouts.
    debug: bool,
    /// Queue, bootup and worker statistics of this pool.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    stats: Arc<Mutex<NodeJsPoolStats>>,
}
645
646impl ChildProcessPool {
647    /// * debug: Whether to automatically enable Node's `--inspect-brk` when spawning it. Note:
648    ///   automatically overrides concurrency to 1.
649    pub fn create(
650        cwd: PathBuf,
651        entrypoint: PathBuf,
652        env: FxHashMap<RcStr, RcStr>,
653        assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
654        assets_root: FileSystemPath,
655        project_dir: FileSystemPath,
656        concurrency: usize,
657        debug: bool,
658    ) -> EvaluatePool {
659        EvaluatePool::new(
660            Box::new(Self {
661                cwd,
662                entrypoint,
663                env,
664                assets_for_source_mapping,
665                assets_root: assets_root.clone(),
666                project_dir: project_dir.clone(),
667                concurrency_semaphore: Arc::new(Semaphore::new(if debug {
668                    1
669                } else {
670                    concurrency
671                })),
672                bootup_semaphore: Arc::new(Semaphore::new(1)),
673                idle_processes: Arc::new(HeapQueue::new()),
674                shared_stdout: Arc::new(Mutex::new(FxIndexSet::default())),
675                shared_stderr: Arc::new(Mutex::new(FxIndexSet::default())),
676                debug,
677                stats: Default::default(),
678            }),
679            assets_for_source_mapping,
680            assets_root,
681            project_dir,
682        )
683    }
684}
685
/// [`NodeBackend`] that evaluates code in pooled Node.js child processes
/// (see [`ChildProcessPool`]).
#[turbo_tasks::value(shared)]
pub(crate) struct ChildProcessesBackend;
688
689#[turbo_tasks::value_impl]
690impl NodeBackend for ChildProcessesBackend {
691    fn runtime_module_path(&self) -> RcStr {
692        rcstr!("child_process/evaluate.ts")
693    }
694
695    fn globals_module_path(&self) -> RcStr {
696        rcstr!("child_process/globals.ts")
697    }
698
699    fn create_pool(&self, options: CreatePoolOptions) -> CreatePoolFuture {
700        Box::pin(async move {
701            let CreatePoolOptions {
702                cwd,
703                entrypoint,
704                env,
705                assets_for_source_mapping,
706                assets_root,
707                project_dir,
708                concurrency,
709                debug,
710            } = options;
711
712            Ok(ChildProcessPool::create(
713                cwd,
714                entrypoint,
715                env,
716                assets_for_source_mapping,
717                assets_root,
718                project_dir,
719                concurrency,
720                debug,
721            ))
722        })
723    }
724
725    fn scale_down(&self) -> Result<()> {
726        ChildProcessPool::scale_down();
727        Ok(())
728    }
729
730    fn scale_zero(&self) -> Result<()> {
731        ChildProcessPool::scale_zero();
732        Ok(())
733    }
734}
735
736#[async_trait::async_trait]
737impl EvaluateOperation for ChildProcessPool {
738    async fn operation(&self) -> Result<Box<dyn Operation>> {
739        // Acquire a running process (handles concurrency limits, boots up the process)
740
741        let operation = {
742            let _guard = duration_span!("Node.js operation");
743            let (process, permits) = self.acquire_process().await?;
744            ChildProcessOperation {
745                process: Some(process),
746                permits,
747                idle_processes: self.idle_processes.clone(),
748                start: Instant::now(),
749                stats: self.stats.clone(),
750                allow_process_reuse: true,
751            }
752        };
753
754        Ok(Box::new(operation))
755    }
756
757    /// Returns a snapshot of the pool's internal statistics.
758    fn stats(&self) -> PoolStatsSnapshot {
759        self.stats.lock().snapshot()
760    }
761
762    /// Eagerly spawn a Node.js process so it's ready when the first
763    /// `operation()` is called. The process goes into the idle queue.
764    /// If a node request comes in while this is still initializing, it waits
765    /// on the bootup semaphore and will resume when the process is ready.
766    fn pre_warm(&self) {
767        let args = self.process_args();
768        let bootup_semaphore = self.bootup_semaphore.clone();
769        let idle_processes = self.idle_processes.clone();
770        let stats = self.stats.clone();
771
772        tokio::spawn(async move {
773            let Ok(bootup_permit) = bootup_semaphore.clone().acquire_owned().await else {
774                return;
775            };
776            {
777                stats.lock().add_booting_worker();
778            }
779            match args.create_process().await {
780                Ok((process, bootup_time)) => {
781                    {
782                        let mut s = stats.lock();
783                        s.add_bootup_time(bootup_time);
784                        s.finished_booting_worker();
785                    }
786                    drop(bootup_permit);
787                    idle_processes.push(process, &ACTIVE_POOLS);
788                }
789                Err(_e) => {
790                    let mut s = stats.lock();
791                    s.finished_booting_worker();
792                    s.remove_worker();
793                }
794            }
795        });
796    }
797}
798
799impl ChildProcessPool {
800    async fn acquire_process(&self) -> Result<(NodeJsPoolProcess, AcquiredPermits)> {
801        {
802            self.stats.lock().add_queued_task();
803        }
804
805        let concurrency_permit = self.concurrency_semaphore.clone().acquire_owned().await?;
806
807        let bootup = async {
808            let permit = self.bootup_semaphore.clone().acquire_owned().await;
809            let wait_time = self.stats.lock().wait_time_before_bootup();
810            tokio::time::sleep(wait_time).await;
811            permit
812        };
813
814        select! {
815            idle_process_result = self.idle_processes.pop(&ACTIVE_POOLS) => {
816                let process = idle_process_result.context("acquiring idle process permit")?;
817                Ok((process, AcquiredPermits::Idle { _concurrency_permit: concurrency_permit }))
818            },
819            bootup_permit = bootup => {
820                let bootup_permit = bootup_permit.context("acquiring bootup permit")?;
821                {
822                    self.stats.lock().add_booting_worker();
823                }
824                let (process, bootup_time) = self.create_process().await?;
825                // Update the worker count
826                {
827                    let mut stats = self.stats.lock();
828                    stats.add_bootup_time(bootup_time);
829                    stats.finished_booting_worker();
830                }
831                // Increase the allowed booting up processes
832                self.bootup_semaphore.add_permits(1);
833                Ok((process, AcquiredPermits::Fresh { _concurrency_permit: concurrency_permit, _bootup_permit: bootup_permit }))
834            }
835        }
836    }
837
838    fn process_args(&self) -> ProcessArgs {
839        ProcessArgs {
840            cwd: self.cwd.clone(),
841            env: self.env.clone(),
842            entrypoint: self.entrypoint.clone(),
843            assets_for_source_mapping: self.assets_for_source_mapping,
844            assets_root: self.assets_root.clone(),
845            project_dir: self.project_dir.clone(),
846            shared_stdout: self.shared_stdout.clone(),
847            shared_stderr: self.shared_stderr.clone(),
848            debug: self.debug,
849        }
850    }
851
852    async fn create_process(&self) -> Result<(NodeJsPoolProcess, Duration), anyhow::Error> {
853        self.process_args()
854            .create_process()
855            .await
856            .context("creating new process")
857    }
858
859    pub fn scale_down() {
860        let pools = ACTIVE_POOLS.lock().clone();
861        for pool in pools {
862            pool.reduce_to_one();
863        }
864    }
865
866    pub fn scale_zero() {
867        let pools = ACTIVE_POOLS.lock().clone();
868        for pool in pools {
869            pool.reduce_to_zero(&ACTIVE_POOLS);
870        }
871    }
872}
873
/// An in-flight operation running on a pooled Node.js process.
///
/// When dropped, it records the elapsed time in the pool stats. If reuse is
/// still allowed, it also pushes the process back onto `idle_processes`
/// (see the `Drop` impl for this type).
pub struct ChildProcessOperation {
    /// The process being used. `None` once it has been taken, either by
    /// `wait_or_kill` or during drop.
    process: Option<NodeJsPoolProcess>,
    // Held so the acquired permits are released only when the operation is
    // dropped. `Drop` also inspects the variant to classify warm vs. cold time.
    #[allow(dead_code)]
    permits: AcquiredPermits,
    /// Queue the process is returned to on drop when reuse is allowed.
    idle_processes: Arc<HeapQueue<NodeJsPoolProcess>>,
    /// When the operation started; the elapsed time is charged to the stats and
    /// to the process's `cpu_time_invested`.
    start: Instant,
    /// Shared statistics of the owning pool.
    stats: Arc<Mutex<NodeJsPoolStats>>,
    /// Whether the process may go back to the pool. It is cleared (with a
    /// matching `remove_worker`) once the process is considered unusable.
    allow_process_reuse: bool,
}
884
885#[async_trait::async_trait]
886impl Operation for ChildProcessOperation {
887    async fn recv(&mut self) -> Result<Bytes> {
888        let bytes = self
889            .with_process(|process| async move {
890                process.recv().await.context("failed to receive message")
891            })
892            .await?;
893        Ok(bytes)
894    }
895
896    async fn send(&mut self, message: Bytes) -> Result<()> {
897        self.with_process(|process| async move {
898            timeout(Duration::from_secs(30), process.send(message))
899                .await
900                .context("timeout while sending message")?
901                .context("failed to send message")?;
902            Ok(())
903        })
904        .await
905    }
906
907    async fn wait_or_kill(&mut self) -> Result<ExitStatus> {
908        let mut process = self
909            .process
910            .take()
911            .context("Node.js operation already finished")?;
912
913        if self.allow_process_reuse {
914            self.stats.lock().remove_worker();
915        }
916
917        let mut child = process
918            .child
919            .take()
920            .context("Node.js operation already finished")?;
921
922        // Ignore error since we are not sure if the process is still alive
923        let _ = child.start_kill();
924        let status = timeout(Duration::from_secs(30), child.wait())
925            .await
926            .context("timeout while waiting for process end")?
927            .context("waiting for process end")?;
928
929        Ok(status)
930    }
931
932    fn disallow_reuse(&mut self) {
933        if self.allow_process_reuse {
934            self.stats.lock().remove_worker();
935            self.allow_process_reuse = false;
936        }
937    }
938}
939
940impl ChildProcessOperation {
941    async fn with_process<'a, F: Future<Output = Result<T>> + Send + 'a, T>(
942        &'a mut self,
943        f: impl FnOnce(&'a mut NodeJsPoolProcess) -> F,
944    ) -> Result<T> {
945        let process = self
946            .process
947            .as_mut()
948            .context("Node.js operation already finished")?;
949
950        if !self.allow_process_reuse {
951            bail!("Node.js process is no longer usable");
952        }
953
954        let result = f(process).await;
955        if result.is_err() && self.allow_process_reuse {
956            self.stats.lock().remove_worker();
957            self.allow_process_reuse = false;
958        }
959        result
960    }
961}
962
963impl Drop for ChildProcessOperation {
964    fn drop(&mut self) {
965        if let Some(mut process) = self.process.take() {
966            let elapsed = self.start.elapsed();
967            {
968                let stats = &mut self.stats.lock();
969                match self.permits {
970                    AcquiredPermits::Idle { .. } => stats.add_warm_process_time(elapsed),
971                    AcquiredPermits::Fresh { .. } => stats.add_cold_process_time(elapsed),
972                }
973            }
974            if self.allow_process_reuse {
975                process.cpu_time_invested += elapsed;
976                self.idle_processes.push(process, &ACTIVE_POOLS);
977            }
978        }
979    }
980}