1use std::{
2 future::Future,
3 mem::take,
4 path::{Path, PathBuf},
5 process::{ExitStatus, Stdio},
6 sync::Arc,
7 time::{Duration, Instant},
8};
9
10use anyhow::{Context, Result, bail};
11use futures::join;
12use once_cell::sync::Lazy;
13use owo_colors::OwoColorize;
14use parking_lot::Mutex;
15use rustc_hash::FxHashMap;
16use tokio::{
17 io::{
18 AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, Stderr,
19 Stdout, stderr, stdout,
20 },
21 net::{TcpListener, TcpStream},
22 process::{Child, ChildStderr, ChildStdout, Command},
23 select,
24 sync::Semaphore,
25 time::{sleep, timeout},
26};
27use turbo_rcstr::{RcStr, rcstr};
28use turbo_tasks::{FxIndexSet, ResolvedVc, Vc, duration_span};
29use turbo_tasks_fs::FileSystemPath;
30use turbopack_ecmascript::magic_identifier::unmangle_identifiers;
31
32use crate::{
33 AssetsForSourceMapping,
34 backend::{CreatePoolFuture, CreatePoolOptions, NodeBackend},
35 evaluate::{EvaluateOperation, EvaluatePool, Operation},
36 format::FormattingMode,
37 pool_stats::{AcquiredPermits, NodeJsPoolStats, PoolStatsSnapshot},
38 source_map::apply_source_mapping,
39};
40
41mod heap_queue;
42use heap_queue::HeapQueue;
43
44struct NodeJsPoolProcess {
45 child: Option<Child>,
46 connection: TcpStream,
47 stdout_handler: OutputStreamHandler<ChildStdout, Stdout>,
48 stderr_handler: OutputStreamHandler<ChildStderr, Stderr>,
49 debug: bool,
50 cpu_time_invested: Duration,
51}
52
53impl Ord for NodeJsPoolProcess {
54 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
55 self.cpu_time_invested
56 .cmp(&other.cpu_time_invested)
57 .then_with(|| {
58 self.child
59 .as_ref()
60 .map(|c| c.id())
61 .cmp(&other.child.as_ref().map(|c| c.id()))
62 })
63 }
64}
65
66impl PartialOrd for NodeJsPoolProcess {
67 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
68 Some(self.cmp(other))
69 }
70}
71
72impl Eq for NodeJsPoolProcess {}
73
74impl PartialEq for NodeJsPoolProcess {
75 fn eq(&self, other: &Self) -> bool {
76 self.cmp(other) == std::cmp::Ordering::Equal
77 }
78}
79
80const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
81
82#[derive(Clone, PartialEq, Eq, Hash)]
83struct OutputEntry {
84 data: Arc<[u8]>,
85 stack_trace: Option<Arc<[u8]>>,
86}
87
88type SharedOutputSet = Arc<Mutex<FxIndexSet<(OutputEntry, u32)>>>;
89
90static GLOBAL_OUTPUT_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
91static MARKER: &[u8] = b"TURBOPACK_OUTPUT_";
92static MARKER_STR: &str = "TURBOPACK_OUTPUT_";
93
94struct OutputStreamHandler<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> {
95 stream: BufReader<R>,
96 shared: SharedOutputSet,
97 assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
98 root: FileSystemPath,
99 project_dir: FileSystemPath,
100 final_stream: W,
101}
102
103impl<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> OutputStreamHandler<R, W> {
104 pub async fn handle_operation(&mut self) -> Result<()> {
109 let Self {
110 stream,
111 shared,
112 assets_for_source_mapping,
113 root,
114 project_dir,
115 final_stream,
116 } = self;
117
118 async fn write_final<W: AsyncWrite + Unpin>(
119 mut bytes: &[u8],
120 final_stream: &mut W,
121 ) -> Result<()> {
122 let _lock = GLOBAL_OUTPUT_LOCK.lock().await;
123 while !bytes.is_empty() {
124 let count = final_stream.write(bytes).await?;
125 if count == 0 {
126 bail!("Failed to write to final stream as it was closed");
127 }
128 bytes = &bytes[count..];
129 }
130 Ok(())
131 }
132
133 async fn write_source_mapped_final<W: AsyncWrite + Unpin>(
134 bytes: &[u8],
135 assets_for_source_mapping: Vc<AssetsForSourceMapping>,
136 root: FileSystemPath,
137 project_dir: FileSystemPath,
138 final_stream: &mut W,
139 ) -> Result<()> {
140 if let Ok(text) = std::str::from_utf8(bytes) {
141 let text = unmangle_identifiers(text, |content| {
142 format!("{{{content}}}").italic().to_string()
143 });
144 match apply_source_mapping(
145 text.as_ref(),
146 assets_for_source_mapping,
147 root,
148 project_dir,
149 FormattingMode::AnsiColors,
150 )
151 .await
152 {
153 Err(e) => {
154 write_final(
155 format!("Error applying source mapping: {e}\n").as_bytes(),
156 final_stream,
157 )
158 .await?;
159 write_final(text.as_bytes(), final_stream).await?;
160 }
161 Ok(text) => {
162 write_final(text.as_bytes(), final_stream).await?;
163 }
164 }
165 } else {
166 write_final(bytes, final_stream).await?;
167 }
168 Ok(())
169 }
170
171 let mut buffer = Vec::new();
172 let mut own_output = FxHashMap::default();
173 let mut nesting: u32 = 0;
174 let mut in_stack = None;
175 let mut stack_trace_buffer = Vec::new();
176 loop {
177 let start = buffer.len();
178 if stream
179 .read_until(b'\n', &mut buffer)
180 .await
181 .context("error reading from stream")?
182 == 0
183 {
184 bail!("stream closed unexpectedly")
185 }
186 if buffer.len() - start == MARKER.len() + 2
187 && &buffer[start..buffer.len() - 2] == MARKER
188 {
189 buffer.pop();
191 match buffer.pop() {
193 Some(b'B') => {
194 stack_trace_buffer.clear();
195 buffer.truncate(start);
196 nesting += 1;
197 in_stack = None;
198 continue;
199 }
200 Some(b'E') => {
201 buffer.truncate(start);
202 if let Some(in_stack) = in_stack {
203 if nesting != 0 {
204 stack_trace_buffer = buffer[in_stack..].to_vec();
205 }
206 buffer.truncate(in_stack);
207 }
208 nesting = nesting.saturating_sub(1);
209 in_stack = None;
210 if nesting == 0 {
211 let line = Arc::from(take(&mut buffer).into_boxed_slice());
212 let stack_trace = if stack_trace_buffer.is_empty() {
213 None
214 } else {
215 Some(Arc::from(take(&mut stack_trace_buffer).into_boxed_slice()))
216 };
217 let entry = OutputEntry {
218 data: line,
219 stack_trace,
220 };
221 let occurrence_number = *own_output
222 .entry(entry.clone())
223 .and_modify(|c| *c += 1)
224 .or_insert(0);
225 let new_entry = {
226 let mut shared = shared.lock();
227 shared.insert((entry.clone(), occurrence_number))
228 };
229 if !new_entry {
230 continue;
233 }
234 write_source_mapped_final(
235 &entry.data,
236 **assets_for_source_mapping,
237 root.clone(),
238 project_dir.clone(),
239 final_stream,
240 )
241 .await?;
242 }
243 }
244 Some(b'S') => {
245 buffer.truncate(start);
246 in_stack = Some(start);
247 continue;
248 }
249 Some(b'D') => {
250 break;
252 }
253 _ => {}
254 }
255 }
256 if nesting != 0 {
257 continue;
259 }
260
261 write_source_mapped_final(
262 &buffer,
263 **assets_for_source_mapping,
264 root.clone(),
265 project_dir.clone(),
266 final_stream,
267 )
268 .await?;
269 buffer.clear();
270 }
271 Ok(())
272 }
273}
274
275impl NodeJsPoolProcess {
276 async fn new(
277 cwd: &Path,
278 env: &FxHashMap<RcStr, RcStr>,
279 entrypoint: &Path,
280 assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
281 assets_root: FileSystemPath,
282 project_dir: FileSystemPath,
283 shared_stdout: SharedOutputSet,
284 shared_stderr: SharedOutputSet,
285 debug: bool,
286 ) -> Result<Self> {
287 let guard = duration_span!("Node.js process startup");
288 let listener = TcpListener::bind("127.0.0.1:0")
289 .await
290 .context("binding to a port")?;
291 let port = listener.local_addr().context("getting port")?.port();
292 let mut cmd = Command::new("node");
293 cmd.current_dir(cwd);
294 if debug {
295 cmd.arg("--inspect-brk");
296 }
297 cmd.arg(entrypoint);
298 cmd.arg(port.to_string());
299 cmd.env_clear();
300 cmd.env(
301 "PATH",
302 std::env::var("PATH").expect("the PATH environment variable should always be set"),
303 );
304 #[cfg(target_family = "windows")]
305 cmd.env(
306 "SystemRoot",
307 std::env::var("SystemRoot")
308 .expect("the SystemRoot environment variable should always be set"),
309 );
310 cmd.envs(env);
311 cmd.stderr(Stdio::piped());
312 cmd.stdout(Stdio::piped());
313 cmd.kill_on_drop(true);
314
315 let mut child = cmd.spawn().context("spawning node pooled process")?;
316
317 let timeout = if debug {
318 Duration::MAX
319 } else {
320 CONNECT_TIMEOUT
321 };
322
323 async fn get_output(child: &mut Child) -> Result<(String, String)> {
324 let mut stdout = Vec::new();
325 let mut stderr = Vec::new();
326 child
327 .stdout
328 .take()
329 .unwrap()
330 .read_to_end(&mut stdout)
331 .await?;
332 child
333 .stderr
334 .take()
335 .unwrap()
336 .read_to_end(&mut stderr)
337 .await?;
338 fn clean(buffer: Vec<u8>) -> Result<String> {
339 Ok(String::from_utf8(buffer)?
340 .lines()
341 .filter(|line| {
342 line.len() != MARKER_STR.len() + 1 && !line.starts_with(MARKER_STR)
343 })
344 .collect::<Vec<_>>()
345 .join("\n"))
346 }
347 Ok((clean(stdout)?, clean(stderr)?))
348 }
349
350 let (connection, _) = select! {
351 connection = listener.accept() => connection.context("accepting connection")?,
352 status = child.wait() => {
353 match status {
354 Ok(status) => {
355 let (stdout, stderr) = get_output(&mut child).await?;
356 bail!("node process exited before we could connect to it with {status}\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
357 }
358 Err(err) => {
359 let _ = child.start_kill();
360 let (stdout, stderr) = get_output(&mut child).await?;
361 bail!("node process exited before we could connect to it: {err:?}\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
362 },
363 }
364 },
365 _ = sleep(timeout) => {
366 let _ = child.start_kill();
367 let (stdout, stderr) = get_output(&mut child).await?;
368 bail!("timed out waiting for the Node.js process to connect ({timeout:?} timeout)\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
369 },
370 };
371 connection.set_nodelay(true)?;
372
373 let child_stdout = BufReader::new(child.stdout.take().unwrap());
374 let child_stderr = BufReader::new(child.stderr.take().unwrap());
375
376 let stdout_handler = OutputStreamHandler {
377 stream: child_stdout,
378 shared: shared_stdout,
379 assets_for_source_mapping,
380 root: assets_root.clone(),
381 project_dir: project_dir.clone(),
382 final_stream: stdout(),
383 };
384 let stderr_handler = OutputStreamHandler {
385 stream: child_stderr,
386 shared: shared_stderr,
387 assets_for_source_mapping,
388 root: assets_root.clone(),
389 project_dir: project_dir.clone(),
390 final_stream: stderr(),
391 };
392
393 let mut process = Self {
394 child: Some(child),
395 connection,
396 stdout_handler,
397 stderr_handler,
398 debug,
399 cpu_time_invested: Duration::ZERO,
400 };
401
402 drop(guard);
403
404 let guard = duration_span!("Node.js initialization");
405 let ready_signal = process.recv().await?;
406
407 if !ready_signal.is_empty() {
408 bail!(
409 "Node.js process didn't send the expected ready signal\nOutput:\n{}",
410 String::from_utf8_lossy(&ready_signal)
411 );
412 }
413
414 drop(guard);
415
416 Ok(process)
417 }
418
419 async fn recv(&mut self) -> Result<Vec<u8>> {
420 let connection = &mut self.connection;
421 async fn with_timeout<T, E: Into<anyhow::Error>>(
422 debug: bool,
423 fast: bool,
424 future: impl Future<Output = Result<T, E>> + Send,
425 ) -> Result<T> {
426 if debug {
427 future.await.map_err(Into::into)
428 } else {
429 let time = if fast {
430 Duration::from_secs(20)
431 } else {
432 Duration::from_secs(5 * 60)
433 };
434 timeout(time, future)
435 .await
436 .context("timeout while receiving message from process")?
437 .map_err(Into::into)
438 }
439 }
440 let debug = self.debug;
441 let recv_future = async move {
442 let packet_len = with_timeout(debug, false, connection.read_u32())
443 .await
444 .context("reading packet length")?
445 .try_into()
446 .context("storing packet length")?;
447 let mut packet_data = vec![0; packet_len];
448 with_timeout(debug, true, connection.read_exact(&mut packet_data))
449 .await
450 .context("reading packet data")?;
451 Ok::<_, anyhow::Error>(packet_data)
452 };
453 let (result, stdout, stderr) = join!(
454 recv_future,
455 self.stdout_handler.handle_operation(),
456 self.stderr_handler.handle_operation(),
457 );
458 let result = result?;
459 stdout.context("unable to handle stdout from the Node.js process in a structured way")?;
460 stderr.context("unable to handle stderr from the Node.js process in a structured way")?;
461 Ok(result)
462 }
463
464 async fn send(&mut self, packet_data: Vec<u8>) -> Result<()> {
465 self.connection
466 .write_u32(
467 packet_data
468 .len()
469 .try_into()
470 .context("packet length does not fit into u32")?,
471 )
472 .await
473 .context("writing packet length")?;
474 self.connection
475 .write_all(&packet_data)
476 .await
477 .context("writing packet data")?;
478 self.connection
479 .flush()
480 .await
481 .context("flushing packet data")?;
482 Ok(())
483 }
484}
485
486type IdleProcessQueues = Mutex<Vec<Arc<HeapQueue<NodeJsPoolProcess>>>>;
487
488static ACTIVE_POOLS: Lazy<IdleProcessQueues> = Lazy::new(Default::default);
491
492struct ProcessArgs {
496 cwd: PathBuf,
497 env: FxHashMap<RcStr, RcStr>,
498 entrypoint: PathBuf,
499 assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
500 assets_root: FileSystemPath,
501 project_dir: FileSystemPath,
502 shared_stdout: SharedOutputSet,
503 shared_stderr: SharedOutputSet,
504 debug: bool,
505}
506
507impl ProcessArgs {
508 async fn create_process(self) -> Result<(NodeJsPoolProcess, Duration)> {
509 let start = Instant::now();
510 let process = NodeJsPoolProcess::new(
511 &self.cwd,
512 &self.env,
513 &self.entrypoint,
514 self.assets_for_source_mapping,
515 self.assets_root,
516 self.project_dir,
517 self.shared_stdout,
518 self.shared_stderr,
519 self.debug,
520 )
521 .await?;
522 Ok((process, start.elapsed()))
523 }
524}
525
526#[turbo_tasks::value(cell = "new", serialization = "none", eq = "manual", shared)]
536pub struct ChildProcessPool {
537 cwd: PathBuf,
538 entrypoint: PathBuf,
539 env: FxHashMap<RcStr, RcStr>,
540 pub assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
541 pub assets_root: FileSystemPath,
542 pub project_dir: FileSystemPath,
543 #[turbo_tasks(trace_ignore, debug_ignore)]
544 idle_processes: Arc<HeapQueue<NodeJsPoolProcess>>,
545 #[turbo_tasks(trace_ignore, debug_ignore)]
547 concurrency_semaphore: Arc<Semaphore>,
548 #[turbo_tasks(trace_ignore, debug_ignore)]
551 bootup_semaphore: Arc<Semaphore>,
552 #[turbo_tasks(trace_ignore, debug_ignore)]
553 shared_stdout: SharedOutputSet,
554 #[turbo_tasks(trace_ignore, debug_ignore)]
555 shared_stderr: SharedOutputSet,
556 debug: bool,
557 #[turbo_tasks(trace_ignore, debug_ignore)]
558 stats: Arc<Mutex<NodeJsPoolStats>>,
559}
560
561impl ChildProcessPool {
562 pub fn create(
565 cwd: PathBuf,
566 entrypoint: PathBuf,
567 env: FxHashMap<RcStr, RcStr>,
568 assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
569 assets_root: FileSystemPath,
570 project_dir: FileSystemPath,
571 concurrency: usize,
572 debug: bool,
573 ) -> EvaluatePool {
574 EvaluatePool::new(
575 Box::new(Self {
576 cwd,
577 entrypoint,
578 env,
579 assets_for_source_mapping,
580 assets_root: assets_root.clone(),
581 project_dir: project_dir.clone(),
582 concurrency_semaphore: Arc::new(Semaphore::new(if debug {
583 1
584 } else {
585 concurrency
586 })),
587 bootup_semaphore: Arc::new(Semaphore::new(1)),
588 idle_processes: Arc::new(HeapQueue::new()),
589 shared_stdout: Arc::new(Mutex::new(FxIndexSet::default())),
590 shared_stderr: Arc::new(Mutex::new(FxIndexSet::default())),
591 debug,
592 stats: Default::default(),
593 }),
594 assets_for_source_mapping,
595 assets_root,
596 project_dir,
597 )
598 }
599}
600
601#[turbo_tasks::value(shared)]
602pub(crate) struct ChildProcessesBackend;
603
604#[turbo_tasks::value_impl]
605impl NodeBackend for ChildProcessesBackend {
606 fn runtime_module_path(&self) -> RcStr {
607 rcstr!("child_process/evaluate.ts")
608 }
609
610 fn globals_module_path(&self) -> RcStr {
611 rcstr!("child_process/globals.ts")
612 }
613
614 fn create_pool(&self, options: CreatePoolOptions) -> CreatePoolFuture {
615 Box::pin(async move {
616 let CreatePoolOptions {
617 cwd,
618 entrypoint,
619 env,
620 assets_for_source_mapping,
621 assets_root,
622 project_dir,
623 concurrency,
624 debug,
625 } = options;
626
627 Ok(ChildProcessPool::create(
628 cwd,
629 entrypoint,
630 env,
631 assets_for_source_mapping,
632 assets_root,
633 project_dir,
634 concurrency,
635 debug,
636 ))
637 })
638 }
639
640 fn scale_down(&self) -> Result<()> {
641 ChildProcessPool::scale_down();
642 Ok(())
643 }
644
645 fn scale_zero(&self) -> Result<()> {
646 ChildProcessPool::scale_zero();
647 Ok(())
648 }
649}
650
651#[async_trait::async_trait]
652impl EvaluateOperation for ChildProcessPool {
653 async fn operation(&self) -> Result<Box<dyn Operation>> {
654 let operation = {
657 let _guard = duration_span!("Node.js operation");
658 let (process, permits) = self.acquire_process().await?;
659 ChildProcessOperation {
660 process: Some(process),
661 permits,
662 idle_processes: self.idle_processes.clone(),
663 start: Instant::now(),
664 stats: self.stats.clone(),
665 allow_process_reuse: true,
666 }
667 };
668
669 Ok(Box::new(operation))
670 }
671
672 fn stats(&self) -> PoolStatsSnapshot {
674 self.stats.lock().snapshot()
675 }
676
677 fn pre_warm(&self) {
682 let args = self.process_args();
683 let bootup_semaphore = self.bootup_semaphore.clone();
684 let idle_processes = self.idle_processes.clone();
685 let stats = self.stats.clone();
686
687 tokio::spawn(async move {
688 let Ok(bootup_permit) = bootup_semaphore.clone().acquire_owned().await else {
689 return;
690 };
691 {
692 stats.lock().add_booting_worker();
693 }
694 match args.create_process().await {
695 Ok((process, bootup_time)) => {
696 {
697 let mut s = stats.lock();
698 s.add_bootup_time(bootup_time);
699 s.finished_booting_worker();
700 }
701 drop(bootup_permit);
702 idle_processes.push(process, &ACTIVE_POOLS);
703 }
704 Err(_e) => {
705 let mut s = stats.lock();
706 s.finished_booting_worker();
707 s.remove_worker();
708 }
709 }
710 });
711 }
712}
713
714impl ChildProcessPool {
715 async fn acquire_process(&self) -> Result<(NodeJsPoolProcess, AcquiredPermits)> {
716 {
717 self.stats.lock().add_queued_task();
718 }
719
720 let concurrency_permit = self.concurrency_semaphore.clone().acquire_owned().await?;
721
722 let bootup = async {
723 let permit = self.bootup_semaphore.clone().acquire_owned().await;
724 let wait_time = self.stats.lock().wait_time_before_bootup();
725 tokio::time::sleep(wait_time).await;
726 permit
727 };
728
729 select! {
730 idle_process_result = self.idle_processes.pop(&ACTIVE_POOLS) => {
731 let process = idle_process_result.context("acquiring idle process permit")?;
732 Ok((process, AcquiredPermits::Idle { _concurrency_permit: concurrency_permit }))
733 },
734 bootup_permit = bootup => {
735 let bootup_permit = bootup_permit.context("acquiring bootup permit")?;
736 {
737 self.stats.lock().add_booting_worker();
738 }
739 let (process, bootup_time) = self.create_process().await?;
740 {
742 let mut stats = self.stats.lock();
743 stats.add_bootup_time(bootup_time);
744 stats.finished_booting_worker();
745 }
746 self.bootup_semaphore.add_permits(1);
748 Ok((process, AcquiredPermits::Fresh { _concurrency_permit: concurrency_permit, _bootup_permit: bootup_permit }))
749 }
750 }
751 }
752
753 fn process_args(&self) -> ProcessArgs {
754 ProcessArgs {
755 cwd: self.cwd.clone(),
756 env: self.env.clone(),
757 entrypoint: self.entrypoint.clone(),
758 assets_for_source_mapping: self.assets_for_source_mapping,
759 assets_root: self.assets_root.clone(),
760 project_dir: self.project_dir.clone(),
761 shared_stdout: self.shared_stdout.clone(),
762 shared_stderr: self.shared_stderr.clone(),
763 debug: self.debug,
764 }
765 }
766
767 async fn create_process(&self) -> Result<(NodeJsPoolProcess, Duration), anyhow::Error> {
768 self.process_args()
769 .create_process()
770 .await
771 .context("creating new process")
772 }
773
774 pub fn scale_down() {
775 let pools = ACTIVE_POOLS.lock().clone();
776 for pool in pools {
777 pool.reduce_to_one();
778 }
779 }
780
781 pub fn scale_zero() {
782 let pools = ACTIVE_POOLS.lock().clone();
783 for pool in pools {
784 pool.reduce_to_zero(&ACTIVE_POOLS);
785 }
786 }
787}
788
789pub struct ChildProcessOperation {
790 process: Option<NodeJsPoolProcess>,
791 #[allow(dead_code)]
793 permits: AcquiredPermits,
794 idle_processes: Arc<HeapQueue<NodeJsPoolProcess>>,
795 start: Instant,
796 stats: Arc<Mutex<NodeJsPoolStats>>,
797 allow_process_reuse: bool,
798}
799
800#[async_trait::async_trait]
801impl Operation for ChildProcessOperation {
802 async fn recv(&mut self) -> Result<Vec<u8>> {
803 let vec = self
804 .with_process(|process| async move {
805 process.recv().await.context("failed to receive message")
806 })
807 .await?;
808 Ok(vec)
809 }
810
811 async fn send(&mut self, message: Vec<u8>) -> Result<()> {
812 self.with_process(|process| async move {
813 timeout(Duration::from_secs(30), process.send(message))
814 .await
815 .context("timeout while sending message")?
816 .context("failed to send message")?;
817 Ok(())
818 })
819 .await
820 }
821
822 async fn wait_or_kill(&mut self) -> Result<ExitStatus> {
823 let mut process = self
824 .process
825 .take()
826 .context("Node.js operation already finished")?;
827
828 if self.allow_process_reuse {
829 self.stats.lock().remove_worker();
830 }
831
832 let mut child = process
833 .child
834 .take()
835 .context("Node.js operation already finished")?;
836
837 let _ = child.start_kill();
839 let status = timeout(Duration::from_secs(30), child.wait())
840 .await
841 .context("timeout while waiting for process end")?
842 .context("waiting for process end")?;
843
844 Ok(status)
845 }
846
847 fn disallow_reuse(&mut self) {
848 if self.allow_process_reuse {
849 self.stats.lock().remove_worker();
850 self.allow_process_reuse = false;
851 }
852 }
853}
854
855impl ChildProcessOperation {
856 async fn with_process<'a, F: Future<Output = Result<T>> + Send + 'a, T>(
857 &'a mut self,
858 f: impl FnOnce(&'a mut NodeJsPoolProcess) -> F,
859 ) -> Result<T> {
860 let process = self
861 .process
862 .as_mut()
863 .context("Node.js operation already finished")?;
864
865 if !self.allow_process_reuse {
866 bail!("Node.js process is no longer usable");
867 }
868
869 let result = f(process).await;
870 if result.is_err() && self.allow_process_reuse {
871 self.stats.lock().remove_worker();
872 self.allow_process_reuse = false;
873 }
874 result
875 }
876}
877
878impl Drop for ChildProcessOperation {
879 fn drop(&mut self) {
880 if let Some(mut process) = self.process.take() {
881 let elapsed = self.start.elapsed();
882 {
883 let stats = &mut self.stats.lock();
884 match self.permits {
885 AcquiredPermits::Idle { .. } => stats.add_warm_process_time(elapsed),
886 AcquiredPermits::Fresh { .. } => stats.add_cold_process_time(elapsed),
887 }
888 }
889 if self.allow_process_reuse {
890 process.cpu_time_invested += elapsed;
891 self.idle_processes.push(process, &ACTIVE_POOLS);
892 }
893 }
894 }
895}