1use std::{
2 future::Future,
3 mem::take,
4 path::{Path, PathBuf},
5 process::{ExitStatus, Stdio},
6 sync::{Arc, LazyLock},
7 time::{Duration, Instant},
8};
9
10use anyhow::{Context, Result, bail};
11use bytes::Bytes;
12use futures::join;
13use owo_colors::OwoColorize;
14use parking_lot::Mutex;
15use rustc_hash::FxHashMap;
16use tokio::{
17 io::{
18 AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, Stderr,
19 Stdout, stderr, stdout,
20 },
21 net::{TcpListener, TcpStream},
22 process::{Child, ChildStderr, ChildStdout, Command},
23 select,
24 sync::Semaphore,
25 time::{sleep, timeout},
26};
27use turbo_rcstr::{RcStr, rcstr};
28use turbo_tasks::{FxIndexSet, ResolvedVc, Vc, duration_span};
29use turbo_tasks_fs::FileSystemPath;
30use turbopack_ecmascript::magic_identifier::unmangle_identifiers;
31
32use crate::{
33 AssetsForSourceMapping,
34 backend::{CreatePoolFuture, CreatePoolOptions, NodeBackend},
35 evaluate::{EvaluateOperation, EvaluatePool, Operation},
36 format::FormattingMode,
37 pool_stats::{AcquiredPermits, NodeJsPoolStats, PoolStatsSnapshot},
38 source_map::apply_source_mapping,
39};
40
41mod heap_queue;
42use heap_queue::HeapQueue;
43
/// A single pooled Node.js worker process, together with the IPC connection
/// and the handlers that forward its stdout/stderr.
struct NodeJsPoolProcess {
    // The spawned child (killed on drop); `None` once taken by `wait_or_kill`.
    child: Option<Child>,
    // TCP connection carrying length-prefixed request/response packets.
    connection: TcpStream,
    // Forwards the child's stdout to this process' stdout.
    stdout_handler: OutputStreamHandler<ChildStdout, Stdout>,
    // Forwards the child's stderr to this process' stderr.
    stderr_handler: OutputStreamHandler<ChildStderr, Stderr>,
    // When true the child runs with `--inspect-brk` and timeouts are disabled.
    debug: bool,
    // Total time operations have spent on this process; the primary key of the
    // `Ord` implementation used by the idle queue.
    cpu_time_invested: Duration,
}
52
53impl Ord for NodeJsPoolProcess {
54 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
55 self.cpu_time_invested
56 .cmp(&other.cpu_time_invested)
57 .then_with(|| {
58 self.child
59 .as_ref()
60 .map(|c| c.id())
61 .cmp(&other.child.as_ref().map(|c| c.id()))
62 })
63 }
64}
65
66impl PartialOrd for NodeJsPoolProcess {
67 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
68 Some(self.cmp(other))
69 }
70}
71
// `Eq` is sound here: equality is defined via `cmp` (see `PartialEq` below),
// which is a total order and therefore reflexive, symmetric and transitive.
impl Eq for NodeJsPoolProcess {}
73
74impl PartialEq for NodeJsPoolProcess {
75 fn eq(&self, other: &Self) -> bool {
76 self.cmp(other) == std::cmp::Ordering::Equal
77 }
78}
79
/// How long to wait for a freshly spawned Node.js process to connect back to
/// our listener before giving up (ignored in debug mode).
const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// One logical chunk of child output (the content of an outermost
/// marker-delimited section), used for deduplication across processes.
#[derive(Clone, PartialEq, Eq, Hash)]
struct OutputEntry {
    // Raw bytes of the output chunk.
    data: Arc<[u8]>,
    // Raw bytes of the associated stack-trace section, if any.
    stack_trace: Option<Arc<[u8]>>,
}

/// Set of `(entry, occurrence_number)` pairs shared by all processes of one
/// pool; a given occurrence of an entry is only printed the first time it is
/// inserted.
type SharedOutputSet = Arc<Mutex<FxIndexSet<(OutputEntry, u32)>>>;

