Skip to main content

turbopack_node/process_pool/
mod.rs

1use std::{
2    future::Future,
3    mem::take,
4    path::{Path, PathBuf},
5    process::{ExitStatus, Stdio},
6    sync::{Arc, LazyLock},
7    time::{Duration, Instant},
8};
9
10use anyhow::{Context, Result, bail};
11use bytes::Bytes;
12use futures::join;
13use owo_colors::OwoColorize;
14use parking_lot::Mutex;
15use rustc_hash::FxHashMap;
16use tokio::{
17    io::{
18        AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, Stderr,
19        Stdout, stderr, stdout,
20    },
21    net::{TcpListener, TcpStream},
22    process::{Child, ChildStderr, ChildStdout, Command},
23    select,
24    sync::Semaphore,
25    time::{sleep, timeout},
26};
27use turbo_rcstr::{RcStr, rcstr};
28use turbo_tasks::{FxIndexSet, ResolvedVc, Vc, duration_span};
29use turbo_tasks_fs::FileSystemPath;
30use turbopack_ecmascript::magic_identifier::unmangle_identifiers;
31
32use crate::{
33    AssetsForSourceMapping,
34    backend::{CreatePoolFuture, CreatePoolOptions, NodeBackend},
35    evaluate::{EvaluateOperation, EvaluatePool, Operation},
36    format::FormattingMode,
37    pool_stats::{AcquiredPermits, NodeJsPoolStats, PoolStatsSnapshot},
38    source_map::apply_source_mapping,
39};
40
41mod heap_queue;
42use heap_queue::HeapQueue;
43
/// A single pooled Node.js worker: the child process, the TCP connection used to
/// exchange packets with it, and the handlers forwarding its stdout/stderr.
struct NodeJsPoolProcess {
    /// Handle of the spawned `node` process. `None` once `wait_or_kill` has taken it.
    child: Option<Child>,
    /// Connection accepted from the child; carries length-prefixed packets.
    connection: TcpStream,
    /// Forwards the child's stdout to this process' stdout.
    stdout_handler: OutputStreamHandler<ChildStdout, Stdout>,
    /// Forwards the child's stderr to this process' stderr.
    stderr_handler: OutputStreamHandler<ChildStderr, Stderr>,
    /// Whether the process was started with `--inspect-brk`. Disables the receive timeouts.
    debug: bool,
    /// Accumulated time operations have held this process; the primary ordering key.
    cpu_time_invested: Duration,
}
52
53impl Ord for NodeJsPoolProcess {
54    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
55        self.cpu_time_invested
56            .cmp(&other.cpu_time_invested)
57            .then_with(|| {
58                self.child
59                    .as_ref()
60                    .map(|c| c.id())
61                    .cmp(&other.child.as_ref().map(|c| c.id()))
62            })
63    }
64}
65
66impl PartialOrd for NodeJsPoolProcess {
67    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
68        Some(self.cmp(other))
69    }
70}
71
// Marker impl: equality is defined through `Ord::cmp` by the `PartialEq` impl below.
impl Eq for NodeJsPoolProcess {}
73
74impl PartialEq for NodeJsPoolProcess {
75    fn eq(&self, other: &Self) -> bool {
76        self.cmp(other) == std::cmp::Ordering::Equal
77    }
78}
79
/// How long to wait for a freshly spawned Node.js process to connect back before
/// giving up. Not applied when the process is started in debug mode.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
81
/// One block of marked output from a Node.js process, used as a deduplication key.
#[derive(Clone, PartialEq, Eq, Hash)]
struct OutputEntry {
    /// The bytes that get printed for this block.
    data: Arc<[u8]>,
    /// Stack trace bytes captured for the block, if any. They take part in equality and
    /// hashing; `handle_operation` only prints `data`.
    stack_trace: Option<Arc<[u8]>>,
}
87
/// Set of output entries that have already been printed, shared between output handlers.
/// Each entry is paired with the occurrence number it had within the emitting handler, so
/// the n-th repetition of identical output is printed once across all handlers.
type SharedOutputSet = Arc<Mutex<FxIndexSet<(OutputEntry, u32)>>>;
89
/// Held while a chunk is written to a final output stream, so chunks written by
/// concurrently running handlers are not interleaved with each other.
static GLOBAL_OUTPUT_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
/// Prefix of the control lines a Node.js process writes to stdout/stderr. A control line
/// is this prefix followed by a single type byte and a newline.
static MARKER: &[u8] = b"TURBOPACK_OUTPUT_";
/// [`MARKER`] as a string, used to strip control lines from captured text output.
static MARKER_STR: &str = "TURBOPACK_OUTPUT_";
93
/// Reads one output stream of a Node.js process and forwards it, source-mapped and
/// deduplicated, to `final_stream`.
struct OutputStreamHandler<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> {
    /// Buffered reader over the child's output pipe.
    stream: BufReader<R>,
    /// Entries already printed by any handler sharing this set.
    shared: SharedOutputSet,
    /// Assets passed to `apply_source_mapping` when rewriting output.
    assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
    /// Root path passed to `apply_source_mapping`.
    root: FileSystemPath,
    /// Project directory passed to `apply_source_mapping`.
    project_dir: FileSystemPath,
    /// Destination the processed output is written to.
    final_stream: W,
}
102
103impl<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> OutputStreamHandler<R, W> {
104    /// Pipes the `stream` from `final_stream`, but uses `shared` to deduplicate
105    /// lines that has beem emitted by other [OutputStreamHandler] instances
106    /// with the same `shared` before.
107    /// Returns when one operation is done.
108    pub async fn handle_operation(&mut self) -> Result<()> {
109        let Self {
110            stream,
111            shared,
112            assets_for_source_mapping,
113            root,
114            project_dir,
115            final_stream,
116        } = self;
117
118        async fn write_final<W: AsyncWrite + Unpin>(
119            mut bytes: &[u8],
120            final_stream: &mut W,
121        ) -> Result<()> {
122            let _lock = GLOBAL_OUTPUT_LOCK.lock().await;
123            while !bytes.is_empty() {
124                let count = final_stream.write(bytes).await?;
125                if count == 0 {
126                    bail!("Failed to write to final stream as it was closed");
127                }
128                bytes = &bytes[count..];
129            }
130            Ok(())
131        }
132
133        async fn write_source_mapped_final<W: AsyncWrite + Unpin>(
134            bytes: &[u8],
135            assets_for_source_mapping: Vc<AssetsForSourceMapping>,
136            root: FileSystemPath,
137            project_dir: FileSystemPath,
138            final_stream: &mut W,
139        ) -> Result<()> {
140            if let Ok(text) = std::str::from_utf8(bytes) {
141                let text = unmangle_identifiers(text, |content| {
142                    format!("{{{content}}}").italic().to_string()
143                });
144                match apply_source_mapping(
145                    text.as_ref(),
146                    assets_for_source_mapping,
147                    root,
148                    project_dir,
149                    FormattingMode::AnsiColors,
150                )
151                .await
152                {
153                    Err(e) => {
154                        write_final(
155                            format!("Error applying source mapping: {e}\n").as_bytes(),
156                            final_stream,
157                        )
158                        .await?;
159                        write_final(text.as_bytes(), final_stream).await?;
160                    }
161                    Ok(text) => {
162                        write_final(text.as_bytes(), final_stream).await?;
163                    }
164                }
165            } else {
166                write_final(bytes, final_stream).await?;
167            }
168            Ok(())
169        }
170
171        let mut buffer = Vec::new();
172        let mut own_output = FxHashMap::default();
173        let mut nesting: u32 = 0;
174        let mut in_stack = None;
175        let mut stack_trace_buffer = Vec::new();
176        loop {
177            let start = buffer.len();
178            if stream
179                .read_until(b'\n', &mut buffer)
180                .await
181                .context("error reading from stream")?
182                == 0
183            {
184                bail!("stream closed unexpectedly")
185            }
186            if buffer.len() - start == MARKER.len() + 2
187                && &buffer[start..buffer.len() - 2] == MARKER
188            {
189                // This is new line
190                buffer.pop();
191                // This is the type
192                match buffer.pop() {
193                    Some(b'B') => {
194                        stack_trace_buffer.clear();
195                        buffer.truncate(start);
196                        nesting += 1;
197                        in_stack = None;
198                        continue;
199                    }
200                    Some(b'E') => {
201                        buffer.truncate(start);
202                        if let Some(in_stack) = in_stack {
203                            if nesting != 0 {
204                                stack_trace_buffer = buffer[in_stack..].to_vec();
205                            }
206                            buffer.truncate(in_stack);
207                        }
208                        nesting = nesting.saturating_sub(1);
209                        in_stack = None;
210                        if nesting == 0 {
211                            let line = Arc::from(take(&mut buffer).into_boxed_slice());
212                            let stack_trace = if stack_trace_buffer.is_empty() {
213                                None
214                            } else {
215                                Some(Arc::from(take(&mut stack_trace_buffer).into_boxed_slice()))
216                            };
217                            let entry = OutputEntry {
218                                data: line,
219                                stack_trace,
220                            };
221                            let occurrence_number = *own_output
222                                .entry(entry.clone())
223                                .and_modify(|c| *c += 1)
224                                .or_insert(0);
225                            let new_entry = {
226                                let mut shared = shared.lock();
227                                shared.insert((entry.clone(), occurrence_number))
228                            };
229                            if !new_entry {
230                                // This line has been printed by another process, so we don't need
231                                // to print it again.
232                                continue;
233                            }
234                            write_source_mapped_final(
235                                &entry.data,
236                                **assets_for_source_mapping,
237                                root.clone(),
238                                project_dir.clone(),
239                                final_stream,
240                            )
241                            .await?;
242                        }
243                    }
244                    Some(b'S') => {
245                        buffer.truncate(start);
246                        in_stack = Some(start);
247                        continue;
248                    }
249                    Some(b'D') => {
250                        // operation done
251                        break;
252                    }
253                    _ => {}
254                }
255            }
256            if nesting != 0 {
257                // When inside of a marked output we want to aggregate until the end marker
258                continue;
259            }
260
261            write_source_mapped_final(
262                &buffer,
263                **assets_for_source_mapping,
264                root.clone(),
265                project_dir.clone(),
266                final_stream,
267            )
268            .await?;
269            buffer.clear();
270        }
271        Ok(())
272    }
273}
274
275impl NodeJsPoolProcess {
276    async fn new(
277        cwd: &Path,
278        env: &FxHashMap<RcStr, RcStr>,
279        entrypoint: &Path,
280        assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
281        assets_root: FileSystemPath,
282        project_dir: FileSystemPath,
283        shared_stdout: SharedOutputSet,
284        shared_stderr: SharedOutputSet,
285        debug: bool,
286    ) -> Result<Self> {
287        let guard = duration_span!("Node.js process startup");
288        let listener = TcpListener::bind("127.0.0.1:0")
289            .await
290            .context("binding to a port")?;
291        let port = listener.local_addr().context("getting port")?.port();
292        let mut cmd = Command::new("node");
293        cmd.current_dir(cwd);
294        if debug {
295            cmd.arg("--inspect-brk");
296        }
297        cmd.arg(entrypoint);
298        cmd.arg(port.to_string());
299        cmd.env_clear();
300        cmd.env(
301            "PATH",
302            std::env::var("PATH").expect("the PATH environment variable should always be set"),
303        );
304        #[cfg(target_family = "windows")]
305        cmd.env(
306            "SystemRoot",
307            std::env::var("SystemRoot")
308                .expect("the SystemRoot environment variable should always be set"),
309        );
310        cmd.envs(env);
311        cmd.stderr(Stdio::piped());
312        cmd.stdout(Stdio::piped());
313        cmd.kill_on_drop(true);
314
315        let mut child = cmd.spawn().context("spawning node pooled process")?;
316
317        let timeout = if debug {
318            Duration::MAX
319        } else {
320            CONNECT_TIMEOUT
321        };
322
323        async fn get_output(child: &mut Child) -> Result<(String, String)> {
324            let mut stdout = Vec::new();
325            let mut stderr = Vec::new();
326            child
327                .stdout
328                .take()
329                .unwrap()
330                .read_to_end(&mut stdout)
331                .await?;
332            child
333                .stderr
334                .take()
335                .unwrap()
336                .read_to_end(&mut stderr)
337                .await?;
338            fn clean(buffer: Vec<u8>) -> Result<String> {
339                Ok(String::from_utf8(buffer)?
340                    .lines()
341                    .filter(|line| {
342                        line.len() != MARKER_STR.len() + 1 && !line.starts_with(MARKER_STR)
343                    })
344                    .collect::<Vec<_>>()
345                    .join("\n"))
346            }
347            Ok((clean(stdout)?, clean(stderr)?))
348        }
349
350        let (connection, _) = select! {
351            connection = listener.accept() => connection.context("accepting connection")?,
352            status = child.wait() => {
353                match status {
354                    Ok(status) => {
355                        let (stdout, stderr) = get_output(&mut child).await?;
356                        bail!("node process exited before we could connect to it with {status}\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
357                    }
358                    Err(err) => {
359                        let _ = child.start_kill();
360                        let (stdout, stderr) = get_output(&mut child).await?;
361                        bail!("node process exited before we could connect to it: {err:?}\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
362                    },
363                }
364            },
365            _ = sleep(timeout) => {
366                let _ = child.start_kill();
367                let (stdout, stderr) = get_output(&mut child).await?;
368                bail!("timed out waiting for the Node.js process to connect ({timeout:?} timeout)\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
369            },
370        };
371        connection.set_nodelay(true)?;
372
373        let child_stdout = BufReader::new(child.stdout.take().unwrap());
374        let child_stderr = BufReader::new(child.stderr.take().unwrap());
375
376        let stdout_handler = OutputStreamHandler {
377            stream: child_stdout,
378            shared: shared_stdout,
379            assets_for_source_mapping,
380            root: assets_root.clone(),
381            project_dir: project_dir.clone(),
382            final_stream: stdout(),
383        };
384        let stderr_handler = OutputStreamHandler {
385            stream: child_stderr,
386            shared: shared_stderr,
387            assets_for_source_mapping,
388            root: assets_root.clone(),
389            project_dir: project_dir.clone(),
390            final_stream: stderr(),
391        };
392
393        let mut process = Self {
394            child: Some(child),
395            connection,
396            stdout_handler,
397            stderr_handler,
398            debug,
399            cpu_time_invested: Duration::ZERO,
400        };
401
402        drop(guard);
403
404        let guard = duration_span!("Node.js initialization");
405        let ready_signal = process.recv().await?;
406
407        if !ready_signal.is_empty() {
408            bail!(
409                "Node.js process didn't send the expected ready signal\nOutput:\n{}",
410                String::from_utf8_lossy(&ready_signal)
411            );
412        }
413
414        drop(guard);
415
416        Ok(process)
417    }
418
419    async fn recv(&mut self) -> Result<Bytes> {
420        let connection = &mut self.connection;
421        async fn with_timeout<T, E: Into<anyhow::Error>>(
422            debug: bool,
423            fast: bool,
424            future: impl Future<Output = Result<T, E>> + Send,
425        ) -> Result<T> {
426            if debug {
427                future.await.map_err(Into::into)
428            } else {
429                let time = if fast {
430                    Duration::from_secs(20)
431                } else {
432                    Duration::from_secs(5 * 60)
433                };
434                timeout(time, future)
435                    .await
436                    .context("timeout while receiving message from process")?
437                    .map_err(Into::into)
438            }
439        }
440        let debug = self.debug;
441        let recv_future = async move {
442            let packet_len = with_timeout(debug, false, connection.read_u32())
443                .await
444                .context("reading packet length")?
445                .try_into()
446                .context("storing packet length")?;
447            let mut packet_data = vec![0; packet_len];
448            with_timeout(debug, true, connection.read_exact(&mut packet_data))
449                .await
450                .context("reading packet data")?;
451            Ok::<_, anyhow::Error>(packet_data.into())
452        };
453        let (result, stdout, stderr) = join!(
454            recv_future,
455            self.stdout_handler.handle_operation(),
456            self.stderr_handler.handle_operation(),
457        );
458        let result: Bytes = result?;
459        stdout.context("unable to handle stdout from the Node.js process in a structured way")?;
460        stderr.context("unable to handle stderr from the Node.js process in a structured way")?;
461        Ok(result)
462    }
463    async fn send(&mut self, packet_data: Bytes) -> Result<()> {
464        self.connection
465            .write_u32(
466                packet_data
467                    .len()
468                    .try_into()
469                    .context("packet length does not fit into u32")?,
470            )
471            .await
472            .context("writing packet length")?;
473        self.connection
474            .write_all(&packet_data)
475            .await
476            .context("writing packet data")?;
477        self.connection
478            .flush()
479            .await
480            .context("flushing packet data")?;
481        Ok(())
482    }
483}
484
/// A lockable list of idle-process queues, one shared queue handle per pool.
type IdleProcessQueues = Mutex<Vec<Arc<HeapQueue<NodeJsPoolProcess>>>>;
486
/// All non-empty idle-process queues of the whole application.
///
/// This is used to scale down processes globally: `scale_down` and `scale_zero` walk a
/// snapshot of this list. It is passed to the queues' `push`/`pop` calls.
static ACTIVE_POOLS: LazyLock<IdleProcessQueues> = LazyLock::new(Default::default);
490
/// Arguments needed to spawn a new Node.js process. Extracted so that
/// `pre_warm` can clone them once instead of cloning each pool field
/// individually.
///
/// The fields mirror the parameters of `NodeJsPoolProcess::new` one to one.
struct ProcessArgs {
    /// Working directory of the spawned process.
    cwd: PathBuf,
    /// Environment variables passed to the spawned process.
    env: FxHashMap<RcStr, RcStr>,
    /// Script the `node` process is started with.
    entrypoint: PathBuf,
    assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
    assets_root: FileSystemPath,
    project_dir: FileSystemPath,
    /// Deduplication set for stdout, shared with the pool's other processes.
    shared_stdout: SharedOutputSet,
    /// Deduplication set for stderr, shared with the pool's other processes.
    shared_stderr: SharedOutputSet,
    /// Whether to start the process with `--inspect-brk`.
    debug: bool,
}
505
506impl ProcessArgs {
507    async fn create_process(self) -> Result<(NodeJsPoolProcess, Duration)> {
508        let start = Instant::now();
509        let process = NodeJsPoolProcess::new(
510            &self.cwd,
511            &self.env,
512            &self.entrypoint,
513            self.assets_for_source_mapping,
514            self.assets_root,
515            self.project_dir,
516            self.shared_stdout,
517            self.shared_stderr,
518            self.debug,
519        )
520        .await?;
521        Ok((process, start.elapsed()))
522    }
523}
524
/// A pool of Node.js workers operating on an `entrypoint` with specific `cwd` and `env`.
///
/// The pool will spawn processes when needed and reuses old ones. It will never spawn more than a
/// certain number of concurrent processes. This is specified with the `concurrency` argument in the
/// constructor.
///
/// The worker will *not* use the `env` of the parent process by default. All environment variables
/// need to be provided to make the execution as pure as possible.
#[turbo_tasks::value(
    cell = "new",
    serialization = "skip",
    evict = "last",
    eq = "manual",
    shared
)]
pub struct ChildProcessPool {
    /// Working directory for spawned processes.
    cwd: PathBuf,
    /// Script every worker is started with.
    entrypoint: PathBuf,
    /// Environment variables passed to every worker.
    env: FxHashMap<RcStr, RcStr>,
    pub assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
    pub assets_root: FileSystemPath,
    pub project_dir: FileSystemPath,
    /// Processes that are currently not running an operation and can be reused.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    idle_processes: Arc<HeapQueue<NodeJsPoolProcess>>,
    /// Semaphore to limit the number of concurrent operations in general
    #[turbo_tasks(trace_ignore, debug_ignore)]
    concurrency_semaphore: Arc<Semaphore>,
    /// Semaphore to limit the number of concurrently booting up processes
    /// (excludes one-off processes)
    #[turbo_tasks(trace_ignore, debug_ignore)]
    bootup_semaphore: Arc<Semaphore>,
    /// Stdout entries already printed by any worker of this pool.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    shared_stdout: SharedOutputSet,
    /// Stderr entries already printed by any worker of this pool.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    shared_stderr: SharedOutputSet,
    /// Whether workers are started with `--inspect-brk`.
    debug: bool,
    /// Statistics about workers and operations of this pool.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    stats: Arc<Mutex<NodeJsPoolStats>>,
}
564
565impl ChildProcessPool {
566    /// * debug: Whether to automatically enable Node's `--inspect-brk` when spawning it. Note:
567    ///   automatically overrides concurrency to 1.
568    pub fn create(
569        cwd: PathBuf,
570        entrypoint: PathBuf,
571        env: FxHashMap<RcStr, RcStr>,
572        assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
573        assets_root: FileSystemPath,
574        project_dir: FileSystemPath,
575        concurrency: usize,
576        debug: bool,
577    ) -> EvaluatePool {
578        EvaluatePool::new(
579            Box::new(Self {
580                cwd,
581                entrypoint,
582                env,
583                assets_for_source_mapping,
584                assets_root: assets_root.clone(),
585                project_dir: project_dir.clone(),
586                concurrency_semaphore: Arc::new(Semaphore::new(if debug {
587                    1
588                } else {
589                    concurrency
590                })),
591                bootup_semaphore: Arc::new(Semaphore::new(1)),
592                idle_processes: Arc::new(HeapQueue::new()),
593                shared_stdout: Arc::new(Mutex::new(FxIndexSet::default())),
594                shared_stderr: Arc::new(Mutex::new(FxIndexSet::default())),
595                debug,
596                stats: Default::default(),
597            }),
598            assets_for_source_mapping,
599            assets_root,
600            project_dir,
601        )
602    }
603}
604
/// [`NodeBackend`] implementation that runs Node.js code in pooled child processes
/// (see [`ChildProcessPool`]).
#[turbo_tasks::value(shared)]
pub(crate) struct ChildProcessesBackend;
607
608#[turbo_tasks::value_impl]
609impl NodeBackend for ChildProcessesBackend {
610    fn runtime_module_path(&self) -> RcStr {
611        rcstr!("child_process/evaluate.ts")
612    }
613
614    fn globals_module_path(&self) -> RcStr {
615        rcstr!("child_process/globals.ts")
616    }
617
618    fn create_pool(&self, options: CreatePoolOptions) -> CreatePoolFuture {
619        Box::pin(async move {
620            let CreatePoolOptions {
621                cwd,
622                entrypoint,
623                env,
624                assets_for_source_mapping,
625                assets_root,
626                project_dir,
627                concurrency,
628                debug,
629            } = options;
630
631            Ok(ChildProcessPool::create(
632                cwd,
633                entrypoint,
634                env,
635                assets_for_source_mapping,
636                assets_root,
637                project_dir,
638                concurrency,
639                debug,
640            ))
641        })
642    }
643
644    fn scale_down(&self) -> Result<()> {
645        ChildProcessPool::scale_down();
646        Ok(())
647    }
648
649    fn scale_zero(&self) -> Result<()> {
650        ChildProcessPool::scale_zero();
651        Ok(())
652    }
653}
654
655#[async_trait::async_trait]
656impl EvaluateOperation for ChildProcessPool {
657    async fn operation(&self) -> Result<Box<dyn Operation>> {
658        // Acquire a running process (handles concurrency limits, boots up the process)
659
660        let operation = {
661            let _guard = duration_span!("Node.js operation");
662            let (process, permits) = self.acquire_process().await?;
663            ChildProcessOperation {
664                process: Some(process),
665                permits,
666                idle_processes: self.idle_processes.clone(),
667                start: Instant::now(),
668                stats: self.stats.clone(),
669                allow_process_reuse: true,
670            }
671        };
672
673        Ok(Box::new(operation))
674    }
675
676    /// Returns a snapshot of the pool's internal statistics.
677    fn stats(&self) -> PoolStatsSnapshot {
678        self.stats.lock().snapshot()
679    }
680
681    /// Eagerly spawn a Node.js process so it's ready when the first
682    /// `operation()` is called. The process goes into the idle queue.
683    /// If a node request comes in while this is still initializing, it waits
684    /// on the bootup semaphore and will resume when the process is ready.
685    fn pre_warm(&self) {
686        let args = self.process_args();
687        let bootup_semaphore = self.bootup_semaphore.clone();
688        let idle_processes = self.idle_processes.clone();
689        let stats = self.stats.clone();
690
691        tokio::spawn(async move {
692            let Ok(bootup_permit) = bootup_semaphore.clone().acquire_owned().await else {
693                return;
694            };
695            {
696                stats.lock().add_booting_worker();
697            }
698            match args.create_process().await {
699                Ok((process, bootup_time)) => {
700                    {
701                        let mut s = stats.lock();
702                        s.add_bootup_time(bootup_time);
703                        s.finished_booting_worker();
704                    }
705                    drop(bootup_permit);
706                    idle_processes.push(process, &ACTIVE_POOLS);
707                }
708                Err(_e) => {
709                    let mut s = stats.lock();
710                    s.finished_booting_worker();
711                    s.remove_worker();
712                }
713            }
714        });
715    }
716}
717
718impl ChildProcessPool {
719    async fn acquire_process(&self) -> Result<(NodeJsPoolProcess, AcquiredPermits)> {
720        {
721            self.stats.lock().add_queued_task();
722        }
723
724        let concurrency_permit = self.concurrency_semaphore.clone().acquire_owned().await?;
725
726        let bootup = async {
727            let permit = self.bootup_semaphore.clone().acquire_owned().await;
728            let wait_time = self.stats.lock().wait_time_before_bootup();
729            tokio::time::sleep(wait_time).await;
730            permit
731        };
732
733        select! {
734            idle_process_result = self.idle_processes.pop(&ACTIVE_POOLS) => {
735                let process = idle_process_result.context("acquiring idle process permit")?;
736                Ok((process, AcquiredPermits::Idle { _concurrency_permit: concurrency_permit }))
737            },
738            bootup_permit = bootup => {
739                let bootup_permit = bootup_permit.context("acquiring bootup permit")?;
740                {
741                    self.stats.lock().add_booting_worker();
742                }
743                let (process, bootup_time) = self.create_process().await?;
744                // Update the worker count
745                {
746                    let mut stats = self.stats.lock();
747                    stats.add_bootup_time(bootup_time);
748                    stats.finished_booting_worker();
749                }
750                // Increase the allowed booting up processes
751                self.bootup_semaphore.add_permits(1);
752                Ok((process, AcquiredPermits::Fresh { _concurrency_permit: concurrency_permit, _bootup_permit: bootup_permit }))
753            }
754        }
755    }
756
757    fn process_args(&self) -> ProcessArgs {
758        ProcessArgs {
759            cwd: self.cwd.clone(),
760            env: self.env.clone(),
761            entrypoint: self.entrypoint.clone(),
762            assets_for_source_mapping: self.assets_for_source_mapping,
763            assets_root: self.assets_root.clone(),
764            project_dir: self.project_dir.clone(),
765            shared_stdout: self.shared_stdout.clone(),
766            shared_stderr: self.shared_stderr.clone(),
767            debug: self.debug,
768        }
769    }
770
771    async fn create_process(&self) -> Result<(NodeJsPoolProcess, Duration), anyhow::Error> {
772        self.process_args()
773            .create_process()
774            .await
775            .context("creating new process")
776    }
777
778    pub fn scale_down() {
779        let pools = ACTIVE_POOLS.lock().clone();
780        for pool in pools {
781            pool.reduce_to_one();
782        }
783    }
784
785    pub fn scale_zero() {
786        let pools = ACTIVE_POOLS.lock().clone();
787        for pool in pools {
788            pool.reduce_to_zero(&ACTIVE_POOLS);
789        }
790    }
791}
792
/// A single operation running on a pooled Node.js process. When dropped, the process is
/// returned to the idle queue if it may still be reused.
pub struct ChildProcessOperation {
    /// The process running the operation. `None` after `wait_or_kill` or drop took it.
    process: Option<NodeJsPoolProcess>,
    // Held so the permits live as long as the operation; `Drop` also inspects the variant
    // to record the run as warm (idle process) or cold (fresh process).
    #[allow(dead_code)]
    permits: AcquiredPermits,
    /// Queue the process is pushed back to on drop.
    idle_processes: Arc<HeapQueue<NodeJsPoolProcess>>,
    /// When the operation was created; used to measure how long it held the process.
    start: Instant,
    /// Statistics of the owning pool.
    stats: Arc<Mutex<NodeJsPoolStats>>,
    /// Whether the process may be returned to the idle queue. Cleared after a failed
    /// send/receive or by `disallow_reuse`.
    allow_process_reuse: bool,
}
803
804#[async_trait::async_trait]
805impl Operation for ChildProcessOperation {
806    async fn recv(&mut self) -> Result<Bytes> {
807        let bytes = self
808            .with_process(|process| async move {
809                process.recv().await.context("failed to receive message")
810            })
811            .await?;
812        Ok(bytes)
813    }
814
815    async fn send(&mut self, message: Bytes) -> Result<()> {
816        self.with_process(|process| async move {
817            timeout(Duration::from_secs(30), process.send(message))
818                .await
819                .context("timeout while sending message")?
820                .context("failed to send message")?;
821            Ok(())
822        })
823        .await
824    }
825
826    async fn wait_or_kill(&mut self) -> Result<ExitStatus> {
827        let mut process = self
828            .process
829            .take()
830            .context("Node.js operation already finished")?;
831
832        if self.allow_process_reuse {
833            self.stats.lock().remove_worker();
834        }
835
836        let mut child = process
837            .child
838            .take()
839            .context("Node.js operation already finished")?;
840
841        // Ignore error since we are not sure if the process is still alive
842        let _ = child.start_kill();
843        let status = timeout(Duration::from_secs(30), child.wait())
844            .await
845            .context("timeout while waiting for process end")?
846            .context("waiting for process end")?;
847
848        Ok(status)
849    }
850
851    fn disallow_reuse(&mut self) {
852        if self.allow_process_reuse {
853            self.stats.lock().remove_worker();
854            self.allow_process_reuse = false;
855        }
856    }
857}
858
859impl ChildProcessOperation {
860    async fn with_process<'a, F: Future<Output = Result<T>> + Send + 'a, T>(
861        &'a mut self,
862        f: impl FnOnce(&'a mut NodeJsPoolProcess) -> F,
863    ) -> Result<T> {
864        let process = self
865            .process
866            .as_mut()
867            .context("Node.js operation already finished")?;
868
869        if !self.allow_process_reuse {
870            bail!("Node.js process is no longer usable");
871        }
872
873        let result = f(process).await;
874        if result.is_err() && self.allow_process_reuse {
875            self.stats.lock().remove_worker();
876            self.allow_process_reuse = false;
877        }
878        result
879    }
880}
881
882impl Drop for ChildProcessOperation {
883    fn drop(&mut self) {
884        if let Some(mut process) = self.process.take() {
885            let elapsed = self.start.elapsed();
886            {
887                let stats = &mut self.stats.lock();
888                match self.permits {
889                    AcquiredPermits::Idle { .. } => stats.add_warm_process_time(elapsed),
890                    AcquiredPermits::Fresh { .. } => stats.add_cold_process_time(elapsed),
891                }
892            }
893            if self.allow_process_reuse {
894                process.cpu_time_invested += elapsed;
895                self.idle_processes.push(process, &ACTIVE_POOLS);
896            }
897        }
898    }
899}