Skip to main content

turbopack_node/process_pool/
mod.rs

1use std::{
2    future::Future,
3    mem::take,
4    path::{Path, PathBuf},
5    process::{ExitStatus, Stdio},
6    sync::Arc,
7    time::{Duration, Instant},
8};
9
10use anyhow::{Context, Result, bail};
11use futures::join;
12use once_cell::sync::Lazy;
13use owo_colors::OwoColorize;
14use parking_lot::Mutex;
15use rustc_hash::FxHashMap;
16use tokio::{
17    io::{
18        AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, Stderr,
19        Stdout, stderr, stdout,
20    },
21    net::{TcpListener, TcpStream},
22    process::{Child, ChildStderr, ChildStdout, Command},
23    select,
24    sync::Semaphore,
25    time::{sleep, timeout},
26};
27use turbo_rcstr::{RcStr, rcstr};
28use turbo_tasks::{FxIndexSet, ResolvedVc, Vc, duration_span};
29use turbo_tasks_fs::FileSystemPath;
30use turbopack_ecmascript::magic_identifier::unmangle_identifiers;
31
32use crate::{
33    AssetsForSourceMapping,
34    backend::{CreatePoolFuture, CreatePoolOptions, NodeBackend},
35    evaluate::{EvaluateOperation, EvaluatePool, Operation},
36    format::FormattingMode,
37    pool_stats::{AcquiredPermits, NodeJsPoolStats, PoolStatsSnapshot},
38    source_map::apply_source_mapping,
39};
40
41mod heap_queue;
42use heap_queue::HeapQueue;
43
/// A single pooled Node.js child process together with the IPC connection and
/// the output handlers used to forward its stdout/stderr.
struct NodeJsPoolProcess {
    /// The OS child process. `None` once ownership of the child has been
    /// taken (e.g. by `wait_or_kill`, which kills and reaps it).
    child: Option<Child>,
    /// TCP connection carrying the length-prefixed packet protocol between
    /// us and the Node.js side.
    connection: TcpStream,
    /// Forwards the child's stdout to our stdout, applying source mapping
    /// and cross-process deduplication.
    stdout_handler: OutputStreamHandler<ChildStdout, Stdout>,
    /// Same as `stdout_handler`, but for stderr.
    stderr_handler: OutputStreamHandler<ChildStderr, Stderr>,
    /// Whether the process was started with `--inspect-brk`; when set, the
    /// receive timeouts are disabled so a paused debugger doesn't kill it.
    debug: bool,
    /// Total time spent running operations on this process. Used by the
    /// `Ord` impl below to order processes in the idle queue.
    cpu_time_invested: Duration,
}
52
53impl Ord for NodeJsPoolProcess {
54    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
55        self.cpu_time_invested
56            .cmp(&other.cpu_time_invested)
57            .then_with(|| {
58                self.child
59                    .as_ref()
60                    .map(|c| c.id())
61                    .cmp(&other.child.as_ref().map(|c| c.id()))
62            })
63    }
64}
65
66impl PartialOrd for NodeJsPoolProcess {
67    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
68        Some(self.cmp(other))
69    }
70}
71
// Marker impl: equality (defined below in terms of `cmp`) is a total
// equivalence relation, so `Eq` holds.
impl Eq for NodeJsPoolProcess {}
73
74impl PartialEq for NodeJsPoolProcess {
75    fn eq(&self, other: &Self) -> bool {
76        self.cmp(other) == std::cmp::Ordering::Equal
77    }
78}
79
/// Maximum time to wait for a freshly spawned Node.js process to connect back
/// over TCP. Not applied when `debug` is set (see `NodeJsPoolProcess::new`).
const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
81
/// One deduplicatable unit of child-process output: the bytes of a marked
/// output block plus its captured stack trace, if any.
#[derive(Clone, PartialEq, Eq, Hash)]
struct OutputEntry {
    /// Raw bytes of the output block (without the marker lines).
    data: Arc<[u8]>,
    /// Stack trace belonging to `data`, captured between `S` and `E` markers.
    stack_trace: Option<Arc<[u8]>>,
}
87
/// Set of already-emitted output entries (paired with an occurrence counter),
/// shared between all processes of a pool to deduplicate repeated output.
type SharedOutputSet = Arc<Mutex<FxIndexSet<(OutputEntry, u32)>>>;