// Serializes writes to the real stdout/stderr so output chunks from
// concurrently running processes do not interleave mid-write.
static GLOBAL_OUTPUT_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
// Sentinel emitted by the injected runtime around structured output sections;
// a marker line is this prefix plus one control byte (B/E/S/D).
static MARKER: &[u8] = b"TURBOPACK_OUTPUT_";
static MARKER_STR: &str = "TURBOPACK_OUTPUT_";
93
/// Forwards one child output stream (stdout or stderr) to the corresponding
/// stream of this process, applying source mapping and pool-wide
/// deduplication along the way.
struct OutputStreamHandler<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> {
    // Buffered reader over the child's stdout/stderr pipe.
    stream: BufReader<R>,
    // Pool-wide set used to deduplicate repeated output across processes.
    shared: SharedOutputSet,
    // Assets used to resolve source maps for emitted output.
    assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
    // Root path passed to `apply_source_mapping`.
    root: FileSystemPath,
    // Project directory passed to `apply_source_mapping`.
    project_dir: FileSystemPath,
    // Where the (possibly rewritten) output ultimately goes.
    final_stream: W,
}
102
impl<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> OutputStreamHandler<R, W> {
    /// Pipes output from `stream` (a child's stdout or stderr) to
    /// `final_stream` until the child signals the end of the current
    /// operation.
    ///
    /// Lines are scanned for `TURBOPACK_OUTPUT_<X>` marker lines emitted by
    /// the injected runtime, where `<X>` is a single control byte:
    /// - `B` — begin of a structured output section (sections can nest),
    /// - `S` — begin of the stack-trace part of the current section,
    /// - `E` — end of a section; when the outermost section closes, the
    ///   collected content is deduplicated against the pool-wide `shared`
    ///   set and only printed the first time that occurrence is seen,
    /// - `D` — the operation is done; this method returns `Ok(())`.
    ///
    /// Content outside any section is source-mapped and forwarded directly.
    pub async fn handle_operation(&mut self) -> Result<()> {
        let Self {
            stream,
            shared,
            assets_for_source_mapping,
            root,
            project_dir,
            final_stream,
        } = self;

        // Writes `bytes` completely to `final_stream` while holding the
        // global output lock, so chunks from concurrently running processes
        // never interleave mid-write.
        async fn write_final<W: AsyncWrite + Unpin>(
            mut bytes: &[u8],
            final_stream: &mut W,
        ) -> Result<()> {
            let _lock = GLOBAL_OUTPUT_LOCK.lock().await;
            // Manual `write_all`-style loop so a closed stream can be
            // reported with a specific error message.
            while !bytes.is_empty() {
                let count = final_stream.write(bytes).await?;
                if count == 0 {
                    bail!("Failed to write to final stream as it was closed");
                }
                bytes = &bytes[count..];
            }
            Ok(())
        }

        // If `bytes` is valid UTF-8, unmangles magic identifiers and applies
        // source mapping before writing; otherwise the raw bytes are
        // forwarded unchanged. A source-mapping failure is reported and then
        // the unmapped text is still printed so no output is lost.
        async fn write_source_mapped_final<W: AsyncWrite + Unpin>(
            bytes: &[u8],
            assets_for_source_mapping: Vc<AssetsForSourceMapping>,
            root: FileSystemPath,
            project_dir: FileSystemPath,
            final_stream: &mut W,
        ) -> Result<()> {
            if let Ok(text) = std::str::from_utf8(bytes) {
                let text = unmangle_identifiers(text, |content| {
                    format!("{{{content}}}").italic().to_string()
                });
                match apply_source_mapping(
                    text.as_ref(),
                    assets_for_source_mapping,
                    root,
                    project_dir,
                    FormattingMode::AnsiColors,
                )
                .await
                {
                    Err(e) => {
                        write_final(
                            format!("Error applying source mapping: {e}\n").as_bytes(),
                            final_stream,
                        )
                        .await?;
                        write_final(text.as_bytes(), final_stream).await?;
                    }
                    Ok(text) => {
                        write_final(text.as_bytes(), final_stream).await?;
                    }
                }
            } else {
                write_final(bytes, final_stream).await?;
            }
            Ok(())
        }

        // Accumulates the current (possibly multi-line) section content.
        let mut buffer = Vec::new();
        // Per-handler occurrence counts per entry, so repeats of identical
        // output within one process get increasing occurrence numbers.
        let mut own_output = FxHashMap::default();
        // Current depth of nested B..E sections.
        let mut nesting: u32 = 0;
        // Byte offset in `buffer` where the current stack-trace part (after
        // an `S` marker) started, if any.
        let mut in_stack = None;
        let mut stack_trace_buffer = Vec::new();
        loop {
            let start = buffer.len();
            if stream
                .read_until(b'\n', &mut buffer)
                .await
                .context("error reading from stream")?
                == 0
            {
                bail!("stream closed unexpectedly")
            }
            // A marker line is MARKER + one control byte + b'\n', so it is
            // exactly MARKER.len() + 2 bytes long.
            if buffer.len() - start == MARKER.len() + 2
                && &buffer[start..buffer.len() - 2] == MARKER
            {
                // Drop the trailing newline, then take the control byte.
                buffer.pop();
                match buffer.pop() {
                    Some(b'B') => {
                        stack_trace_buffer.clear();
                        buffer.truncate(start);
                        nesting += 1;
                        in_stack = None;
                        continue;
                    }
                    Some(b'E') => {
                        buffer.truncate(start);
                        if let Some(in_stack) = in_stack {
                            if nesting != 0 {
                                // Save the stack-trace bytes; they're
                                // attached to the entry when the outermost
                                // section closes.
                                stack_trace_buffer = buffer[in_stack..].to_vec();
                            }
                            // The stack-trace text itself is not printed.
                            buffer.truncate(in_stack);
                        }
                        nesting = nesting.saturating_sub(1);
                        in_stack = None;
                        if nesting == 0 {
                            // Outermost section closed: emit it, deduplicated
                            // across all processes of the pool.
                            let line = Arc::from(take(&mut buffer).into_boxed_slice());
                            let stack_trace = if stack_trace_buffer.is_empty() {
                                None
                            } else {
                                Some(Arc::from(take(&mut stack_trace_buffer).into_boxed_slice()))
                            };
                            let entry = OutputEntry {
                                data: line,
                                stack_trace,
                            };
                            let occurrence_number = *own_output
                                .entry(entry.clone())
                                .and_modify(|c| *c += 1)
                                .or_insert(0);
                            let new_entry = {
                                let mut shared = shared.lock();
                                shared.insert((entry.clone(), occurrence_number))
                            };
                            // Another process already printed this exact
                            // occurrence of the entry; skip it.
                            if !new_entry {
                                continue;
                            }
                            write_source_mapped_final(
                                &entry.data,
                                **assets_for_source_mapping,
                                root.clone(),
                                project_dir.clone(),
                                final_stream,
                            )
                            .await?;
                        }
                    }
                    Some(b'S') => {
                        buffer.truncate(start);
                        in_stack = Some(start);
                        continue;
                    }
                    Some(b'D') => {
                        // Operation finished; stop pumping this stream.
                        break;
                    }
                    _ => {}
                }
            }
            // Inside a section: keep accumulating until it closes.
            if nesting != 0 {
                continue;
            }

            // Outside any section: forward the line immediately.
            write_source_mapped_final(
                &buffer,
                **assets_for_source_mapping,
                root.clone(),
                project_dir.clone(),
                final_stream,
            )
            .await?;
            buffer.clear();
        }
        Ok(())
    }
}
274
impl NodeJsPoolProcess {
    /// Spawns a Node.js worker and establishes the IPC connection.
    ///
    /// Binds a TCP listener on an ephemeral localhost port, spawns `node`
    /// with `entrypoint` and the port as arguments, and waits (with a
    /// timeout unless `debug`) for the child to connect back. The child's
    /// environment is cleared except for `PATH` (and `SystemRoot` on
    /// Windows) plus the provided `env`. After connecting, the child must
    /// send an empty packet as its ready signal.
    ///
    /// In `debug` mode the child is started with `--inspect-brk` and all
    /// timeouts are disabled.
    async fn new(
        cwd: &Path,
        env: &FxHashMap<RcStr, RcStr>,
        entrypoint: &Path,
        assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
        assets_root: FileSystemPath,
        project_dir: FileSystemPath,
        shared_stdout: SharedOutputSet,
        shared_stderr: SharedOutputSet,
        debug: bool,
    ) -> Result<Self> {
        let guard = duration_span!("Node.js process startup");
        // Bind an ephemeral localhost port the child connects back to.
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .context("binding to a port")?;
        let port = listener.local_addr().context("getting port")?.port();
        let mut cmd = Command::new("node");
        cmd.current_dir(cwd);
        if debug {
            cmd.arg("--inspect-brk");
        }
        cmd.arg(entrypoint);
        cmd.arg(port.to_string());
        // Start from a clean environment; only PATH (and SystemRoot on
        // Windows) plus the caller-provided variables are passed through.
        cmd.env_clear();
        cmd.env(
            "PATH",
            std::env::var("PATH").expect("the PATH environment variable should always be set"),
        );
        #[cfg(target_family = "windows")]
        cmd.env(
            "SystemRoot",
            std::env::var("SystemRoot")
                .expect("the SystemRoot environment variable should always be set"),
        );
        cmd.envs(env);
        cmd.stderr(Stdio::piped());
        cmd.stdout(Stdio::piped());
        cmd.kill_on_drop(true);

        let mut child = cmd.spawn().context("spawning node pooled process")?;

        // With an attached debugger the user may sit at a breakpoint for an
        // arbitrarily long time, so don't enforce the connect timeout.
        let timeout = if debug {
            Duration::MAX
        } else {
            CONNECT_TIMEOUT
        };

        // Drains the child's remaining stdout/stderr for error reporting,
        // with marker lines filtered out.
        async fn get_output(child: &mut Child) -> Result<(String, String)> {
            let mut stdout = Vec::new();
            let mut stderr = Vec::new();
            child
                .stdout
                .take()
                .unwrap()
                .read_to_end(&mut stdout)
                .await?;
            child
                .stderr
                .take()
                .unwrap()
                .read_to_end(&mut stderr)
                .await?;
            fn clean(buffer: Vec<u8>) -> Result<String> {
                // NOTE(review): this drops *every* line whose length equals
                // MARKER_STR.len() + 1, even non-marker lines; the intent
                // was probably `!(line.len() == MARKER_STR.len() + 1 &&
                // line.starts_with(MARKER_STR))`. Only affects error-report
                // output — confirm before changing.
                Ok(String::from_utf8(buffer)?
                    .lines()
                    .filter(|line| {
                        line.len() != MARKER_STR.len() + 1 && !line.starts_with(MARKER_STR)
                    })
                    .collect::<Vec<_>>()
                    .join("\n"))
            }
            Ok((clean(stdout)?, clean(stderr)?))
        }

        // Wait for the child to connect, racing against it exiting early or
        // the timeout elapsing; both failure paths report captured output.
        let (connection, _) = select! {
            connection = listener.accept() => connection.context("accepting connection")?,
            status = child.wait() => {
                match status {
                    Ok(status) => {
                        let (stdout, stderr) = get_output(&mut child).await?;
                        bail!("node process exited before we could connect to it with {status}\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
                    }
                    Err(err) => {
                        let _ = child.start_kill();
                        let (stdout, stderr) = get_output(&mut child).await?;
                        bail!("node process exited before we could connect to it: {err:?}\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
                    },
                }
            },
            _ = sleep(timeout) => {
                let _ = child.start_kill();
                let (stdout, stderr) = get_output(&mut child).await?;
                bail!("timed out waiting for the Node.js process to connect ({timeout:?} timeout)\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
            },
        };
        // Packets are small request/response messages; disable Nagle's
        // algorithm to keep latency low.
        connection.set_nodelay(true)?;

        let child_stdout = BufReader::new(child.stdout.take().unwrap());
        let child_stderr = BufReader::new(child.stderr.take().unwrap());

        let stdout_handler = OutputStreamHandler {
            stream: child_stdout,
            shared: shared_stdout,
            assets_for_source_mapping,
            root: assets_root.clone(),
            project_dir: project_dir.clone(),
            final_stream: stdout(),
        };
        let stderr_handler = OutputStreamHandler {
            stream: child_stderr,
            shared: shared_stderr,
            assets_for_source_mapping,
            root: assets_root.clone(),
            project_dir: project_dir.clone(),
            final_stream: stderr(),
        };

        let mut process = Self {
            child: Some(child),
            connection,
            stdout_handler,
            stderr_handler,
            debug,
            cpu_time_invested: Duration::ZERO,
        };

        drop(guard);

        let guard = duration_span!("Node.js initialization");
        // The child signals readiness by sending an empty packet.
        let ready_signal = process.recv().await?;

        if !ready_signal.is_empty() {
            bail!(
                "Node.js process didn't send the expected ready signal\nOutput:\n{}",
                String::from_utf8_lossy(&ready_signal)
            );
        }

        drop(guard);

        Ok(process)
    }

    /// Receives one length-prefixed packet from the child.
    ///
    /// While waiting, both output handlers are pumped concurrently so child
    /// output produced during the operation is forwarded as it arrives.
    /// Unless in debug mode, waiting for the length prefix times out after
    /// 5 minutes (the operation itself may take a while) and reading the
    /// payload after 20 seconds.
    async fn recv(&mut self) -> Result<Bytes> {
        let connection = &mut self.connection;
        // Wraps `future` in a timeout unless running under a debugger;
        // `fast` selects the short (20s) timeout over the long (5min) one.
        async fn with_timeout<T, E: Into<anyhow::Error>>(
            debug: bool,
            fast: bool,
            future: impl Future<Output = Result<T, E>> + Send,
        ) -> Result<T> {
            if debug {
                future.await.map_err(Into::into)
            } else {
                let time = if fast {
                    Duration::from_secs(20)
                } else {
                    Duration::from_secs(5 * 60)
                };
                timeout(time, future)
                    .await
                    .context("timeout while receiving message from process")?
                    .map_err(Into::into)
            }
        }
        let debug = self.debug;
        let recv_future = async move {
            let packet_len = with_timeout(debug, false, connection.read_u32())
                .await
                .context("reading packet length")?
                .try_into()
                .context("storing packet length")?;
            let mut packet_data = vec![0; packet_len];
            with_timeout(debug, true, connection.read_exact(&mut packet_data))
                .await
                .context("reading packet data")?;
            Ok::<_, anyhow::Error>(packet_data.into())
        };
        // Run the receive and both output handlers to completion together;
        // the handlers return once they see the operation's done marker.
        let (result, stdout, stderr) = join!(
            recv_future,
            self.stdout_handler.handle_operation(),
            self.stderr_handler.handle_operation(),
        );
        let result: Bytes = result?;
        stdout.context("unable to handle stdout from the Node.js process in a structured way")?;
        stderr.context("unable to handle stderr from the Node.js process in a structured way")?;
        Ok(result)
    }

    /// Sends one packet to the child: a u32 length prefix followed by the
    /// payload, then flushes the connection.
    async fn send(&mut self, packet_data: Bytes) -> Result<()> {
        self.connection
            .write_u32(
                packet_data
                    .len()
                    .try_into()
                    .context("packet length does not fit into u32")?,
            )
            .await
            .context("writing packet length")?;
        self.connection
            .write_all(&packet_data)
            .await
            .context("writing packet data")?;
        self.connection
            .flush()
            .await
            .context("flushing packet data")?;
        Ok(())
    }
}
484
/// The idle-process queues of all live pools, registered globally so the
/// `scale_down`/`scale_zero` hooks can reach every pool.
type IdleProcessQueues = Mutex<Vec<Arc<HeapQueue<NodeJsPoolProcess>>>>;

static ACTIVE_POOLS: LazyLock<IdleProcessQueues> = LazyLock::new(Default::default);
490
/// Owned snapshot of everything needed to spawn one pool process, detached
/// from the pool so process creation can run on a background task.
struct ProcessArgs {
    cwd: PathBuf,
    env: FxHashMap<RcStr, RcStr>,
    entrypoint: PathBuf,
    assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
    assets_root: FileSystemPath,
    project_dir: FileSystemPath,
    shared_stdout: SharedOutputSet,
    shared_stderr: SharedOutputSet,
    debug: bool,
}
505
506impl ProcessArgs {
507 async fn create_process(self) -> Result<(NodeJsPoolProcess, Duration)> {
508 let start = Instant::now();
509 let process = NodeJsPoolProcess::new(
510 &self.cwd,
511 &self.env,
512 &self.entrypoint,
513 self.assets_for_source_mapping,
514 self.assets_root,
515 self.project_dir,
516 self.shared_stdout,
517 self.shared_stderr,
518 self.debug,
519 )
520 .await?;
521 Ok((process, start.elapsed()))
522 }
523}
524
/// A pool of Node.js child processes used to evaluate JavaScript.
///
/// Idle processes are kept in a queue ordered by invested time and reused
/// across operations; overall concurrency is bounded by
/// `concurrency_semaphore`.
#[turbo_tasks::value(
    cell = "new",
    serialization = "skip",
    evict = "last",
    eq = "manual",
    shared
)]
pub struct ChildProcessPool {
    // Working directory for spawned processes.
    cwd: PathBuf,
    // Path of the JS entrypoint passed to `node`.
    entrypoint: PathBuf,
    // Extra environment variables for spawned processes.
    env: FxHashMap<RcStr, RcStr>,
    pub assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
    pub assets_root: FileSystemPath,
    pub project_dir: FileSystemPath,
    // Processes ready for reuse, ordered by `cpu_time_invested`.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    idle_processes: Arc<HeapQueue<NodeJsPoolProcess>>,
    // Bounds the number of concurrently running operations (1 in debug mode).
    #[turbo_tasks(trace_ignore, debug_ignore)]
    concurrency_semaphore: Arc<Semaphore>,
    // Bounds the number of processes booting at the same time.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    bootup_semaphore: Arc<Semaphore>,
    // Deduplication set for stdout shared by all processes of this pool.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    shared_stdout: SharedOutputSet,
    // Deduplication set for stderr shared by all processes of this pool.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    shared_stderr: SharedOutputSet,
    // When true, processes run with `--inspect-brk` and concurrency is 1.
    debug: bool,
    // Aggregated pool metrics (bootup times, wait times, worker counts).
    #[turbo_tasks(trace_ignore, debug_ignore)]
    stats: Arc<Mutex<NodeJsPoolStats>>,
}
564
565impl ChildProcessPool {
566 pub fn create(
569 cwd: PathBuf,
570 entrypoint: PathBuf,
571 env: FxHashMap<RcStr, RcStr>,
572 assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
573 assets_root: FileSystemPath,
574 project_dir: FileSystemPath,
575 concurrency: usize,
576 debug: bool,
577 ) -> EvaluatePool {
578 EvaluatePool::new(
579 Box::new(Self {
580 cwd,
581 entrypoint,
582 env,
583 assets_for_source_mapping,
584 assets_root: assets_root.clone(),
585 project_dir: project_dir.clone(),
586 concurrency_semaphore: Arc::new(Semaphore::new(if debug {
587 1
588 } else {
589 concurrency
590 })),
591 bootup_semaphore: Arc::new(Semaphore::new(1)),
592 idle_processes: Arc::new(HeapQueue::new()),
593 shared_stdout: Arc::new(Mutex::new(FxIndexSet::default())),
594 shared_stderr: Arc::new(Mutex::new(FxIndexSet::default())),
595 debug,
596 stats: Default::default(),
597 }),
598 assets_for_source_mapping,
599 assets_root,
600 project_dir,
601 )
602 }
603}
604
/// [`NodeBackend`] implementation that evaluates JavaScript in pooled child
/// Node.js processes.
#[turbo_tasks::value(shared)]
pub(crate) struct ChildProcessesBackend;
607
#[turbo_tasks::value_impl]
impl NodeBackend for ChildProcessesBackend {
    // Path of the runtime module implementing the evaluate loop for this
    // backend.
    fn runtime_module_path(&self) -> RcStr {
        rcstr!("child_process/evaluate.ts")
    }

