1use std::{
2 collections::VecDeque,
3 future::Future,
4 mem::take,
5 path::{Path, PathBuf},
6 process::{ExitStatus, Stdio},
7 sync::{Arc, LazyLock},
8 time::{Duration, Instant},
9};
10
11use anyhow::{Context, Result, bail};
12use bytes::Bytes;
13use futures::join;
14use owo_colors::OwoColorize;
15use parking_lot::Mutex;
16use rustc_hash::FxHashMap;
17use tokio::{
18 io::{
19 AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, Stderr,
20 Stdout, stderr, stdout,
21 },
22 net::{TcpListener, TcpStream},
23 process::{Child, ChildStderr, ChildStdout, Command},
24 select,
25 sync::Semaphore,
26 time::{sleep, timeout},
27};
28use turbo_rcstr::{RcStr, rcstr};
29use turbo_tasks::{FxIndexSet, ResolvedVc, Vc, duration_span};
30use turbo_tasks_fs::FileSystemPath;
31use turbopack_ecmascript::magic_identifier::unmangle_identifiers;
32
33use crate::{
34 AssetsForSourceMapping,
35 backend::{CreatePoolFuture, CreatePoolOptions, NodeBackend},
36 evaluate::{EvaluateOperation, EvaluatePool, Operation},
37 format::FormattingMode,
38 pool_stats::{AcquiredPermits, NodeJsPoolStats, PoolStatsSnapshot},
39 source_map::apply_source_mapping,
40};
41
42mod heap_queue;
43use heap_queue::HeapQueue;
44
45struct NodeJsPoolProcess {
46 child: Option<Child>,
47 connection: TcpStream,
48 stdout_handler: OutputStreamHandler<ChildStdout, Stdout>,
49 stderr_handler: OutputStreamHandler<ChildStderr, Stderr>,
50 recent_stdout: RecentLines,
54 recent_stderr: RecentLines,
55 debug: bool,
56 cpu_time_invested: Duration,
57}
58
59impl Ord for NodeJsPoolProcess {
60 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
61 self.cpu_time_invested
62 .cmp(&other.cpu_time_invested)
63 .then_with(|| {
64 self.child
65 .as_ref()
66 .map(|c| c.id())
67 .cmp(&other.child.as_ref().map(|c| c.id()))
68 })
69 }
70}
71
72impl PartialOrd for NodeJsPoolProcess {
73 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
74 Some(self.cmp(other))
75 }
76}
77
78impl Eq for NodeJsPoolProcess {}
79
80impl PartialEq for NodeJsPoolProcess {
81 fn eq(&self, other: &Self) -> bool {
82 self.cmp(other) == std::cmp::Ordering::Equal
83 }
84}
85
86const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
87
88#[derive(Clone, PartialEq, Eq, Hash)]
89struct OutputEntry {
90 data: Arc<[u8]>,
91 stack_trace: Option<Arc<[u8]>>,
92}
93
94type SharedOutputSet = Arc<Mutex<FxIndexSet<(OutputEntry, u32)>>>;
95
96static GLOBAL_OUTPUT_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
97static MARKER: &[u8] = b"TURBOPACK_OUTPUT_";
98static MARKER_STR: &str = "TURBOPACK_OUTPUT_";
99
100const RECENT_OUTPUT_LINES: usize = 100;
104
105type RecentLines = Arc<Mutex<VecDeque<Vec<u8>>>>;
110
111struct OutputStreamHandler<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> {
112 stream: BufReader<R>,
113 shared: SharedOutputSet,
114 assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
115 root: FileSystemPath,
116 project_dir: FileSystemPath,
117 final_stream: W,
118 recent_lines: RecentLines,
119}
120
121impl<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> OutputStreamHandler<R, W> {
122 pub async fn handle_operation(&mut self) -> Result<()> {
127 let Self {
128 stream,
129 shared,
130 assets_for_source_mapping,
131 root,
132 project_dir,
133 final_stream,
134 recent_lines,
135 } = self;
136
137 async fn write_final<W: AsyncWrite + Unpin>(
138 mut bytes: &[u8],
139 final_stream: &mut W,
140 ) -> Result<()> {
141 let _lock = GLOBAL_OUTPUT_LOCK.lock().await;
142 while !bytes.is_empty() {
143 let count = final_stream.write(bytes).await?;
144 if count == 0 {
145 bail!("Failed to write to final stream as it was closed");
146 }
147 bytes = &bytes[count..];
148 }
149 Ok(())
150 }
151
152 async fn write_source_mapped_final<W: AsyncWrite + Unpin>(
153 bytes: &[u8],
154 assets_for_source_mapping: Vc<AssetsForSourceMapping>,
155 root: FileSystemPath,
156 project_dir: FileSystemPath,
157 final_stream: &mut W,
158 ) -> Result<()> {
159 if let Ok(text) = std::str::from_utf8(bytes) {
160 let text = unmangle_identifiers(text, |content| {
161 format!("{{{content}}}").italic().to_string()
162 });
163 match apply_source_mapping(
164 text.as_ref(),
165 assets_for_source_mapping,
166 root,
167 project_dir,
168 FormattingMode::AnsiColors,
169 )
170 .await
171 {
172 Err(e) => {
173 write_final(
174 format!("Error applying source mapping: {e}\n").as_bytes(),
175 final_stream,
176 )
177 .await?;
178 write_final(text.as_bytes(), final_stream).await?;
179 }
180 Ok(text) => {
181 write_final(text.as_bytes(), final_stream).await?;
182 }
183 }
184 } else {
185 write_final(bytes, final_stream).await?;
186 }
187 Ok(())
188 }
189
190 let mut buffer = Vec::new();
191 let mut own_output = FxHashMap::default();
192 let mut nesting: u32 = 0;
193 let mut in_stack = None;
194 let mut stack_trace_buffer = Vec::new();
195 loop {
196 let start = buffer.len();
197 if stream
198 .read_until(b'\n', &mut buffer)
199 .await
200 .context("error reading from stream")?
201 == 0
202 {
203 bail!("stream closed unexpectedly")
204 }
205 let line_is_marker = buffer.len() - start == MARKER.len() + 2
208 && &buffer[start..buffer.len() - 2] == MARKER;
209 if !line_is_marker {
210 let mut lines = recent_lines.lock();
211 if lines.len() >= RECENT_OUTPUT_LINES {
212 lines.pop_front();
213 }
214 lines.push_back(buffer[start..].to_vec());
215 } else {
216 buffer.pop();
218 match buffer.pop() {
220 Some(b'B') => {
221 stack_trace_buffer.clear();
222 buffer.truncate(start);
223 nesting += 1;
224 in_stack = None;
225 continue;
226 }
227 Some(b'E') => {
228 buffer.truncate(start);
229 if let Some(in_stack) = in_stack {
230 if nesting != 0 {
231 stack_trace_buffer = buffer[in_stack..].to_vec();
232 }
233 buffer.truncate(in_stack);
234 }
235 nesting = nesting.saturating_sub(1);
236 in_stack = None;
237 if nesting == 0 {
238 let line = Arc::from(take(&mut buffer).into_boxed_slice());
239 let stack_trace = if stack_trace_buffer.is_empty() {
240 None
241 } else {
242 Some(Arc::from(take(&mut stack_trace_buffer).into_boxed_slice()))
243 };
244 let entry = OutputEntry {
245 data: line,
246 stack_trace,
247 };
248 let occurrence_number = *own_output
249 .entry(entry.clone())
250 .and_modify(|c| *c += 1)
251 .or_insert(0);
252 let new_entry = {
253 let mut shared = shared.lock();
254 shared.insert((entry.clone(), occurrence_number))
255 };
256 if !new_entry {
257 continue;
260 }
261 write_source_mapped_final(
262 &entry.data,
263 **assets_for_source_mapping,
264 root.clone(),
265 project_dir.clone(),
266 final_stream,
267 )
268 .await?;
269 }
270 }
271 Some(b'S') => {
272 buffer.truncate(start);
273 in_stack = Some(start);
274 continue;
275 }
276 Some(b'D') => {
277 break;
279 }
280 _ => {}
281 }
282 }
283 if nesting != 0 {
284 continue;
286 }
287
288 write_source_mapped_final(
289 &buffer,
290 **assets_for_source_mapping,
291 root.clone(),
292 project_dir.clone(),
293 final_stream,
294 )
295 .await?;
296 buffer.clear();
297 }
298 Ok(())
299 }
300}
301
302impl NodeJsPoolProcess {
303 async fn new(
304 cwd: &Path,
305 env: &FxHashMap<RcStr, RcStr>,
306 entrypoint: &Path,
307 assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
308 assets_root: FileSystemPath,
309 project_dir: FileSystemPath,
310 shared_stdout: SharedOutputSet,
311 shared_stderr: SharedOutputSet,
312 debug: bool,
313 ) -> Result<Self> {
314 let guard = duration_span!("Node.js process startup");
315 let listener = TcpListener::bind("127.0.0.1:0")
316 .await
317 .context("binding to a port")?;
318 let port = listener.local_addr().context("getting port")?.port();
319 let mut cmd = Command::new("node");
320 cmd.current_dir(cwd);
321 if debug {
322 cmd.arg("--inspect-brk");
323 }
324 cmd.arg(entrypoint);
325 cmd.arg(port.to_string());
326 cmd.env_clear();
327 cmd.env(
328 "PATH",
329 std::env::var("PATH").expect("the PATH environment variable should always be set"),
330 );
331 #[cfg(target_family = "windows")]
332 cmd.env(
333 "SystemRoot",
334 std::env::var("SystemRoot")
335 .expect("the SystemRoot environment variable should always be set"),
336 );
337 cmd.envs(env);
338 cmd.stderr(Stdio::piped());
339 cmd.stdout(Stdio::piped());
340 cmd.kill_on_drop(true);
341
342 let mut child = cmd.spawn().context("spawning node pooled process")?;
343
344 let timeout = if debug {
345 Duration::MAX
346 } else {
347 CONNECT_TIMEOUT
348 };
349
350 async fn get_output(child: &mut Child) -> Result<(String, String)> {
351 let mut stdout = Vec::new();
352 let mut stderr = Vec::new();
353 child
354 .stdout
355 .take()
356 .unwrap()
357 .read_to_end(&mut stdout)
358 .await?;
359 child
360 .stderr
361 .take()
362 .unwrap()
363 .read_to_end(&mut stderr)
364 .await?;
365 fn clean(buffer: Vec<u8>) -> Result<String> {
366 Ok(String::from_utf8(buffer)?
367 .lines()
368 .filter(|line| {
369 line.len() != MARKER_STR.len() + 1 && !line.starts_with(MARKER_STR)
370 })
371 .collect::<Vec<_>>()
372 .join("\n"))
373 }
374 Ok((clean(stdout)?, clean(stderr)?))
375 }
376
377 let (connection, _) = select! {
378 connection = listener.accept() => connection.context("accepting connection")?,
379 status = child.wait() => {
380 match status {
381 Ok(status) => {
382 let (stdout, stderr) = get_output(&mut child).await?;
383 bail!("node process exited before we could connect to it with {status}\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
384 }
385 Err(err) => {
386 let _ = child.start_kill();
387 let (stdout, stderr) = get_output(&mut child).await?;
388 bail!("node process exited before we could connect to it: {err:?}\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
389 },
390 }
391 },
392 _ = sleep(timeout) => {
393 let _ = child.start_kill();
394 let (stdout, stderr) = get_output(&mut child).await?;
395 bail!("timed out waiting for the Node.js process to connect ({timeout:?} timeout)\nProcess output:\n{stdout}\nProcess error output:\n{stderr}");
396 },
397 };
398 connection.set_nodelay(true)?;
399
400 let child_stdout = BufReader::new(child.stdout.take().unwrap());
401 let child_stderr = BufReader::new(child.stderr.take().unwrap());
402
403 let recent_stdout: RecentLines =
404 Arc::new(Mutex::new(VecDeque::with_capacity(RECENT_OUTPUT_LINES)));
405 let recent_stderr: RecentLines =
406 Arc::new(Mutex::new(VecDeque::with_capacity(RECENT_OUTPUT_LINES)));
407
408 let stdout_handler = OutputStreamHandler {
409 stream: child_stdout,
410 shared: shared_stdout,
411 assets_for_source_mapping,
412 root: assets_root.clone(),
413 project_dir: project_dir.clone(),
414 final_stream: stdout(),
415 recent_lines: recent_stdout.clone(),
416 };
417 let stderr_handler = OutputStreamHandler {
418 stream: child_stderr,
419 shared: shared_stderr,
420 assets_for_source_mapping,
421 root: assets_root.clone(),
422 project_dir: project_dir.clone(),
423 final_stream: stderr(),
424 recent_lines: recent_stderr.clone(),
425 };
426
427 let mut process = Self {
428 child: Some(child),
429 connection,
430 stdout_handler,
431 stderr_handler,
432 recent_stdout,
433 recent_stderr,
434 debug,
435 cpu_time_invested: Duration::ZERO,
436 };
437
438 drop(guard);
439
440 let guard = duration_span!("Node.js initialization");
441 let ready_signal = process.recv().await?;
442
443 if !ready_signal.is_empty() {
444 bail!(
445 "Node.js process didn't send the expected ready signal\nOutput:\n{}",
446 String::from_utf8_lossy(&ready_signal)
447 );
448 }
449
450 drop(guard);
451
452 Ok(process)
453 }
454
455 async fn recv(&mut self) -> Result<Bytes> {
456 let connection = &mut self.connection;
457 async fn with_timeout<T, E: Into<anyhow::Error>>(
458 debug: bool,
459 fast: bool,
460 future: impl Future<Output = Result<T, E>> + Send,
461 ) -> Result<T> {
462 if debug {
463 future.await.map_err(Into::into)
464 } else {
465 let time = if fast {
466 Duration::from_secs(20)
467 } else {
468 Duration::from_secs(5 * 60)
469 };
470 timeout(time, future)
471 .await
472 .context("timeout while receiving message from process")?
473 .map_err(Into::into)
474 }
475 }
476 let debug = self.debug;
477 let recv_future = async move {
478 let packet_len = with_timeout(debug, false, connection.read_u32())
479 .await
480 .context("reading packet length")?
481 .try_into()
482 .context("storing packet length")?;
483 let mut packet_data = vec![0; packet_len];
484 with_timeout(debug, true, connection.read_exact(&mut packet_data))
485 .await
486 .context("reading packet data")?;
487 Ok::<_, anyhow::Error>(packet_data.into())
488 };
489 let (result, stdout, stderr) = join!(
490 recv_future,
491 self.stdout_handler.handle_operation(),
492 self.stderr_handler.handle_operation(),
493 );
494 let result = match result {
495 Err(err) => {
496 return Err(self.diagnostic_recv_error(err).await);
503 }
504 Ok(result) => result,
505 };
506 stdout.context("unable to handle stdout from the Node.js process in a structured way")?;
507 stderr.context("unable to handle stderr from the Node.js process in a structured way")?;
508 Ok(result)
509 }
510
511 async fn diagnostic_recv_error(&mut self, err: anyhow::Error) -> anyhow::Error {
515 let exit_status = match self.child.as_mut() {
516 Some(child) => match timeout(Duration::from_secs(2), child.wait()).await {
517 Ok(Ok(status)) => Some(status),
518 _ => None,
519 },
520 None => None,
521 };
522 let mut output = String::new();
523 let stdout_lines: Vec<Vec<u8>> = self.recent_stdout.lock().iter().cloned().collect();
524 let stderr_lines: Vec<Vec<u8>> = self.recent_stderr.lock().iter().cloned().collect();
525 if !stdout_lines.is_empty() {
526 output.push_str("\nRecent process stdout:\n");
527 for line in stdout_lines {
528 output.push_str(&String::from_utf8_lossy(&line));
529 }
530 }
531 if !stderr_lines.is_empty() {
532 output.push_str("\nRecent process stderr:\n");
533 for line in stderr_lines {
534 output.push_str(&String::from_utf8_lossy(&line));
535 }
536 }
537 let exit_note = match exit_status {
538 Some(status) => format!("Node.js process exited with {status}"),
539 None => "Node.js process is still running or its exit status is unavailable".into(),
540 };
541 err.context(format!("{exit_note}{output}"))
543 }
544 async fn send(&mut self, packet_data: Bytes) -> Result<()> {
545 self.connection
546 .write_u32(
547 packet_data
548 .len()
549 .try_into()
550 .context("packet length does not fit into u32")?,
551 )
552 .await
553 .context("writing packet length")?;
554 self.connection
555 .write_all(&packet_data)
556 .await
557 .context("writing packet data")?;
558 self.connection
559 .flush()
560 .await
561 .context("flushing packet data")?;
562 Ok(())
563 }
564}
565
566type IdleProcessQueues = Mutex<Vec<Arc<HeapQueue<NodeJsPoolProcess>>>>;
567
568static ACTIVE_POOLS: LazyLock<IdleProcessQueues> = LazyLock::new(Default::default);
571
572struct ProcessArgs {
576 cwd: PathBuf,
577 env: FxHashMap<RcStr, RcStr>,
578 entrypoint: PathBuf,
579 assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
580 assets_root: FileSystemPath,
581 project_dir: FileSystemPath,
582 shared_stdout: SharedOutputSet,
583 shared_stderr: SharedOutputSet,
584 debug: bool,
585}
586
587impl ProcessArgs {
588 async fn create_process(self) -> Result<(NodeJsPoolProcess, Duration)> {
589 let start = Instant::now();
590 let process = NodeJsPoolProcess::new(
591 &self.cwd,
592 &self.env,
593 &self.entrypoint,
594 self.assets_for_source_mapping,
595 self.assets_root,
596 self.project_dir,
597 self.shared_stdout,
598 self.shared_stderr,
599 self.debug,
600 )
601 .await?;
602 Ok((process, start.elapsed()))
603 }
604}
605
606#[turbo_tasks::value(
615 cell = "new",
616 serialization = "skip",
617 evict = "last",
618 eq = "manual",
619 shared
620)]
621pub struct ChildProcessPool {
622 cwd: PathBuf,
623 entrypoint: PathBuf,
624 env: FxHashMap<RcStr, RcStr>,
625 pub assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
626 pub assets_root: FileSystemPath,
627 pub project_dir: FileSystemPath,
628 #[turbo_tasks(trace_ignore, debug_ignore)]
629 idle_processes: Arc<HeapQueue<NodeJsPoolProcess>>,
630 #[turbo_tasks(trace_ignore, debug_ignore)]
632 concurrency_semaphore: Arc<Semaphore>,
633 #[turbo_tasks(trace_ignore, debug_ignore)]
636 bootup_semaphore: Arc<Semaphore>,
637 #[turbo_tasks(trace_ignore, debug_ignore)]
638 shared_stdout: SharedOutputSet,
639 #[turbo_tasks(trace_ignore, debug_ignore)]
640 shared_stderr: SharedOutputSet,
641 debug: bool,
642 #[turbo_tasks(trace_ignore, debug_ignore)]
643 stats: Arc<Mutex<NodeJsPoolStats>>,
644}
645
646impl ChildProcessPool {
647 pub fn create(
650 cwd: PathBuf,
651 entrypoint: PathBuf,
652 env: FxHashMap<RcStr, RcStr>,
653 assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
654 assets_root: FileSystemPath,
655 project_dir: FileSystemPath,
656 concurrency: usize,
657 debug: bool,
658 ) -> EvaluatePool {
659 EvaluatePool::new(
660 Box::new(Self {
661 cwd,
662 entrypoint,
663 env,
664 assets_for_source_mapping,
665 assets_root: assets_root.clone(),
666 project_dir: project_dir.clone(),
667 concurrency_semaphore: Arc::new(Semaphore::new(if debug {
668 1
669 } else {
670 concurrency
671 })),
672 bootup_semaphore: Arc::new(Semaphore::new(1)),
673 idle_processes: Arc::new(HeapQueue::new()),
674 shared_stdout: Arc::new(Mutex::new(FxIndexSet::default())),
675 shared_stderr: Arc::new(Mutex::new(FxIndexSet::default())),
676 debug,
677 stats: Default::default(),
678 }),
679 assets_for_source_mapping,
680 assets_root,
681 project_dir,
682 )
683 }
684}
685
686#[turbo_tasks::value(shared)]
687pub(crate) struct ChildProcessesBackend;
688
689#[turbo_tasks::value_impl]
690impl NodeBackend for ChildProcessesBackend {
691 fn runtime_module_path(&self) -> RcStr {
692 rcstr!("child_process/evaluate.ts")
693 }
694
695 fn globals_module_path(&self) -> RcStr {
696 rcstr!("child_process/globals.ts")
697 }
698
699 fn create_pool(&self, options: CreatePoolOptions) -> CreatePoolFuture {
700 Box::pin(async move {
701 let CreatePoolOptions {
702 cwd,
703 entrypoint,
704 env,
705 assets_for_source_mapping,
706 assets_root,
707 project_dir,
708 concurrency,
709 debug,
710 } = options;
711
712 Ok(ChildProcessPool::create(
713 cwd,
714 entrypoint,
715 env,
716 assets_for_source_mapping,
717 assets_root,
718 project_dir,
719 concurrency,
720 debug,
721 ))
722 })
723 }
724
725 fn scale_down(&self) -> Result<()> {
726 ChildProcessPool::scale_down();
727 Ok(())
728 }
729
730 fn scale_zero(&self) -> Result<()> {
731 ChildProcessPool::scale_zero();
732 Ok(())
733 }
734}
735
736#[async_trait::async_trait]
737impl EvaluateOperation for ChildProcessPool {
738 async fn operation(&self) -> Result<Box<dyn Operation>> {
739 let operation = {
742 let _guard = duration_span!("Node.js operation");
743 let (process, permits) = self.acquire_process().await?;
744 ChildProcessOperation {
745 process: Some(process),
746 permits,
747 idle_processes: self.idle_processes.clone(),
748 start: Instant::now(),
749 stats: self.stats.clone(),
750 allow_process_reuse: true,
751 }
752 };
753
754 Ok(Box::new(operation))
755 }
756
757 fn stats(&self) -> PoolStatsSnapshot {
759 self.stats.lock().snapshot()
760 }
761
762 fn pre_warm(&self) {
767 let args = self.process_args();
768 let bootup_semaphore = self.bootup_semaphore.clone();
769 let idle_processes = self.idle_processes.clone();
770 let stats = self.stats.clone();
771
772 tokio::spawn(async move {
773 let Ok(bootup_permit) = bootup_semaphore.clone().acquire_owned().await else {
774 return;
775 };
776 {
777 stats.lock().add_booting_worker();
778 }
779 match args.create_process().await {
780 Ok((process, bootup_time)) => {
781 {
782 let mut s = stats.lock();
783 s.add_bootup_time(bootup_time);
784 s.finished_booting_worker();
785 }
786 drop(bootup_permit);
787 idle_processes.push(process, &ACTIVE_POOLS);
788 }
789 Err(_e) => {
790 let mut s = stats.lock();
791 s.finished_booting_worker();
792 s.remove_worker();
793 }
794 }
795 });
796 }
797}
798
799impl ChildProcessPool {
800 async fn acquire_process(&self) -> Result<(NodeJsPoolProcess, AcquiredPermits)> {
801 {
802 self.stats.lock().add_queued_task();
803 }
804
805 let concurrency_permit = self.concurrency_semaphore.clone().acquire_owned().await?;
806
807 let bootup = async {
808 let permit = self.bootup_semaphore.clone().acquire_owned().await;
809 let wait_time = self.stats.lock().wait_time_before_bootup();
810 tokio::time::sleep(wait_time).await;
811 permit
812 };
813
814 select! {
815 idle_process_result = self.idle_processes.pop(&ACTIVE_POOLS) => {
816 let process = idle_process_result.context("acquiring idle process permit")?;
817 Ok((process, AcquiredPermits::Idle { _concurrency_permit: concurrency_permit }))
818 },
819 bootup_permit = bootup => {
820 let bootup_permit = bootup_permit.context("acquiring bootup permit")?;
821 {
822 self.stats.lock().add_booting_worker();
823 }
824 let (process, bootup_time) = self.create_process().await?;
825 {
827 let mut stats = self.stats.lock();
828 stats.add_bootup_time(bootup_time);
829 stats.finished_booting_worker();
830 }
831 self.bootup_semaphore.add_permits(1);
833 Ok((process, AcquiredPermits::Fresh { _concurrency_permit: concurrency_permit, _bootup_permit: bootup_permit }))
834 }
835 }
836 }
837
838 fn process_args(&self) -> ProcessArgs {
839 ProcessArgs {
840 cwd: self.cwd.clone(),
841 env: self.env.clone(),
842 entrypoint: self.entrypoint.clone(),
843 assets_for_source_mapping: self.assets_for_source_mapping,
844 assets_root: self.assets_root.clone(),
845 project_dir: self.project_dir.clone(),
846 shared_stdout: self.shared_stdout.clone(),
847 shared_stderr: self.shared_stderr.clone(),
848 debug: self.debug,
849 }
850 }
851
852 async fn create_process(&self) -> Result<(NodeJsPoolProcess, Duration), anyhow::Error> {
853 self.process_args()
854 .create_process()
855 .await
856 .context("creating new process")
857 }
858
859 pub fn scale_down() {
860 let pools = ACTIVE_POOLS.lock().clone();
861 for pool in pools {
862 pool.reduce_to_one();
863 }
864 }
865
866 pub fn scale_zero() {
867 let pools = ACTIVE_POOLS.lock().clone();
868 for pool in pools {
869 pool.reduce_to_zero(&ACTIVE_POOLS);
870 }
871 }
872}
873
874pub struct ChildProcessOperation {
875 process: Option<NodeJsPoolProcess>,
876 #[allow(dead_code)]
878 permits: AcquiredPermits,
879 idle_processes: Arc<HeapQueue<NodeJsPoolProcess>>,
880 start: Instant,
881 stats: Arc<Mutex<NodeJsPoolStats>>,
882 allow_process_reuse: bool,
883}
884
885#[async_trait::async_trait]
886impl Operation for ChildProcessOperation {
887 async fn recv(&mut self) -> Result<Bytes> {
888 let bytes = self
889 .with_process(|process| async move {
890 process.recv().await.context("failed to receive message")
891 })
892 .await?;
893 Ok(bytes)
894 }
895
896 async fn send(&mut self, message: Bytes) -> Result<()> {
897 self.with_process(|process| async move {
898 timeout(Duration::from_secs(30), process.send(message))
899 .await
900 .context("timeout while sending message")?
901 .context("failed to send message")?;
902 Ok(())
903 })
904 .await
905 }
906
907 async fn wait_or_kill(&mut self) -> Result<ExitStatus> {
908 let mut process = self
909 .process
910 .take()
911 .context("Node.js operation already finished")?;
912
913 if self.allow_process_reuse {
914 self.stats.lock().remove_worker();
915 }
916
917 let mut child = process
918 .child
919 .take()
920 .context("Node.js operation already finished")?;
921
922 let _ = child.start_kill();
924 let status = timeout(Duration::from_secs(30), child.wait())
925 .await
926 .context("timeout while waiting for process end")?
927 .context("waiting for process end")?;
928
929 Ok(status)
930 }
931
932 fn disallow_reuse(&mut self) {
933 if self.allow_process_reuse {
934 self.stats.lock().remove_worker();
935 self.allow_process_reuse = false;
936 }
937 }
938}
939
940impl ChildProcessOperation {
941 async fn with_process<'a, F: Future<Output = Result<T>> + Send + 'a, T>(
942 &'a mut self,
943 f: impl FnOnce(&'a mut NodeJsPoolProcess) -> F,
944 ) -> Result<T> {
945 let process = self
946 .process
947 .as_mut()
948 .context("Node.js operation already finished")?;
949
950 if !self.allow_process_reuse {
951 bail!("Node.js process is no longer usable");
952 }
953
954 let result = f(process).await;
955 if result.is_err() && self.allow_process_reuse {
956 self.stats.lock().remove_worker();
957 self.allow_process_reuse = false;
958 }
959 result
960 }
961}
962
963impl Drop for ChildProcessOperation {
964 fn drop(&mut self) {
965 if let Some(mut process) = self.process.take() {
966 let elapsed = self.start.elapsed();
967 {
968 let stats = &mut self.stats.lock();
969 match self.permits {
970 AcquiredPermits::Idle { .. } => stats.add_warm_process_time(elapsed),
971 AcquiredPermits::Fresh { .. } => stats.add_cold_process_time(elapsed),
972 }
973 }
974 if self.allow_process_reuse {
975 process.cpu_time_invested += elapsed;
976 self.idle_processes.push(process, &ACTIVE_POOLS);
977 }
978 }
979 }
980}