/// Serializes writes to the final output streams so chunks from concurrent
/// processes are not interleaved mid-write.
static GLOBAL_OUTPUT_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
// Marker the Node.js side prints to delimit structured output. A marker line
// is this prefix followed by a single type byte (`B`, `E`, `S` or `D`).
static MARKER: &[u8] = b"TURBOPACK_OUTPUT_";
static MARKER_STR: &str = "TURBOPACK_OUTPUT_";
93
/// Forwards one of a child process's output streams (`R`) to a final stream
/// (`W`), applying source mapping and cross-process deduplication.
struct OutputStreamHandler<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> {
    /// Buffered reader over the child's stdout or stderr.
    stream: BufReader<R>,
    /// Deduplication set shared with the other processes of the pool.
    shared: SharedOutputSet,
    /// Assets used to resolve source maps for stack frames.
    assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
    /// Root path the source-mapped paths are resolved against.
    root: FileSystemPath,
    /// Project directory, used for display of mapped paths.
    project_dir: FileSystemPath,
    /// Destination stream (our own stdout or stderr).
    final_stream: W,
}
102
impl<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> OutputStreamHandler<R, W> {
    /// Pipes the `stream` into `final_stream`, but uses `shared` to
    /// deduplicate lines that have been emitted by other
    /// [OutputStreamHandler] instances with the same `shared` before.
    /// Returns when one operation is done (the `D` marker was read).
    pub async fn handle_operation(&mut self) -> Result<()> {
        let Self {
            stream,
            shared,
            assets_for_source_mapping,
            root,
            project_dir,
            final_stream,
        } = self;

        // Writes raw bytes to the final stream while holding the global
        // output lock, so concurrent handlers don't interleave their output.
        async fn write_final<W: AsyncWrite + Unpin>(
            mut bytes: &[u8],
            final_stream: &mut W,
        ) -> Result<()> {
            let _lock = GLOBAL_OUTPUT_LOCK.lock().await;
            while !bytes.is_empty() {
                let count = final_stream.write(bytes).await?;
                if count == 0 {
                    bail!("Failed to write to final stream as it was closed");
                }
                bytes = &bytes[count..];
            }
            Ok(())
        }

        // Unmangles identifiers and applies source mapping to `bytes` (when
        // they are valid UTF-8) before writing them to the final stream.
        // Non-UTF-8 data is passed through unchanged.
        async fn write_source_mapped_final<W: AsyncWrite + Unpin>(
            bytes: &[u8],
            assets_for_source_mapping: Vc<AssetsForSourceMapping>,
            root: FileSystemPath,
            project_dir: FileSystemPath,
            final_stream: &mut W,
        ) -> Result<()> {
            if let Ok(text) = std::str::from_utf8(bytes) {
                let text = unmangle_identifiers(text, |content| {
                    format!("{{{content}}}").italic().to_string()
                });
                match apply_source_mapping(
                    text.as_ref(),
                    assets_for_source_mapping,
                    root,
                    project_dir,
                    FormattingMode::AnsiColors,
                )
                .await
                {
                    Err(e) => {
                        // Source mapping failed: report the error, then fall
                        // back to emitting the unmapped text.
                        write_final(
                            format!("Error applying source mapping: {e}\n").as_bytes(),
                            final_stream,
                        )
                        .await?;
                        write_final(text.as_bytes(), final_stream).await?;
                    }
                    Ok(text) => {
                        write_final(text.as_bytes(), final_stream).await?;
                    }
                }
            } else {
                write_final(bytes, final_stream).await?;
            }
            Ok(())
        }

        let mut buffer = Vec::new();
        // Per-handler count of how often each entry was produced; used as the
        // occurrence number when inserting into the shared dedup set.
        let mut own_output = FxHashMap::default();
        // Nesting depth of currently open `B`…`E` marker blocks.
        let mut nesting: u32 = 0;
        // Buffer offset where the current stack-trace section (`S`) started.
        let mut in_stack = None;
        let mut stack_trace_buffer = Vec::new();
        loop {
            let start = buffer.len();
            if stream
                .read_until(b'\n', &mut buffer)
                .await
                .context("error reading from stream")?
                == 0
            {
                bail!("stream closed unexpectedly")
            }
            // A marker line is exactly MARKER + one type byte + '\n'.
            if buffer.len() - start == MARKER.len() + 2
                && &buffer[start..buffer.len() - 2] == MARKER
            {
                // Drop the trailing newline
                buffer.pop();
                // Pop the marker type byte
                match buffer.pop() {
                    Some(b'B') => {
                        // Begin of a marked output block
                        stack_trace_buffer.clear();
                        buffer.truncate(start);
                        nesting += 1;
                        in_stack = None;
                        continue;
                    }
                    Some(b'E') => {
                        // End of a marked output block
                        buffer.truncate(start);
                        if let Some(in_stack) = in_stack {
                            if nesting != 0 {
                                stack_trace_buffer = buffer[in_stack..].to_vec();
                            }
                            buffer.truncate(in_stack);
                        }
                        nesting = nesting.saturating_sub(1);
                        in_stack = None;
                        if nesting == 0 {
                            // Outermost block closed: build the entry,
                            // deduplicate it against other processes, emit.
                            let line = Arc::from(take(&mut buffer).into_boxed_slice());
                            let stack_trace = if stack_trace_buffer.is_empty() {
                                None
                            } else {
                                Some(Arc::from(take(&mut stack_trace_buffer).into_boxed_slice()))
                            };
                            let entry = OutputEntry {
                                data: line,
                                stack_trace,
                            };
                            let occurrence_number = *own_output
                                .entry(entry.clone())
                                .and_modify(|c| *c += 1)
                                .or_insert(0);
                            let new_entry = {
                                let mut shared = shared.lock();
                                shared.insert((entry.clone(), occurrence_number))
                            };
                            if !new_entry {
                                // This line has been printed by another process, so we don't need
                                // to print it again.
                                continue;
                            }
                            write_source_mapped_final(
                                &entry.data,
                                **assets_for_source_mapping,
                                root.clone(),
                                project_dir.clone(),
                                final_stream,
                            )
                            .await?;
                        }
                    }
                    Some(b'S') => {
                        // Begin of a stack-trace section inside the block
                        buffer.truncate(start);
                        in_stack = Some(start);
                        continue;
                    }
                    Some(b'D') => {
                        // operation done
                        break;
                    }
                    _ => {}
                }
            }
            if nesting != 0 {
                // When inside of a marked output we want to aggregate until the end marker
                continue;
            }

            // Unmarked output outside any block is forwarded line by line.
            write_source_mapped_final(
                &buffer,
                **assets_for_source_mapping,
                root.clone(),
                project_dir.clone(),
                final_stream,
            )
            .await?;
            buffer.clear();
        }
        Ok(())
    }
}
274
275impl NodeJsPoolProcess {
276    async fn new(
277        cwd: &Path,
278        env: &FxHashMap<RcStr, RcStr>,
279        entrypoint: &Path,
280        assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
281        assets_root: FileSystemPath,
282        project_dir: FileSystemPath,
283        shared_stdout: SharedOutputSet,
284        shared_stderr: SharedOutputSet,
285        debug: bool,
286    ) -> Result<Self> {
287        let guard = duration_span!("Node.js process startup");
288        let listener = TcpListener::bind("127.0.0.1:0")
289            .await
290            .context("binding to a port")?;
291        let port = listener.local_addr().context("getting port")?.port();
292        let mut cmd = Command::new("node");
293        cmd.current_dir(cwd);
294        if debug {
295            cmd.arg("--inspect-brk");
296        }
297        cmd.arg(entrypoint);
298        cmd.arg(port.to_string());
299        cmd.env_clear();
300        cmd.env(
301            "PATH",
302            std::env::var("PATH").expect("the PATH environment variable should always be set"),
303        );
304        #[cfg(target_family = "windows")]
305        cmd.env(
306            "SystemRoot",
307            std::env::var("SystemRoot")
308                .expect("the SystemRoot environment variable should always be set"),
309        );
310        cmd.envs(env);
311        cmd.stderr(Stdio::piped());
312        cmd.stdout(Stdio::piped());
313        cmd.kill_on_drop(true);
314
315        let mut child = cmd.spawn().context("spawning node pooled process")?;
316
317        let timeout = if debug {
318            Duration::MAX
319        } else {
320            CONNECT_TIMEOUT
321        };
322
323        async fn get_output(child: &mut Child) -> Result<(String, String)> {
324            let mut stdout = Vec::new();
325            let mut stderr = Vec::new();
326            child
327                .stdout
328                .take()
329                .unwrap()
330                .read_to_end(&mut stdout)
331                .await?;
332            child
333                .stderr
334                .take()
335                .unwrap()
336                .read_to_end(&mut stderr)
337                .await?;
338            fn clean(buffer: Vec<u8>) -> Result<String> {
339                Ok(String::from_utf8(buffer)?
340                    .lines()
341                    .filter(|line| {
342                        line.len() != MARKER_STR.len() + 1 && !line.starts_with(MARKER_STR)
343                    })
344                    .collect::<Vec<_>>()
345                    .join("\n"))
346            }
347            Ok((clean(stdout)?, clean(stderr)?))
348        }
349
350        let (connection, _) = select! {
351            connection = listener.accept() => connection.context("accepting connection")?,
352            status = child.wait() => {
353                match status {
354                    Ok(status) => {
355                        let (stdout, stderr) = get_output(&mut child).await?;
356                        bail!("node process exited before we could connect to it with {status}\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
357                    }
358                    Err(err) => {
359                        let _ = child.start_kill();
360                        let (stdout, stderr) = get_output(&mut child).await?;
361                        bail!("node process exited before we could connect to it: {err:?}\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
362                    },
363                }
364            },
365            _ = sleep(timeout) => {
366                let _ = child.start_kill();
367                let (stdout, stderr) = get_output(&mut child).await?;
368                bail!("timed out waiting for the Node.js process to connect ({timeout:?} timeout)\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
369            },
370        };
371        connection.set_nodelay(true)?;
372
373        let child_stdout = BufReader::new(child.stdout.take().unwrap());
374        let child_stderr = BufReader::new(child.stderr.take().unwrap());
375
376        let stdout_handler = OutputStreamHandler {
377            stream: child_stdout,
378            shared: shared_stdout,
379            assets_for_source_mapping,
380            root: assets_root.clone(),
381            project_dir: project_dir.clone(),
382            final_stream: stdout(),
383        };
384        let stderr_handler = OutputStreamHandler {
385            stream: child_stderr,
386            shared: shared_stderr,
387            assets_for_source_mapping,
388            root: assets_root.clone(),
389            project_dir: project_dir.clone(),
390            final_stream: stderr(),
391        };
392
393        let mut process = Self {
394            child: Some(child),
395            connection,
396            stdout_handler,
397            stderr_handler,
398            debug,
399            cpu_time_invested: Duration::ZERO,
400        };
401
402        drop(guard);
403
404        let guard = duration_span!("Node.js initialization");
405        let ready_signal = process.recv().await?;
406
407        if !ready_signal.is_empty() {
408            bail!(
409                "Node.js process didn't send the expected ready signal\nOutput:\n{}",
410                String::from_utf8_lossy(&ready_signal)
411            );
412        }
413
414        drop(guard);
415
416        Ok(process)
417    }
418
419    async fn recv(&mut self) -> Result<Vec<u8>> {
420        let connection = &mut self.connection;
421        async fn with_timeout<T, E: Into<anyhow::Error>>(
422            debug: bool,
423            fast: bool,
424            future: impl Future<Output = Result<T, E>> + Send,
425        ) -> Result<T> {
426            if debug {
427                future.await.map_err(Into::into)
428            } else {
429                let time = if fast {
430                    Duration::from_secs(20)
431                } else {
432                    Duration::from_secs(5 * 60)
433                };
434                timeout(time, future)
435                    .await
436                    .context("timeout while receiving message from process")?
437                    .map_err(Into::into)
438            }
439        }
440        let debug = self.debug;
441        let recv_future = async move {
442            let packet_len = with_timeout(debug, false, connection.read_u32())
443                .await
444                .context("reading packet length")?
445                .try_into()
446                .context("storing packet length")?;
447            let mut packet_data = vec![0; packet_len];
448            with_timeout(debug, true, connection.read_exact(&mut packet_data))
449                .await
450                .context("reading packet data")?;
451            Ok::<_, anyhow::Error>(packet_data)
452        };
453        let (result, stdout, stderr) = join!(
454            recv_future,
455            self.stdout_handler.handle_operation(),
456            self.stderr_handler.handle_operation(),
457        );
458        let result = result?;
459        stdout.context("unable to handle stdout from the Node.js process in a structured way")?;
460        stderr.context("unable to handle stderr from the Node.js process in a structured way")?;
461        Ok(result)
462    }
463
464    async fn send(&mut self, packet_data: Vec<u8>) -> Result<()> {
465        self.connection
466            .write_u32(
467                packet_data
468                    .len()
469                    .try_into()
470                    .context("packet length does not fit into u32")?,
471            )
472            .await
473            .context("writing packet length")?;
474        self.connection
475            .write_all(&packet_data)
476            .await
477            .context("writing packet data")?;
478        self.connection
479            .flush()
480            .await
481            .context("flushing packet data")?;
482        Ok(())
483    }
484}
485
/// The idle-process queues of a set of pools.
type IdleProcessQueues = Mutex<Vec<Arc<HeapQueue<NodeJsPoolProcess>>>>;

/// All non-empty `IdleProcessQueues`s of the whole application.
/// This is used to scale down processes globally.
static ACTIVE_POOLS: Lazy<IdleProcessQueues> = Lazy::new(Default::default);
491
/// Arguments needed to spawn a new Node.js process. Extracted so that
/// `pre_warm` can clone them once instead of cloning each pool field
/// individually.
struct ProcessArgs {
    // The fields mirror the corresponding fields of `ChildProcessPool`;
    // see there for details.
    cwd: PathBuf,
    env: FxHashMap<RcStr, RcStr>,
    entrypoint: PathBuf,
    assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
    assets_root: FileSystemPath,
    project_dir: FileSystemPath,
    shared_stdout: SharedOutputSet,
    shared_stderr: SharedOutputSet,
    debug: bool,
}
506
507impl ProcessArgs {
508    async fn create_process(self) -> Result<(NodeJsPoolProcess, Duration)> {
509        let start = Instant::now();
510        let process = NodeJsPoolProcess::new(
511            &self.cwd,
512            &self.env,
513            &self.entrypoint,
514            self.assets_for_source_mapping,
515            self.assets_root,
516            self.project_dir,
517            self.shared_stdout,
518            self.shared_stderr,
519            self.debug,
520        )
521        .await?;
522        Ok((process, start.elapsed()))
523    }
524}
525
/// A pool of Node.js workers operating on `entrypoint` with specific `cwd`
/// and `env`.
///
/// The pool will spawn processes when needed and reuses old ones. It will never
/// spawn more than a certain number of concurrent processes. This is specified
/// with the `concurrency` argument in the constructor.
///
/// The worker will *not* use the env of the parent process by default. All env
/// vars need to be provided to make the execution as pure as possible.
#[turbo_tasks::value(cell = "new", serialization = "none", eq = "manual", shared)]
pub struct ChildProcessPool {
    /// Working directory of the spawned `node` processes.
    cwd: PathBuf,
    /// Path of the JavaScript entrypoint each process runs.
    entrypoint: PathBuf,
    /// Environment passed to the processes (the parent env is cleared except
    /// for `PATH`, and `SystemRoot` on Windows).
    env: FxHashMap<RcStr, RcStr>,
    pub assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
    pub assets_root: FileSystemPath,
    pub project_dir: FileSystemPath,
    /// Processes ready for reuse, ordered by the `Ord` impl of
    /// `NodeJsPoolProcess` (invested CPU time).
    #[turbo_tasks(trace_ignore, debug_ignore)]
    idle_processes: Arc<HeapQueue<NodeJsPoolProcess>>,
    /// Semaphore to limit the number of concurrent operations in general
    #[turbo_tasks(trace_ignore, debug_ignore)]
    concurrency_semaphore: Arc<Semaphore>,
    /// Semaphore to limit the number of concurrently booting up processes
    /// (excludes one-off processes)
    #[turbo_tasks(trace_ignore, debug_ignore)]
    bootup_semaphore: Arc<Semaphore>,
    /// Output entries already emitted on stdout, shared across processes for
    /// deduplication.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    shared_stdout: SharedOutputSet,
    /// Same as `shared_stdout`, but for stderr.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    shared_stderr: SharedOutputSet,
    /// Whether processes are started with `--inspect-brk` (forces
    /// concurrency 1, see `create`).
    debug: bool,
    /// Aggregated pool statistics (worker counts, timings, queue length).
    #[turbo_tasks(trace_ignore, debug_ignore)]
    stats: Arc<Mutex<NodeJsPoolStats>>,
}
560
561impl ChildProcessPool {
562    /// * debug: Whether to automatically enable Node's `--inspect-brk` when spawning it. Note:
563    ///   automatically overrides concurrency to 1.
564    pub fn create(
565        cwd: PathBuf,
566        entrypoint: PathBuf,
567        env: FxHashMap<RcStr, RcStr>,
568        assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
569        assets_root: FileSystemPath,
570        project_dir: FileSystemPath,
571        concurrency: usize,
572        debug: bool,
573    ) -> EvaluatePool {
574        EvaluatePool::new(
575            Box::new(Self {
576                cwd,
577                entrypoint,
578                env,
579                assets_for_source_mapping,
580                assets_root: assets_root.clone(),
581                project_dir: project_dir.clone(),
582                concurrency_semaphore: Arc::new(Semaphore::new(if debug {
583                    1
584                } else {
585                    concurrency
586                })),
587                bootup_semaphore: Arc::new(Semaphore::new(1)),
588                idle_processes: Arc::new(HeapQueue::new()),
589                shared_stdout: Arc::new(Mutex::new(FxIndexSet::default())),
590                shared_stderr: Arc::new(Mutex::new(FxIndexSet::default())),
591                debug,
592                stats: Default::default(),
593            }),
594            assets_for_source_mapping,
595            assets_root,
596            project_dir,
597        )
598    }
599}
600
/// [`NodeBackend`] implementation that evaluates JavaScript by spawning
/// pooled `node` child processes.
#[turbo_tasks::value(shared)]
pub(crate) struct ChildProcessesBackend;
603
604#[turbo_tasks::value_impl]
605impl NodeBackend for ChildProcessesBackend {
606    fn runtime_module_path(&self) -> RcStr {
607        rcstr!("child_process/evaluate.ts")
608    }
609
610    fn globals_module_path(&self) -> RcStr {
611        rcstr!("child_process/globals.ts")
612    }
613
614    fn create_pool(&self, options: CreatePoolOptions) -> CreatePoolFuture {
615        Box::pin(async move {
616            let CreatePoolOptions {
617                cwd,
618                entrypoint,
619                env,
620                assets_for_source_mapping,
621                assets_root,
622                project_dir,
623                concurrency,
624                debug,
625            } = options;
626
627            Ok(ChildProcessPool::create(
628                cwd,
629                entrypoint,
630                env,
631                assets_for_source_mapping,
632                assets_root,
633                project_dir,
634                concurrency,
635                debug,
636            ))
637        })
638    }
639
640    fn scale_down(&self) -> Result<()> {
641        ChildProcessPool::scale_down();
642        Ok(())
643    }
644
645    fn scale_zero(&self) -> Result<()> {
646        ChildProcessPool::scale_zero();
647        Ok(())
648    }
649}
650
#[async_trait::async_trait]
impl EvaluateOperation for ChildProcessPool {
    /// Starts a new operation on this pool.
    ///
    /// Acquires a running process (handles concurrency limits, boots up the
    /// process when no idle one is available) and wraps it in a
    /// [`ChildProcessOperation`].
    async fn operation(&self) -> Result<Box<dyn Operation>> {
        let operation = {
            let _guard = duration_span!("Node.js operation");
            let (process, permits) = self.acquire_process().await?;
            ChildProcessOperation {
                process: Some(process),
                permits,
                idle_processes: self.idle_processes.clone(),
                start: Instant::now(),
                stats: self.stats.clone(),
                allow_process_reuse: true,
            }
        };

        Ok(Box::new(operation))
    }

    /// Returns a snapshot of the pool's internal statistics.
    fn stats(&self) -> PoolStatsSnapshot {
        self.stats.lock().snapshot()
    }

    /// Eagerly spawn a Node.js process so it's ready when the first
    /// `operation()` is called. The process goes into the idle queue.
    /// If a node request comes in while this is still initializing, it waits
    /// on the bootup semaphore and will resume when the process is ready.
    fn pre_warm(&self) {
        let args = self.process_args();
        let bootup_semaphore = self.bootup_semaphore.clone();
        let idle_processes = self.idle_processes.clone();
        let stats = self.stats.clone();

        tokio::spawn(async move {
            // Give up silently if the semaphore was closed.
            let Ok(bootup_permit) = bootup_semaphore.clone().acquire_owned().await else {
                return;
            };
            {
                stats.lock().add_booting_worker();
            }
            match args.create_process().await {
                Ok((process, bootup_time)) => {
                    {
                        let mut s = stats.lock();
                        s.add_bootup_time(bootup_time);
                        s.finished_booting_worker();
                    }
                    // Release the bootup permit before publishing the process
                    // so a concurrently waiting `acquire_process` can proceed.
                    drop(bootup_permit);
                    idle_processes.push(process, &ACTIVE_POOLS);
                }
                Err(_e) => {
                    // NOTE(review): the spawn error is swallowed here;
                    // presumably a later `operation()` will retry and surface
                    // the failure — confirm with callers.
                    let mut s = stats.lock();
                    s.finished_booting_worker();
                    s.remove_worker();
                }
            }
        });
    }
}
713
impl ChildProcessPool {
    /// Acquires a process for one operation: either pops an idle process or
    /// boots up a fresh one, whichever becomes available first.
    ///
    /// Returns the process together with the semaphore permits that must be
    /// held for the duration of the operation.
    async fn acquire_process(&self) -> Result<(NodeJsPoolProcess, AcquiredPermits)> {
        {
            self.stats.lock().add_queued_task();
        }

        // Limits the number of concurrently running operations overall.
        let concurrency_permit = self.concurrency_semaphore.clone().acquire_owned().await?;

        // Boot-up path: wait for a bootup slot, then (based on stats) wait a
        // bit longer to give an idle process a chance to win the race below.
        let bootup = async {
            let permit = self.bootup_semaphore.clone().acquire_owned().await;
            let wait_time = self.stats.lock().wait_time_before_bootup();
            tokio::time::sleep(wait_time).await;
            permit
        };

        // Race the idle queue against booting up a new process.
        select! {
            idle_process_result = self.idle_processes.pop(&ACTIVE_POOLS) => {
                let process = idle_process_result.context("acquiring idle process permit")?;
                Ok((process, AcquiredPermits::Idle { _concurrency_permit: concurrency_permit }))
            },
            bootup_permit = bootup => {
                let bootup_permit = bootup_permit.context("acquiring bootup permit")?;
                {
                    self.stats.lock().add_booting_worker();
                }
                let (process, bootup_time) = self.create_process().await?;
                // Update the worker count
                {
                    let mut stats = self.stats.lock();
                    stats.add_bootup_time(bootup_time);
                    stats.finished_booting_worker();
                }
                // Increase the allowed booting up processes
                self.bootup_semaphore.add_permits(1);
                Ok((process, AcquiredPermits::Fresh { _concurrency_permit: concurrency_permit, _bootup_permit: bootup_permit }))
            }
        }
    }

    /// Bundles the fields needed to spawn a process, cloned so the spawn can
    /// run detached from `self` (see `pre_warm`).
    fn process_args(&self) -> ProcessArgs {
        ProcessArgs {
            cwd: self.cwd.clone(),
            env: self.env.clone(),
            entrypoint: self.entrypoint.clone(),
            assets_for_source_mapping: self.assets_for_source_mapping,
            assets_root: self.assets_root.clone(),
            project_dir: self.project_dir.clone(),
            shared_stdout: self.shared_stdout.clone(),
            shared_stderr: self.shared_stderr.clone(),
            debug: self.debug,
        }
    }

    /// Spawns a new Node.js process for this pool.
    async fn create_process(&self) -> Result<(NodeJsPoolProcess, Duration), anyhow::Error> {
        self.process_args()
            .create_process()
            .await
            .context("creating new process")
    }

    /// Shrinks the idle queue of every active pool (via
    /// `HeapQueue::reduce_to_one`).
    pub fn scale_down() {
        let pools = ACTIVE_POOLS.lock().clone();
        for pool in pools {
            pool.reduce_to_one();
        }
    }

    /// Drops all idle processes of every active pool (via
    /// `HeapQueue::reduce_to_zero`).
    pub fn scale_zero() {
        let pools = ACTIVE_POOLS.lock().clone();
        for pool in pools {
            pool.reduce_to_zero(&ACTIVE_POOLS);
        }
    }
}
788
/// One in-flight operation on a pooled Node.js process.
///
/// On drop, the elapsed time is recorded and — unless reuse was disallowed —
/// the process is returned to the pool's idle queue.
pub struct ChildProcessOperation {
    /// The process running this operation; `None` after `wait_or_kill` took
    /// it or after it was returned to the pool.
    process: Option<NodeJsPoolProcess>,
    // This is used for drop
    #[allow(dead_code)]
    permits: AcquiredPermits,
    /// Idle queue of the owning pool, used to return the process on drop.
    idle_processes: Arc<HeapQueue<NodeJsPoolProcess>>,
    /// When the operation started; used to attribute elapsed time in stats.
    start: Instant,
    /// Shared statistics of the owning pool.
    stats: Arc<Mutex<NodeJsPoolStats>>,
    /// Whether the process may go back to the idle queue. Cleared when an
    /// operation fails or reuse is explicitly disallowed.
    allow_process_reuse: bool,
}
799
800#[async_trait::async_trait]
801impl Operation for ChildProcessOperation {
802    async fn recv(&mut self) -> Result<Vec<u8>> {
803        let vec = self
804            .with_process(|process| async move {
805                process.recv().await.context("failed to receive message")
806            })
807            .await?;
808        Ok(vec)
809    }
810
811    async fn send(&mut self, message: Vec<u8>) -> Result<()> {
812        self.with_process(|process| async move {
813            timeout(Duration::from_secs(30), process.send(message))
814                .await
815                .context("timeout while sending message")?
816                .context("failed to send message")?;
817            Ok(())
818        })
819        .await
820    }
821
822    async fn wait_or_kill(&mut self) -> Result<ExitStatus> {
823        let mut process = self
824            .process
825            .take()
826            .context("Node.js operation already finished")?;
827
828        if self.allow_process_reuse {
829            self.stats.lock().remove_worker();
830        }
831
832        let mut child = process
833            .child
834            .take()
835            .context("Node.js operation already finished")?;
836
837        // Ignore error since we are not sure if the process is still alive
838        let _ = child.start_kill();
839        let status = timeout(Duration::from_secs(30), child.wait())
840            .await
841            .context("timeout while waiting for process end")?
842            .context("waiting for process end")?;
843
844        Ok(status)
845    }
846
847    fn disallow_reuse(&mut self) {
848        if self.allow_process_reuse {
849            self.stats.lock().remove_worker();
850            self.allow_process_reuse = false;
851        }
852    }
853}
854
impl ChildProcessOperation {
    /// Runs `f` with this operation's process.
    ///
    /// Fails if the operation already finished or reuse was disallowed. When
    /// `f` fails, the process is marked as non-reusable: a failed operation
    /// leaves it in an unknown state, so it must never go back to the pool.
    async fn with_process<'a, F: Future<Output = Result<T>> + Send + 'a, T>(
        &'a mut self,
        f: impl FnOnce(&'a mut NodeJsPoolProcess) -> F,
    ) -> Result<T> {
        let process = self
            .process
            .as_mut()
            .context("Node.js operation already finished")?;

        if !self.allow_process_reuse {
            bail!("Node.js process is no longer usable");
        }

        let result = f(process).await;
        if result.is_err() && self.allow_process_reuse {
            self.stats.lock().remove_worker();
            self.allow_process_reuse = false;
        }
        result
    }
}
877
878impl Drop for ChildProcessOperation {
879    fn drop(&mut self) {
880        if let Some(mut process) = self.process.take() {
881            let elapsed = self.start.elapsed();
882            {
883                let stats = &mut self.stats.lock();
884                match self.permits {
885                    AcquiredPermits::Idle { .. } => stats.add_warm_process_time(elapsed),
886                    AcquiredPermits::Fresh { .. } => stats.add_cold_process_time(elapsed),
887                }
888            }
889            if self.allow_process_reuse {
890                process.cpu_time_invested += elapsed;
891                self.idle_processes.push(process, &ACTIVE_POOLS);
892            }
893        }
894    }
895}