    // Path of the module providing globals for this backend.
    fn globals_module_path(&self) -> RcStr {
        rcstr!("child_process/globals.ts")
    }

    // Builds an `EvaluatePool` backed by child processes from the given
    // options.
    fn create_pool(&self, options: CreatePoolOptions) -> CreatePoolFuture {
        Box::pin(async move {
            let CreatePoolOptions {
                cwd,
                entrypoint,
                env,
                assets_for_source_mapping,
                assets_root,
                project_dir,
                concurrency,
                debug,
            } = options;

            Ok(ChildProcessPool::create(
                cwd,
                entrypoint,
                env,
                assets_for_source_mapping,
                assets_root,
                project_dir,
                concurrency,
                debug,
            ))
        })
    }

    // Delegates to the pool-global scale-down hook (shrink idle queues).
    fn scale_down(&self) -> Result<()> {
        ChildProcessPool::scale_down();
        Ok(())
    }

    // Delegates to the pool-global scale-zero hook (drop idle processes).
    fn scale_zero(&self) -> Result<()> {
        ChildProcessPool::scale_zero();
        Ok(())
    }
}
654
#[async_trait::async_trait]
impl EvaluateOperation for ChildProcessPool {
    /// Starts one operation: acquires an idle process (or boots a fresh one)
    /// and wraps it in a [`ChildProcessOperation`] that returns the process
    /// to the idle queue when dropped.
    async fn operation(&self) -> Result<Box<dyn Operation>> {
        let operation = {
            let _guard = duration_span!("Node.js operation");
            let (process, permits) = self.acquire_process().await?;
            ChildProcessOperation {
                process: Some(process),
                permits,
                idle_processes: self.idle_processes.clone(),
                start: Instant::now(),
                stats: self.stats.clone(),
                allow_process_reuse: true,
            }
        };

        Ok(Box::new(operation))
    }

