1use std::{iter, process::ExitStatus, time::Duration};
2
3use anyhow::{Result, bail};
4use async_trait::async_trait;
5use bincode::{Decode, Encode};
6use bytes::Bytes;
7use futures_retry::{FutureRetry, RetryPolicy};
8use serde::{Deserialize, Serialize, de::DeserializeOwned};
9use serde_json::Value as JsonValue;
10use turbo_rcstr::{RcStr, rcstr};
11use turbo_tasks::{
12 Completion, FxIndexMap, OperationVc, PrettyPrintError, ResolvedVc, TryJoinIterExt, Vc,
13 duration_span, fxindexmap, parallel::available_parallelism,
14 resolve_strongly_consistent_and_take_and_apply_effects, trace::TraceRawVcs,
15};
16use turbo_tasks_env::{EnvMap, ProcessEnv};
17use turbo_tasks_fs::{File, FileContent, FileSystemPath, to_sys_path};
18use turbopack_core::{
19 asset::AssetContent,
20 changed::content_changed,
21 chunk::{ChunkingContext, ChunkingContextExt, EvaluatableAsset, EvaluatableAssets},
22 context::AssetContext,
23 file_source::FileSource,
24 ident::AssetIdent,
25 issue::{Issue, IssueExt, IssueSource, IssueStage, StyledString},
26 module::Module,
27 module_graph::{
28 GraphEntries, ModuleGraph,
29 chunk_group_info::{ChunkGroup, ChunkGroupEntry},
30 },
31 output::{OutputAsset, OutputAssets},
32 reference_type::{InnerAssets, ReferenceType},
33 source::Source,
34 virtual_source::VirtualSource,
35};
36
37use crate::{
38 AssetsForSourceMapping,
39 backend::{CreatePoolOptions, NodeBackend},
40 embed_js::embed_file_path,
41 emit, emit_package_json,
42 format::FormattingMode,
43 internal_assets_for_source_mapping,
44 pool_stats::PoolStatsSnapshot,
45 source_map::StructuredError,
46};
47
48#[derive(Serialize)]
49#[serde(tag = "type", rename_all = "camelCase")]
50enum EvalJavaScriptOutgoingMessage<'a> {
51 #[serde(rename_all = "camelCase")]
52 Evaluate { args: Vec<&'a JsonValue> },
53 Result {
54 id: u64,
55 data: Option<JsonValue>,
56 error: Option<String>,
57 },
58}
59
60#[derive(Deserialize, Debug)]
61#[serde(tag = "type", rename_all = "camelCase")]
62enum EvalJavaScriptIncomingMessage {
63 Info { data: JsonValue },
64 Request { id: u64, data: JsonValue },
65 End { data: Option<String> },
66 Error(StructuredError),
67}
68
69#[turbo_tasks::value(
70 cell = "new",
71 serialization = "skip",
72 evict = "last",
73 eq = "manual",
74 shared
75)]
76pub struct EvaluatePool {
77 #[turbo_tasks(trace_ignore, debug_ignore)]
78 pool: Box<dyn EvaluateOperation>,
79 pub assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
80 pub assets_root: FileSystemPath,
81 pub project_dir: FileSystemPath,
82}
83
84impl EvaluatePool {
85 pub(crate) fn new(
86 pool: Box<dyn EvaluateOperation>,
87 assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
88 assets_root: FileSystemPath,
89 project_dir: FileSystemPath,
90 ) -> Self {
91 Self {
92 pool,
93 assets_for_source_mapping,
94 assets_root,
95 project_dir,
96 }
97 }
98
99 pub async fn operation(&self) -> Result<Box<dyn Operation>> {
100 self.pool.operation().await
101 }
102
103 pub fn stats(&self) -> PoolStatsSnapshot {
104 self.pool.stats()
105 }
106
107 pub fn pre_warm(&self) {
108 self.pool.pre_warm()
109 }
110}
111
112#[async_trait::async_trait]
113pub trait EvaluateOperation: Send + Sync {
114 async fn operation(&self) -> Result<Box<dyn Operation>>;
115 fn stats(&self) -> PoolStatsSnapshot;
116 fn pre_warm(&self);
122}
123
124#[async_trait::async_trait]
125pub trait Operation: Send {
126 async fn recv(&mut self) -> Result<Bytes>;
127
128 async fn send(&mut self, data: Bytes) -> Result<()>;
129
130 async fn wait_or_kill(&mut self) -> Result<ExitStatus>;
131
132 fn disallow_reuse(&mut self) -> ();
133}
134
135#[turbo_tasks::value]
136struct EmittedEvaluatePoolAssets {
137 bootstrap: ResolvedVc<Box<dyn OutputAsset>>,
138 output_root: FileSystemPath,
139 entrypoint: FileSystemPath,
140}
141
142#[turbo_tasks::function(operation, root)]
143async fn emit_evaluate_pool_assets_operation(
144 entries: ResolvedVc<EvaluateEntries>,
145 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
146 module_graph: ResolvedVc<ModuleGraph>,
147) -> Result<Vc<EmittedEvaluatePoolAssets>> {
148 let EvaluateEntries {
149 entries,
150 main_entry_ident,
151 } = &*entries.await?;
152
153 let entrypoint = chunking_context
154 .chunk_path(
155 None,
156 **main_entry_ident,
157 Some(rcstr!("pool_entry")),
158 rcstr!(".js"),
159 )
160 .owned()
161 .await?;
162
163 let bootstrap = chunking_context.root_entry_chunk_group_asset(
164 entrypoint.clone(),
165 ChunkGroup::Entry(entries.iter().cloned().map(ResolvedVc::upcast).collect()),
166 *module_graph,
167 OutputAssets::empty(),
168 OutputAssets::empty(),
169 );
170
171 let output_root = chunking_context.output_root().owned().await?;
172 emit_package_json(output_root.clone())?
173 .as_side_effect()
174 .await?;
175 emit(bootstrap, output_root.clone())
176 .as_side_effect()
177 .await?;
178
179 Ok(EmittedEvaluatePoolAssets {
180 bootstrap: bootstrap.to_resolved().await?,
181 output_root,
182 entrypoint,
183 }
184 .cell())
185}
186
187#[turbo_tasks::function(operation, root, session_dependent)]
188async fn create_evaluate_pool_assets_operation(
189 entries: ResolvedVc<EvaluateEntries>,
190 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
191 module_graph: ResolvedVc<ModuleGraph>,
192) -> Result<Vc<EmittedEvaluatePoolAssets>> {
193 let operation = emit_evaluate_pool_assets_operation(entries, chunking_context, module_graph);
194 let assets = resolve_strongly_consistent_and_take_and_apply_effects(operation).await?;
204
205 Ok(*assets)
206}
207
208#[turbo_tasks::task_input]
209#[derive(Clone, Copy, Hash, Debug, PartialEq, Eq, TraceRawVcs, Encode, Decode)]
210pub enum EnvVarTracking {
211 WholeEnvTracked,
212 Untracked,
213}
214
215#[turbo_tasks::function(operation, root)]
216pub async fn get_evaluate_pool(
219 entries: ResolvedVc<EvaluateEntries>,
220 cwd: FileSystemPath,
221 env: ResolvedVc<Box<dyn ProcessEnv>>,
222 node_backend: ResolvedVc<Box<dyn NodeBackend>>,
223 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
224 module_graph: ResolvedVc<ModuleGraph>,
225 additional_invalidation: ResolvedVc<Completion>,
226 debug: bool,
227 env_var_tracking: EnvVarTracking,
228) -> Result<Vc<EvaluatePool>> {
229 let assets_op = create_evaluate_pool_assets_operation(entries, chunking_context, module_graph);
230 let assets = assets_op.read_strongly_consistent().await?;
233
234 let EmittedEvaluatePoolAssets {
235 bootstrap,
236 output_root,
237 entrypoint,
238 } = &*assets;
239
240 let (Some(cwd), Some(entrypoint)) = (
241 to_sys_path(cwd.clone()).await?,
242 to_sys_path(entrypoint.clone()).await?,
243 ) else {
244 panic!("can only evaluate from a disk filesystem");
245 };
246
247 content_changed(Vc::upcast(**bootstrap)).await?;
249 let assets_for_source_mapping =
250 internal_assets_for_source_mapping(**bootstrap, output_root.clone())
251 .to_resolved()
252 .await?;
253 let env = match env_var_tracking {
254 EnvVarTracking::WholeEnvTracked => env.read_all().await?,
255 EnvVarTracking::Untracked => {
256 common_node_env(*env).await?;
258 for name in ["FORCE_COLOR", "NO_COLOR", "OPENSSL_CONF", "TZ"] {
259 env.read(name.into()).await?;
260 }
261
262 env.read_all().untracked().await?
263 }
264 };
265
266 let node_backend = node_backend.into_trait_ref().await?;
267 let pool = node_backend
268 .create_pool(CreatePoolOptions {
269 cwd,
270 entrypoint,
271 env: env.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
272 assets_for_source_mapping,
273 assets_root: output_root.clone(),
274 project_dir: chunking_context.root_path().owned().await?,
275 concurrency: available_parallelism().map_or(1, |v| v.get()),
276 debug,
277 })
278 .await?;
279 pool.pre_warm();
280 additional_invalidation.await?;
281 Ok(pool.cell())
282}
283
284#[turbo_tasks::function]
285async fn common_node_env(env: Vc<Box<dyn ProcessEnv>>) -> Result<Vc<EnvMap>> {
286 let mut filtered = FxIndexMap::default();
287 let env = env.read_all().await?;
288 for (key, value) in &*env {
289 let uppercase = key.to_uppercase();
290 for filter in &["NODE_", "UV_", "SSL_"] {
291 if uppercase.starts_with(filter) {
292 filtered.insert(key.clone(), value.clone());
293 break;
294 }
295 }
296 }
297 Ok(Vc::cell(filtered))
298}
299
300struct PoolErrorHandler;
301
302const MAX_FAST_ATTEMPTS: usize = 5;
304const MAX_ATTEMPTS: usize = MAX_FAST_ATTEMPTS * 2;
306
307impl futures_retry::ErrorHandler<anyhow::Error> for PoolErrorHandler {
308 type OutError = anyhow::Error;
309
310 fn handle(&mut self, attempt: usize, err: anyhow::Error) -> RetryPolicy<Self::OutError> {
311 if attempt >= MAX_ATTEMPTS {
312 RetryPolicy::ForwardError(err)
313 } else if attempt >= MAX_FAST_ATTEMPTS {
314 RetryPolicy::WaitRetry(Duration::from_secs(1))
315 } else {
316 RetryPolicy::Repeat
317 }
318 }
319}
320
321pub trait EvaluateContext {
322 type InfoMessage: DeserializeOwned;
323 type RequestMessage: DeserializeOwned;
324 type ResponseMessage: Serialize;
325 type State: Default;
326
327 fn pool(&self) -> OperationVc<EvaluatePool>;
328 fn keep_alive(&self) -> bool {
329 false
330 }
331 fn args(&self) -> &[ResolvedVc<JsonValue>];
332 fn cwd(&self) -> Vc<FileSystemPath>;
333 fn emit_error(
334 &self,
335 error: StructuredError,
336 pool: &EvaluatePool,
337 ) -> impl Future<Output = Result<()>> + Send;
338 fn info(
339 &self,
340 state: &mut Self::State,
341 data: Self::InfoMessage,
342 pool: &EvaluatePool,
343 ) -> impl Future<Output = Result<()>> + Send;
344 fn request(
345 &self,
346 state: &mut Self::State,
347 data: Self::RequestMessage,
348 pool: &EvaluatePool,
349 ) -> impl Future<Output = Result<Self::ResponseMessage>> + Send;
350 fn finish(
351 &self,
352 state: Self::State,
353 pool: &EvaluatePool,
354 ) -> impl Future<Output = Result<()>> + Send;
355
356 fn crash_context_prefix(&self) -> Option<RcStr> {
362 None
363 }
364}
365
366pub async fn custom_evaluate(evaluate_context: impl EvaluateContext) -> Result<Vc<Option<RcStr>>> {
367 let pool_op = evaluate_context.pool();
368 let mut state = Default::default();
369
370 let pool = pool_op.read_strongly_consistent().await?;
373
374 let args = evaluate_context.args().iter().try_join().await?;
375 let kill = !evaluate_context.keep_alive();
378
379 let (mut operation, _) = FutureRetry::new(
385 || async {
386 let mut operation = pool.operation().await?;
387 operation
388 .send(Bytes::from(serde_json::to_vec(
389 &EvalJavaScriptOutgoingMessage::Evaluate {
390 args: args.iter().map(|v| &**v).collect(),
391 },
392 )?))
393 .await?;
394 Ok(operation)
395 },
396 PoolErrorHandler,
397 )
398 .await
399 .map_err(|(e, _)| e)?;
400
401 let result = pull_operation(&mut operation, &pool, &evaluate_context, &mut state).await?;
405
406 evaluate_context.finish(state, &pool).await?;
407
408 if kill {
409 operation.wait_or_kill().await?;
410 }
411
412 Ok(Vc::cell(result.map(RcStr::from)))
413}
414
415#[turbo_tasks::value]
416pub struct EvaluateEntries {
417 entries: Vec<ResolvedVc<Box<dyn EvaluatableAsset + 'static>>>,
418 main_entry_ident: ResolvedVc<AssetIdent>,
419}
420
421#[turbo_tasks::value_impl]
422impl EvaluateEntries {
423 #[turbo_tasks::function]
424 pub async fn graph_entries(self: Vc<Self>) -> Result<Vc<GraphEntries>> {
425 Ok(GraphEntries::from_chunk_groups(vec![ChunkGroupEntry::Entry(
426 self.await?
427 .entries
428 .iter()
429 .cloned()
430 .map(ResolvedVc::upcast)
431 .collect(),
432 )])
433 .cell())
434 }
435}
436
437#[turbo_tasks::function]
438pub async fn get_evaluate_entries(
439 module_asset: ResolvedVc<Box<dyn Module>>,
440 asset_context: ResolvedVc<Box<dyn AssetContext>>,
441 node_backend: ResolvedVc<Box<dyn NodeBackend>>,
442 runtime_entries: Option<ResolvedVc<EvaluatableAssets>>,
443) -> Result<Vc<EvaluateEntries>> {
444 let node_backend = node_backend.into_trait_ref().await?;
445 let runtime_module_path = node_backend.runtime_module_path();
446
447 let runtime_asset = asset_context
448 .process(
449 Vc::upcast(FileSource::new(
450 embed_file_path(runtime_module_path).owned().await?,
451 )),
452 ReferenceType::Internal(InnerAssets::empty().to_resolved().await?),
453 )
454 .module()
455 .to_resolved()
456 .await?;
457
458 let entry_module = asset_context
459 .process(
460 Vc::upcast(VirtualSource::new(
461 runtime_asset.ident().await?.path.join("evaluate.js")?,
462 AssetContent::file(
463 FileContent::Content(File::from(
464 "import {run} from 'RUNTIME'; run(() => import('INNER'))",
465 ))
466 .cell(),
467 ),
468 )),
469 ReferenceType::Internal(ResolvedVc::cell(
470 fxindexmap! {rcstr!("INNER") => module_asset,
471 rcstr!("RUNTIME") => runtime_asset},
472 )),
473 )
474 .module()
475 .to_resolved()
476 .await?;
477
478 let runtime_entries = {
479 let mut entries = vec![];
480 let global_module_path = node_backend.globals_module_path();
481
482 let globals_module = asset_context
483 .process(
484 Vc::upcast(FileSource::new(
485 embed_file_path(global_module_path).owned().await?,
486 )),
487 ReferenceType::Internal(InnerAssets::empty().to_resolved().await?),
488 )
489 .module();
490
491 let Some(globals_module) = ResolvedVc::try_sidecast::<Box<dyn EvaluatableAsset>>(
492 globals_module.to_resolved().await?,
493 ) else {
494 bail!("Internal module is not evaluatable");
495 };
496
497 entries.push(globals_module);
498
499 if let Some(runtime_entries) = runtime_entries {
500 for &entry in &*runtime_entries.await? {
501 entries.push(entry)
502 }
503 }
504 entries
505 };
506
507 Ok(EvaluateEntries {
508 entries: runtime_entries
509 .iter()
510 .copied()
511 .chain(iter::once(ResolvedVc::try_downcast(entry_module).unwrap()))
512 .collect(),
513 main_entry_ident: module_asset.ident().to_resolved().await?,
514 }
515 .cell())
516}
517
518#[turbo_tasks::function]
521pub async fn evaluate(
522 entries: ResolvedVc<EvaluateEntries>,
523 cwd: FileSystemPath,
524 env: ResolvedVc<Box<dyn ProcessEnv>>,
525 node_backend: ResolvedVc<Box<dyn NodeBackend>>,
526 context_source_for_issue: ResolvedVc<Box<dyn Source>>,
527 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
528 module_graph: ResolvedVc<ModuleGraph>,
529 args: Vec<ResolvedVc<JsonValue>>,
530 additional_invalidation: ResolvedVc<Completion>,
531 debug: bool,
532) -> Result<Vc<Option<RcStr>>> {
533 custom_evaluate(BasicEvaluateContext {
534 entries,
535 cwd,
536 env,
537 node_backend,
538 context_source_for_issue,
539 chunking_context,
540 module_graph,
541 args,
542 additional_invalidation,
543 debug,
544 })
545 .await
546}
547
548async fn pull_operation<T: EvaluateContext>(
551 operation: &mut Box<dyn Operation>,
552 pool: &EvaluatePool,
553 evaluate_context: &T,
554 state: &mut T::State,
555) -> Result<Option<String>> {
556 let _guard = duration_span!("Node.js evaluation");
557
558 loop {
559 let recv_result = operation.recv().await;
560 let bytes = match recv_result {
561 Ok(bytes) => bytes,
562 Err(err) => {
563 let message = match evaluate_context.crash_context_prefix() {
570 Some(prefix) => format!(
571 "Node.js subprocess crashed while evaluating {}: {}",
572 prefix,
573 PrettyPrintError(&err)
574 ),
575 None => format!(
576 "Node.js subprocess crashed while evaluating: {}",
577 PrettyPrintError(&err)
578 ),
579 };
580 let synthetic = StructuredError::from_message("Error".to_string(), message);
581 evaluate_context.emit_error(synthetic, pool).await?;
582 operation.disallow_reuse();
583 return Ok(None);
584 }
585 };
586 let message = serde_json::from_slice(&bytes)?;
587
588 match message {
589 EvalJavaScriptIncomingMessage::Error(error) => {
590 evaluate_context.emit_error(error, pool).await?;
591 operation.disallow_reuse();
593 return Ok(None);
595 }
596 EvalJavaScriptIncomingMessage::End { data } => return Ok(data),
597 EvalJavaScriptIncomingMessage::Info { data } => {
598 evaluate_context
599 .info(state, serde_json::from_value(data)?, pool)
600 .await?;
601 }
602 EvalJavaScriptIncomingMessage::Request { id, data } => {
603 match evaluate_context
604 .request(state, serde_json::from_value(data)?, pool)
605 .await
606 {
607 Ok(response) => {
608 operation
609 .send(Bytes::from(serde_json::to_vec(
610 &EvalJavaScriptOutgoingMessage::Result {
611 id,
612 error: None,
613 data: Some(serde_json::to_value(response)?),
614 },
615 )?))
616 .await?;
617 }
618 Err(e) => {
619 operation
620 .send(Bytes::from(serde_json::to_vec(
621 &EvalJavaScriptOutgoingMessage::Result {
622 id,
623 error: Some(PrettyPrintError(&e).to_string()),
624 data: None,
625 },
626 )?))
627 .await?;
628 }
629 }
630 }
631 }
632 }
633}
634
635struct BasicEvaluateContext {
636 entries: ResolvedVc<EvaluateEntries>,
637 cwd: FileSystemPath,
638 env: ResolvedVc<Box<dyn ProcessEnv>>,
639 node_backend: ResolvedVc<Box<dyn NodeBackend>>,
640 context_source_for_issue: ResolvedVc<Box<dyn Source>>,
641 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
642 module_graph: ResolvedVc<ModuleGraph>,
643 args: Vec<ResolvedVc<JsonValue>>,
644 additional_invalidation: ResolvedVc<Completion>,
645 debug: bool,
646}
647
648impl EvaluateContext for BasicEvaluateContext {
649 type InfoMessage = ();
650 type RequestMessage = ();
651 type ResponseMessage = ();
652 type State = ();
653
654 fn pool(&self) -> OperationVc<EvaluatePool> {
655 get_evaluate_pool(
656 self.entries,
657 self.cwd.clone(),
658 self.env,
659 self.node_backend,
660 self.chunking_context,
661 self.module_graph,
662 self.additional_invalidation,
663 self.debug,
664 EnvVarTracking::WholeEnvTracked,
665 )
666 }
667
668 fn args(&self) -> &[ResolvedVc<serde_json::Value>] {
669 &self.args
670 }
671
672 fn cwd(&self) -> Vc<turbo_tasks_fs::FileSystemPath> {
673 self.cwd.clone().cell()
674 }
675
676 fn keep_alive(&self) -> bool {
677 !self.args.is_empty()
678 }
679
680 async fn emit_error(&self, error: StructuredError, pool: &EvaluatePool) -> Result<()> {
681 EvaluationIssue {
682 error,
683 source: IssueSource::from_source_only(self.context_source_for_issue),
684 assets_for_source_mapping: pool.assets_for_source_mapping,
685 assets_root: pool.assets_root.clone(),
686 root_path: self.chunking_context.root_path().owned().await?,
687 detail: None,
688 }
689 .resolved_cell()
690 .emit();
691 Ok(())
692 }
693
694 async fn info(
695 &self,
696 _state: &mut Self::State,
697 _data: Self::InfoMessage,
698 _pool: &EvaluatePool,
699 ) -> Result<()> {
700 bail!("BasicEvaluateContext does not support info messages")
701 }
702
703 async fn request(
704 &self,
705 _state: &mut Self::State,
706 _data: Self::RequestMessage,
707 _pool: &EvaluatePool,
708 ) -> Result<Self::ResponseMessage> {
709 bail!("BasicEvaluateContext does not support request messages")
710 }
711
712 async fn finish(&self, _state: Self::State, _pool: &EvaluatePool) -> Result<()> {
713 Ok(())
714 }
715}
716
717#[turbo_tasks::value(shared)]
719pub struct EvaluationIssue {
720 pub source: IssueSource,
721 pub error: StructuredError,
722 pub assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
723 pub assets_root: FileSystemPath,
724 pub root_path: FileSystemPath,
725 pub detail: Option<RcStr>,
729}
730
731#[async_trait]
732#[turbo_tasks::value_impl]
733impl Issue for EvaluationIssue {
734 async fn title(&self) -> Result<StyledString> {
735 Ok(StyledString::Text(rcstr!("Error evaluating Node.js code")))
736 }
737
738 fn stage(&self) -> IssueStage {
739 IssueStage::Transform
740 }
741
742 async fn file_path(&self) -> Result<FileSystemPath> {
743 self.source.file_path().await
744 }
745
746 async fn description(&self) -> Result<Option<StyledString>> {
747 Ok(Some(StyledString::Text(
748 self.error
749 .print(
750 *self.assets_for_source_mapping,
751 self.assets_root.clone(),
752 self.root_path.clone(),
753 FormattingMode::Plain,
754 )
755 .await?
756 .into(),
757 )))
758 }
759
760 async fn detail(&self) -> Result<Option<StyledString>> {
761 Ok(self.detail.clone().map(StyledString::Text))
762 }
763
764 fn source(&self) -> Option<IssueSource> {
765 Some(self.source)
766 }
767}