    /// Returns a snapshot of the pool's current metrics.
    fn stats(&self) -> PoolStatsSnapshot {
        self.stats.lock().snapshot()
    }

    /// Boots one additional process in the background so a later operation
    /// can find a warm process, without blocking the caller.
    fn pre_warm(&self) {
        let args = self.process_args();
        let bootup_semaphore = self.bootup_semaphore.clone();
        let idle_processes = self.idle_processes.clone();
        let stats = self.stats.clone();

        tokio::spawn(async move {
            // Respect the bootup concurrency limit; bail out silently if the
            // semaphore was closed.
            let Ok(bootup_permit) = bootup_semaphore.clone().acquire_owned().await else {
                return;
            };
            {
                stats.lock().add_booting_worker();
            }
            match args.create_process().await {
                Ok((process, bootup_time)) => {
                    {
                        let mut s = stats.lock();
                        s.add_bootup_time(bootup_time);
                        s.finished_booting_worker();
                    }
                    // Release the bootup permit before publishing the
                    // process so another bootup can start immediately.
                    drop(bootup_permit);
                    idle_processes.push(process, &ACTIVE_POOLS);
                }
                // Spawn failures during pre-warming are swallowed; only the
                // worker counters are rolled back.
                Err(_e) => {
                    let mut s = stats.lock();
                    s.finished_booting_worker();
                    s.remove_worker();
                }
            }
        });
    }
}
717
impl ChildProcessPool {
    /// Acquires a process for one operation, preferring idle processes.
    ///
    /// After taking a concurrency permit, two futures race: popping an idle
    /// process from the queue vs. acquiring the bootup permit (delayed by a
    /// stats-derived wait time, which biases the race toward reuse — see
    /// `NodeJsPoolStats::wait_time_before_bootup`). If the bootup path wins,
    /// a fresh process is spawned.
    async fn acquire_process(&self) -> Result<(NodeJsPoolProcess, AcquiredPermits)> {
        {
            self.stats.lock().add_queued_task();
        }

        let concurrency_permit = self.concurrency_semaphore.clone().acquire_owned().await?;

        let bootup = async {
            let permit = self.bootup_semaphore.clone().acquire_owned().await;
            // Give an idle process a chance to show up before committing to
            // booting a new one.
            let wait_time = self.stats.lock().wait_time_before_bootup();
            tokio::time::sleep(wait_time).await;
            permit
        };

        select! {
            idle_process_result = self.idle_processes.pop(&ACTIVE_POOLS) => {
                let process = idle_process_result.context("acquiring idle process permit")?;
                Ok((process, AcquiredPermits::Idle { _concurrency_permit: concurrency_permit }))
            },
            bootup_permit = bootup => {
                let bootup_permit = bootup_permit.context("acquiring bootup permit")?;
                {
                    self.stats.lock().add_booting_worker();
                }
                let (process, bootup_time) = self.create_process().await?;
                {
                    let mut stats = self.stats.lock();
                    stats.add_bootup_time(bootup_time);
                    stats.finished_booting_worker();
                }
                // NOTE(review): `bootup_permit` is still held here and is
                // only released when the operation drops, so this grows the
                // bootup semaphore's capacity by one per fresh process —
                // presumably an intentional ramp-up of bootup concurrency;
                // confirm.
                self.bootup_semaphore.add_permits(1);
                Ok((process, AcquiredPermits::Fresh { _concurrency_permit: concurrency_permit, _bootup_permit: bootup_permit }))
            }
        }
    }

    /// Snapshots the spawn configuration so a process can be created later
    /// (possibly on a background task).
    fn process_args(&self) -> ProcessArgs {
        ProcessArgs {
            cwd: self.cwd.clone(),
            env: self.env.clone(),
            entrypoint: self.entrypoint.clone(),
            assets_for_source_mapping: self.assets_for_source_mapping,
            assets_root: self.assets_root.clone(),
            project_dir: self.project_dir.clone(),
            shared_stdout: self.shared_stdout.clone(),
            shared_stderr: self.shared_stderr.clone(),
            debug: self.debug,
        }
    }

    /// Spawns a new process for this pool, returning it with its bootup
    /// duration.
    async fn create_process(&self) -> Result<(NodeJsPoolProcess, Duration), anyhow::Error> {
        self.process_args()
            .create_process()
            .await
            .context("creating new process")
    }

    /// Shrinks every active pool's idle queue (presumably to a single
    /// process — see `HeapQueue::reduce_to_one`).
    pub fn scale_down() {
        let pools = ACTIVE_POOLS.lock().clone();
        for pool in pools {
            pool.reduce_to_one();
        }
    }

    /// Drops all idle processes of every active pool.
    pub fn scale_zero() {
        let pools = ACTIVE_POOLS.lock().clone();
        for pool in pools {
            pool.reduce_to_zero(&ACTIVE_POOLS);
        }
    }
}
792
/// One in-flight operation on a pooled Node.js process.
///
/// Holds the process for the duration of the operation and returns it to
/// the idle queue on drop (unless reuse has been disallowed).
pub struct ChildProcessOperation {
    // The process this operation runs on; `None` after `wait_or_kill`.
    process: Option<NodeJsPoolProcess>,
    // Keeps the concurrency (and possibly bootup) permits alive for the
    // lifetime of the operation.
    #[allow(dead_code)]
    permits: AcquiredPermits,
    // Queue the process is returned to when the operation completes.
    idle_processes: Arc<HeapQueue<NodeJsPoolProcess>>,
    // When the operation started; used for timing stats in `Drop`.
    start: Instant,
    stats: Arc<Mutex<NodeJsPoolStats>>,
    // Set to false when the process must not be reused (e.g. after an error).
    allow_process_reuse: bool,
}
803
#[async_trait::async_trait]
impl Operation for ChildProcessOperation {
    /// Receives one message from the process. On failure the process is
    /// marked non-reusable (see `with_process`).
    async fn recv(&mut self) -> Result<Bytes> {
        let bytes = self
            .with_process(|process| async move {
                process.recv().await.context("failed to receive message")
            })
            .await?;
        Ok(bytes)
    }

    /// Sends one message to the process, with a 30 second timeout.
    async fn send(&mut self, message: Bytes) -> Result<()> {
        self.with_process(|process| async move {
            timeout(Duration::from_secs(30), process.send(message))
                .await
                .context("timeout while sending message")?
                .context("failed to send message")?;
            Ok(())
        })
        .await
    }

    /// Kills the process and waits (up to 30s) for it to exit, returning
    /// its exit status. The operation cannot be used afterwards.
    async fn wait_or_kill(&mut self) -> Result<ExitStatus> {
        let mut process = self
            .process
            .take()
            .context("Node.js operation already finished")?;

        // The process won't return to the pool, so take it out of the
        // worker count (unless `disallow_reuse` already did).
        if self.allow_process_reuse {
            self.stats.lock().remove_worker();
        }

        let mut child = process
            .child
            .take()
            .context("Node.js operation already finished")?;

        let _ = child.start_kill();
        let status = timeout(Duration::from_secs(30), child.wait())
            .await
            .context("timeout while waiting for process end")?
            .context("waiting for process end")?;

        Ok(status)
    }

    /// Prevents this operation's process from being returned to the idle
    /// queue (e.g. when its state may be corrupted).
    fn disallow_reuse(&mut self) {
        if self.allow_process_reuse {
            self.stats.lock().remove_worker();
            self.allow_process_reuse = false;
        }
    }
}
858
impl ChildProcessOperation {
    /// Runs `f` on the underlying process.
    ///
    /// Fails if the operation has already finished or the process was marked
    /// non-reusable. If `f` returns an error, the process is excluded from
    /// future reuse (its state is unknown) and removed from the worker
    /// stats.
    async fn with_process<'a, F: Future<Output = Result<T>> + Send + 'a, T>(
        &'a mut self,
        f: impl FnOnce(&'a mut NodeJsPoolProcess) -> F,
    ) -> Result<T> {
        let process = self
            .process
            .as_mut()
            .context("Node.js operation already finished")?;

        if !self.allow_process_reuse {
            bail!("Node.js process is no longer usable");
        }

        let result = f(process).await;
        if result.is_err() && self.allow_process_reuse {
            self.stats.lock().remove_worker();
            self.allow_process_reuse = false;
        }
        result
    }
}
881
impl Drop for ChildProcessOperation {
    /// Records timing stats for the finished operation and, when reuse is
    /// allowed, returns the process to the idle queue with its invested time
    /// increased. If the operation ended via `wait_or_kill`, `process` is
    /// already `None` and nothing happens; a non-reusable process is simply
    /// dropped (its child is killed via `kill_on_drop`).
    fn drop(&mut self) {
        if let Some(mut process) = self.process.take() {
            let elapsed = self.start.elapsed();
            {
                let stats = &mut self.stats.lock();
                match self.permits {
                    // Warm: process was reused from the idle queue.
                    AcquiredPermits::Idle { .. } => stats.add_warm_process_time(elapsed),
                    // Cold: process was freshly booted for this operation.
                    AcquiredPermits::Fresh { .. } => stats.add_cold_process_time(elapsed),
                }
            }
            if self.allow_process_reuse {
                process.cpu_time_invested += elapsed;
                self.idle_processes.push(process, &ACTIVE_POOLS);
            }
        }
    }
}