1use std::{
2 borrow::Cow, iter, process::ExitStatus, sync::Arc, thread::available_parallelism,
3 time::Duration,
4};
5
6use anyhow::{Result, bail};
7use bincode::{Decode, Encode};
8use bytes::Bytes;
9use futures_retry::{FutureRetry, RetryPolicy};
10use serde::{Deserialize, Serialize, de::DeserializeOwned};
11use serde_json::Value as JsonValue;
12use turbo_rcstr::{RcStr, rcstr};
13use turbo_tasks::{
14 Completion, FxIndexMap, NonLocalValue, OperationVc, PrettyPrintError, ReadRef, ResolvedVc,
15 TaskInput, TryJoinIterExt, Vc, duration_span, fxindexmap, mark_session_dependent,
16 mark_top_level_task, take_effects, trace::TraceRawVcs,
17};
18use turbo_tasks_env::{EnvMap, ProcessEnv};
19use turbo_tasks_fs::{File, FileContent, FileSystemPath, to_sys_path};
20use turbopack_core::{
21 asset::AssetContent,
22 changed::content_changed,
23 chunk::{ChunkingContext, ChunkingContextExt, EvaluatableAsset, EvaluatableAssets},
24 context::AssetContext,
25 file_source::FileSource,
26 ident::AssetIdent,
27 issue::{
28 Issue, IssueExt, IssueSource, IssueStage, OptionIssueSource, OptionStyledString,
29 StyledString,
30 },
31 module::Module,
32 module_graph::{
33 GraphEntries, ModuleGraph,
34 chunk_group_info::{ChunkGroup, ChunkGroupEntry},
35 },
36 output::{OutputAsset, OutputAssets},
37 reference_type::{InnerAssets, ReferenceType},
38 source::Source,
39 virtual_source::VirtualSource,
40};
41
42use crate::{
43 AssetsForSourceMapping,
44 backend::{CreatePoolOptions, NodeBackend},
45 embed_js::embed_file_path,
46 emit, emit_package_json,
47 format::FormattingMode,
48 internal_assets_for_source_mapping,
49 pool_stats::PoolStatsSnapshot,
50 source_map::StructuredError,
51};
52
/// Messages serialized to JSON and sent to the evaluation worker through
/// [`Operation::send`].
///
/// The variant is encoded in a `type` field whose value is the camelCase
/// variant name.
#[derive(Serialize)]
#[serde(tag = "type", rename_all = "camelCase")]
enum EvalJavaScriptOutgoingMessage<'a> {
    /// Starts an evaluation with the given (borrowed) JSON arguments.
    #[serde(rename_all = "camelCase")]
    Evaluate { args: Vec<&'a JsonValue> },
    /// Reply to an incoming `Request` carrying the same `id`.
    ///
    /// `pull_operation` fills in `data` when the request handler succeeded
    /// and `error` (a printed error message) when it failed.
    Result {
        id: u64,
        data: Option<JsonValue>,
        error: Option<String>,
    },
}
64
/// Messages received from the evaluation worker; `pull_operation`
/// deserializes them from the JSON bytes returned by [`Operation::recv`].
///
/// The variant is selected by a `type` field holding the camelCase variant
/// name.
#[derive(Deserialize, Debug)]
#[serde(tag = "type", rename_all = "camelCase")]
enum EvalJavaScriptIncomingMessage {
    /// Informational payload, forwarded to `EvaluateContext::info`.
    Info { data: JsonValue },
    /// Request that is forwarded to `EvaluateContext::request`; the reply is
    /// sent back with the same `id`.
    Request { id: u64, data: JsonValue },
    /// Ends the evaluation; `data` is the optional result string.
    End { data: Option<String> },
    /// Ends the evaluation with a structured error.
    Error(StructuredError),
}
73
/// A pool that hands out [`Operation`]s, bundled with the assets and paths
/// that accompany it.
#[turbo_tasks::value(cell = "new", serialization = "none", eq = "manual", shared)]
pub struct EvaluatePool {
    /// The backend-provided pool implementation all methods delegate to.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    pool: Box<dyn EvaluateOperation>,
    /// Assets handed to `StructuredError::print` when an evaluation error is
    /// rendered (see `EvaluationIssue`).
    pub assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
    /// Root path of the emitted assets; also handed to
    /// `StructuredError::print`.
    pub assets_root: FileSystemPath,
    /// Project directory the pool was created for.
    pub project_dir: FileSystemPath,
}
82
impl EvaluatePool {
    /// Bundles a backend pool with the source-mapping assets and paths that
    /// are exposed as public fields.
    pub(crate) fn new(
        pool: Box<dyn EvaluateOperation>,
        assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
        assets_root: FileSystemPath,
        project_dir: FileSystemPath,
    ) -> Self {
        Self {
            pool,
            assets_for_source_mapping,
            assets_root,
            project_dir,
        }
    }

    /// Obtains an [`Operation`] from the underlying pool.
    pub async fn operation(&self) -> Result<Box<dyn Operation>> {
        self.pool.operation().await
    }

    /// Returns the underlying pool's statistics snapshot.
    pub fn stats(&self) -> PoolStatsSnapshot {
        self.pool.stats()
    }

    /// Forwards the pre-warm request to the underlying pool.
    pub fn pre_warm(&self) {
        self.pool.pre_warm()
    }
}
110
/// Pool interface implemented by a backend and wrapped by [`EvaluatePool`].
#[async_trait::async_trait]
pub trait EvaluateOperation: Send + Sync {
    /// Obtains an [`Operation`] from the pool.
    async fn operation(&self) -> Result<Box<dyn Operation>>;
    /// Returns a snapshot of the pool's statistics.
    fn stats(&self) -> PoolStatsSnapshot;
    /// Pre-warms the pool.
    ///
    /// NOTE(review): presumably starts workers ahead of the first
    /// evaluation; confirm against the backend implementations.
    fn pre_warm(&self);
}
122
123#[async_trait::async_trait]
124pub trait Operation: Send {
125 async fn recv(&mut self) -> Result<Bytes>;
126
127 async fn send(&mut self, data: Bytes) -> Result<()>;
128
129 async fn wait_or_kill(&mut self) -> Result<ExitStatus>;
130
131 fn disallow_reuse(&mut self) -> ();
132}
133
/// Result of emitting the assets needed to run an evaluation pool.
#[turbo_tasks::value]
struct EmittedEvaluatePoolAssets {
    /// The bootstrap entry chunk group asset that was emitted.
    bootstrap: ResolvedVc<Box<dyn OutputAsset>>,
    /// The chunking context's output root the assets were emitted into.
    output_root: FileSystemPath,
    /// Path of the entrypoint file inside `output_root`.
    entrypoint: FileSystemPath,
}
140
141#[turbo_tasks::function(operation)]
142async fn emit_evaluate_pool_assets_operation(
143 entries: ResolvedVc<EvaluateEntries>,
144 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
145 module_graph: ResolvedVc<ModuleGraph>,
146) -> Result<Vc<EmittedEvaluatePoolAssets>> {
147 let EvaluateEntries {
148 entries,
149 main_entry_ident,
150 } = &*entries.await?;
151
152 let module_path = main_entry_ident.path().await?;
153 let file_name = module_path.file_name();
154 let file_name = if file_name.ends_with(".js") {
155 Cow::Borrowed(file_name)
156 } else if let Some(file_name) = file_name.strip_suffix(".ts") {
157 Cow::Owned(format!("{file_name}.js"))
158 } else {
159 Cow::Owned(format!("{file_name}.js"))
160 };
161 let entrypoint = chunking_context.output_root().await?.join(&file_name)?;
162
163 let bootstrap = chunking_context.root_entry_chunk_group_asset(
164 entrypoint.clone(),
165 ChunkGroup::Entry(entries.iter().cloned().map(ResolvedVc::upcast).collect()),
166 *module_graph,
167 OutputAssets::empty(),
168 OutputAssets::empty(),
169 );
170
171 let output_root = chunking_context.output_root().owned().await?;
172 emit_package_json(output_root.clone())?
173 .as_side_effect()
174 .await?;
175 emit(bootstrap, output_root.clone())
176 .as_side_effect()
177 .await?;
178
179 Ok(EmittedEvaluatePoolAssets {
180 bootstrap: bootstrap.to_resolved().await?,
181 output_root,
182 entrypoint: entrypoint.clone(),
183 }
184 .cell())
185}
186
/// Wrapper holding an already-read [`EmittedEvaluatePoolAssets`] value.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads
/// this type; check for remaining users before relying on it.
#[turbo_tasks::value(serialization = "none")]
struct EmittedEvaluatePoolAssetsWithEffects {
    /// The emitted assets.
    assets: ReadRef<EmittedEvaluatePoolAssets>,
}
191
/// Runs [`emit_evaluate_pool_assets_operation`], applies the effects it
/// collected, and returns the emitted assets.
///
/// The statement order below is significant: the operation is read strongly
/// consistently first, then its effects are taken and applied.
#[turbo_tasks::function(operation)]
async fn create_evaluate_pool_assets_operation(
    entries: ResolvedVc<EvaluateEntries>,
    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
    module_graph: ResolvedVc<ModuleGraph>,
) -> Result<Vc<EmittedEvaluatePoolAssets>> {
    // NOTE(review): presumably needed because the applied effects (emitted
    // files) are tied to the current session; confirm against turbo-tasks.
    mark_session_dependent();
    let operation = emit_evaluate_pool_assets_operation(entries, chunking_context, module_graph);
    let assets = operation.resolve().strongly_consistent().await?;
    let effects = Arc::new(take_effects(operation).await?);

    // NOTE(review): presumably required before effects may be applied from
    // inside a task; confirm against turbo-tasks.
    mark_top_level_task();
    effects.apply().await?;

    Ok(*assets)
}
214
/// Selects how [`get_evaluate_pool`] reads the process environment that is
/// passed to the pool.
#[derive(
    Clone, Copy, Hash, Debug, PartialEq, Eq, TaskInput, NonLocalValue, TraceRawVcs, Encode, Decode,
)]
pub enum EnvVarTracking {
    /// The whole environment is read with a tracked `read_all()`.
    WholeEnvTracked,
    /// The whole environment is read with `read_all().untracked()`. Only
    /// `common_node_env` and the variables `FORCE_COLOR`, `NO_COLOR`,
    /// `OPENSSL_CONF` and `TZ` are read in a tracked way beforehand.
    Untracked,
}
222
223#[turbo_tasks::function(operation)]
224pub async fn get_evaluate_pool(
227 entries: ResolvedVc<EvaluateEntries>,
228 cwd: FileSystemPath,
229 env: ResolvedVc<Box<dyn ProcessEnv>>,
230 node_backend: ResolvedVc<Box<dyn NodeBackend>>,
231 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
232 module_graph: ResolvedVc<ModuleGraph>,
233 additional_invalidation: ResolvedVc<Completion>,
234 debug: bool,
235 env_var_tracking: EnvVarTracking,
236) -> Result<Vc<EvaluatePool>> {
237 let assets_op = create_evaluate_pool_assets_operation(entries, chunking_context, module_graph);
238 let assets = assets_op.read_strongly_consistent().await?;
239
240 let EmittedEvaluatePoolAssets {
241 bootstrap,
242 output_root,
243 entrypoint,
244 } = &*assets;
245
246 let (Some(cwd), Some(entrypoint)) = (
247 to_sys_path(cwd.clone()).await?,
248 to_sys_path(entrypoint.clone()).await?,
249 ) else {
250 panic!("can only evaluate from a disk filesystem");
251 };
252
253 content_changed(Vc::upcast(**bootstrap)).await?;
255 let assets_for_source_mapping =
256 internal_assets_for_source_mapping(**bootstrap, output_root.clone())
257 .to_resolved()
258 .await?;
259 let env = match env_var_tracking {
260 EnvVarTracking::WholeEnvTracked => env.read_all().await?,
261 EnvVarTracking::Untracked => {
262 common_node_env(*env).await?;
264 for name in ["FORCE_COLOR", "NO_COLOR", "OPENSSL_CONF", "TZ"] {
265 env.read(name.into()).await?;
266 }
267
268 env.read_all().untracked().await?
269 }
270 };
271
272 let node_backend = node_backend.into_trait_ref().await?;
273 let pool = node_backend
274 .create_pool(CreatePoolOptions {
275 cwd,
276 entrypoint,
277 env: env.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
278 assets_for_source_mapping,
279 assets_root: output_root.clone(),
280 project_dir: chunking_context.root_path().owned().await?,
281 concurrency: available_parallelism().map_or(1, |v| v.get()),
282 debug,
283 })
284 .await?;
285 pool.pre_warm();
286 additional_invalidation.await?;
287 Ok(pool.cell())
288}
289
290#[turbo_tasks::function]
291async fn common_node_env(env: Vc<Box<dyn ProcessEnv>>) -> Result<Vc<EnvMap>> {
292 let mut filtered = FxIndexMap::default();
293 let env = env.read_all().await?;
294 for (key, value) in &*env {
295 let uppercase = key.to_uppercase();
296 for filter in &["NODE_", "UV_", "SSL_"] {
297 if uppercase.starts_with(filter) {
298 filtered.insert(key.clone(), value.clone());
299 break;
300 }
301 }
302 }
303 Ok(Vc::cell(filtered))
304}
305
306struct PoolErrorHandler;
307
308const MAX_FAST_ATTEMPTS: usize = 5;
310const MAX_ATTEMPTS: usize = MAX_FAST_ATTEMPTS * 2;
312
313impl futures_retry::ErrorHandler<anyhow::Error> for PoolErrorHandler {
314 type OutError = anyhow::Error;
315
316 fn handle(&mut self, attempt: usize, err: anyhow::Error) -> RetryPolicy<Self::OutError> {
317 if attempt >= MAX_ATTEMPTS {
318 RetryPolicy::ForwardError(err)
319 } else if attempt >= MAX_FAST_ATTEMPTS {
320 RetryPolicy::WaitRetry(Duration::from_secs(1))
321 } else {
322 RetryPolicy::Repeat
323 }
324 }
325}
326
/// Customization points for [`custom_evaluate`]: which pool to use, which
/// arguments to pass, and how to react to the messages sent by the worker.
pub trait EvaluateContext {
    /// Payload type of `Info` messages received from the worker.
    type InfoMessage: DeserializeOwned;
    /// Payload type of `Request` messages received from the worker.
    type RequestMessage: DeserializeOwned;
    /// Payload type of the reply sent back for a request.
    type ResponseMessage: Serialize;
    /// Per-evaluation state; created with `Default::default()` and handed to
    /// `info`, `request` and finally `finish`.
    type State: Default;

    /// The pool operation to run the evaluation on.
    fn pool(&self) -> OperationVc<EvaluatePool>;
    /// Whether the worker should be kept after the evaluation. When this
    /// returns `false` (the default), `custom_evaluate` calls
    /// `Operation::wait_or_kill` at the end.
    fn keep_alive(&self) -> bool {
        false
    }
    /// JSON arguments sent with the initial `Evaluate` message.
    fn args(&self) -> &[ResolvedVc<JsonValue>];
    /// The working directory associated with this evaluation.
    fn cwd(&self) -> Vc<FileSystemPath>;
    /// Called when the worker reports an `Error` message.
    fn emit_error(
        &self,
        error: StructuredError,
        pool: &EvaluatePool,
    ) -> impl Future<Output = Result<()>> + Send;
    /// Called for every `Info` message received from the worker.
    fn info(
        &self,
        state: &mut Self::State,
        data: Self::InfoMessage,
        pool: &EvaluatePool,
    ) -> impl Future<Output = Result<()>> + Send;
    /// Called for every `Request` message. An `Ok` value is sent back as the
    /// reply's data, an `Err` is sent back as the reply's error message.
    fn request(
        &self,
        state: &mut Self::State,
        data: Self::RequestMessage,
        pool: &EvaluatePool,
    ) -> impl Future<Output = Result<Self::ResponseMessage>> + Send;
    /// Called once after the message loop has returned successfully,
    /// consuming the state.
    fn finish(
        &self,
        state: Self::State,
        pool: &EvaluatePool,
    ) -> impl Future<Output = Result<()>> + Send;
}
362
363pub async fn custom_evaluate(evaluate_context: impl EvaluateContext) -> Result<Vc<Option<RcStr>>> {
364 let pool_op = evaluate_context.pool();
365 let mut state = Default::default();
366
367 let pool = pool_op.read_strongly_consistent().await?;
370
371 let args = evaluate_context.args().iter().try_join().await?;
372 let kill = !evaluate_context.keep_alive();
375
376 let (mut operation, _) = FutureRetry::new(
382 || async {
383 let mut operation = pool.operation().await?;
384 operation
385 .send(Bytes::from(serde_json::to_vec(
386 &EvalJavaScriptOutgoingMessage::Evaluate {
387 args: args.iter().map(|v| &**v).collect(),
388 },
389 )?))
390 .await?;
391 Ok(operation)
392 },
393 PoolErrorHandler,
394 )
395 .await
396 .map_err(|(e, _)| e)?;
397
398 let result = pull_operation(&mut operation, &pool, &evaluate_context, &mut state).await?;
402
403 evaluate_context.finish(state, &pool).await?;
404
405 if kill {
406 operation.wait_or_kill().await?;
407 }
408
409 Ok(Vc::cell(result.map(RcStr::from)))
410}
411
/// The evaluatable assets that make up one evaluation, plus the identity of
/// the module being evaluated.
#[turbo_tasks::value]
pub struct EvaluateEntries {
    /// All entries in evaluation order; `get_evaluate_entries` puts the
    /// backend's globals module first and the generated entry module last.
    entries: Vec<ResolvedVc<Box<dyn EvaluatableAsset + 'static>>>,
    /// Ident of the module passed to `get_evaluate_entries`; its file name
    /// determines the emitted entrypoint's name.
    main_entry_ident: ResolvedVc<AssetIdent>,
}
417
418#[turbo_tasks::value_impl]
419impl EvaluateEntries {
420 #[turbo_tasks::function]
421 pub async fn graph_entries(self: Vc<Self>) -> Result<Vc<GraphEntries>> {
422 Ok(Vc::cell(vec![ChunkGroupEntry::Entry(
423 self.await?
424 .entries
425 .iter()
426 .cloned()
427 .map(ResolvedVc::upcast)
428 .collect(),
429 )]))
430 }
431}
432
433#[turbo_tasks::function]
434pub async fn get_evaluate_entries(
435 module_asset: ResolvedVc<Box<dyn Module>>,
436 asset_context: ResolvedVc<Box<dyn AssetContext>>,
437 node_backend: ResolvedVc<Box<dyn NodeBackend>>,
438 runtime_entries: Option<ResolvedVc<EvaluatableAssets>>,
439) -> Result<Vc<EvaluateEntries>> {
440 let node_backend = node_backend.into_trait_ref().await?;
441 let runtime_module_path = node_backend.runtime_module_path();
442
443 let runtime_asset = asset_context
444 .process(
445 Vc::upcast(FileSource::new(
446 embed_file_path(runtime_module_path).owned().await?,
447 )),
448 ReferenceType::Internal(InnerAssets::empty().to_resolved().await?),
449 )
450 .module()
451 .to_resolved()
452 .await?;
453
454 let entry_module = asset_context
455 .process(
456 Vc::upcast(VirtualSource::new(
457 runtime_asset.ident().path().await?.join("evaluate.js")?,
458 AssetContent::file(
459 FileContent::Content(File::from(
460 "import { run } from 'RUNTIME'; run(() => import('INNER'))",
461 ))
462 .cell(),
463 ),
464 )),
465 ReferenceType::Internal(ResolvedVc::cell(fxindexmap! {
466 rcstr!("INNER") => module_asset,
467 rcstr!("RUNTIME") => runtime_asset
468 })),
469 )
470 .module()
471 .to_resolved()
472 .await?;
473
474 let runtime_entries = {
475 let mut entries = vec![];
476 let global_module_path = node_backend.globals_module_path();
477
478 let globals_module = asset_context
479 .process(
480 Vc::upcast(FileSource::new(
481 embed_file_path(global_module_path).owned().await?,
482 )),
483 ReferenceType::Internal(InnerAssets::empty().to_resolved().await?),
484 )
485 .module();
486
487 let Some(globals_module) = ResolvedVc::try_sidecast::<Box<dyn EvaluatableAsset>>(
488 globals_module.to_resolved().await?,
489 ) else {
490 bail!("Internal module is not evaluatable");
491 };
492
493 entries.push(globals_module);
494
495 if let Some(runtime_entries) = runtime_entries {
496 for &entry in &*runtime_entries.await? {
497 entries.push(entry)
498 }
499 }
500 entries
501 };
502
503 Ok(EvaluateEntries {
504 entries: runtime_entries
505 .iter()
506 .copied()
507 .chain(iter::once(ResolvedVc::try_downcast(entry_module).unwrap()))
508 .collect(),
509 main_entry_ident: module_asset.ident().to_resolved().await?,
510 }
511 .cell())
512}
513
/// Evaluates `entries` with the given arguments and returns the optional
/// result string reported by the worker.
///
/// This is [`custom_evaluate`] with a [`BasicEvaluateContext`], which reports
/// worker errors as [`EvaluationIssue`]s attributed to
/// `context_source_for_issue` and rejects info and request messages.
#[turbo_tasks::function]
pub async fn evaluate(
    entries: ResolvedVc<EvaluateEntries>,
    cwd: FileSystemPath,
    env: ResolvedVc<Box<dyn ProcessEnv>>,
    node_backend: ResolvedVc<Box<dyn NodeBackend>>,
    context_source_for_issue: ResolvedVc<Box<dyn Source>>,
    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
    module_graph: ResolvedVc<ModuleGraph>,
    args: Vec<ResolvedVc<JsonValue>>,
    additional_invalidation: ResolvedVc<Completion>,
    debug: bool,
) -> Result<Vc<Option<RcStr>>> {
    custom_evaluate(BasicEvaluateContext {
        entries,
        cwd,
        env,
        node_backend,
        context_source_for_issue,
        chunking_context,
        module_graph,
        args,
        additional_invalidation,
        debug,
    })
    .await
}
543
544async fn pull_operation<T: EvaluateContext>(
547 operation: &mut Box<dyn Operation>,
548 pool: &EvaluatePool,
549 evaluate_context: &T,
550 state: &mut T::State,
551) -> Result<Option<String>> {
552 let _guard = duration_span!("Node.js evaluation");
553
554 loop {
555 let message = serde_json::from_slice(&operation.recv().await?)?;
556
557 match message {
558 EvalJavaScriptIncomingMessage::Error(error) => {
559 evaluate_context.emit_error(error, pool).await?;
560 operation.disallow_reuse();
562 return Ok(None);
564 }
565 EvalJavaScriptIncomingMessage::End { data } => return Ok(data),
566 EvalJavaScriptIncomingMessage::Info { data } => {
567 evaluate_context
568 .info(state, serde_json::from_value(data)?, pool)
569 .await?;
570 }
571 EvalJavaScriptIncomingMessage::Request { id, data } => {
572 match evaluate_context
573 .request(state, serde_json::from_value(data)?, pool)
574 .await
575 {
576 Ok(response) => {
577 operation
578 .send(Bytes::from(serde_json::to_vec(
579 &EvalJavaScriptOutgoingMessage::Result {
580 id,
581 error: None,
582 data: Some(serde_json::to_value(response)?),
583 },
584 )?))
585 .await?;
586 }
587 Err(e) => {
588 operation
589 .send(Bytes::from(serde_json::to_vec(
590 &EvalJavaScriptOutgoingMessage::Result {
591 id,
592 error: Some(PrettyPrintError(&e).to_string()),
593 data: None,
594 },
595 )?))
596 .await?;
597 }
598 }
599 }
600 }
601 }
602}
603
/// The [`EvaluateContext`] used by [`evaluate`]; it stores the arguments of
/// that function unchanged.
struct BasicEvaluateContext {
    /// Entries to evaluate.
    entries: ResolvedVc<EvaluateEntries>,
    /// Working directory passed to the pool.
    cwd: FileSystemPath,
    /// Process environment passed to the pool.
    env: ResolvedVc<Box<dyn ProcessEnv>>,
    /// Backend that creates the pool.
    node_backend: ResolvedVc<Box<dyn NodeBackend>>,
    /// Source that emitted evaluation issues are attributed to.
    context_source_for_issue: ResolvedVc<Box<dyn Source>>,
    /// Chunking context used to emit the evaluation assets; its root path is
    /// also used when reporting errors.
    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
    /// Module graph used to build the bootstrap chunk group.
    module_graph: ResolvedVc<ModuleGraph>,
    /// JSON arguments sent to the worker.
    args: Vec<ResolvedVc<JsonValue>>,
    /// Extra completion awaited by `get_evaluate_pool`.
    additional_invalidation: ResolvedVc<Completion>,
    /// Debug flag forwarded to the pool options.
    debug: bool,
}
616
impl EvaluateContext for BasicEvaluateContext {
    // This context exchanges no info/request messages and keeps no state.
    type InfoMessage = ();
    type RequestMessage = ();
    type ResponseMessage = ();
    type State = ();

    /// Creates the pool operation from the stored parameters, tracking the
    /// whole environment.
    fn pool(&self) -> OperationVc<EvaluatePool> {
        get_evaluate_pool(
            self.entries,
            self.cwd.clone(),
            self.env,
            self.node_backend,
            self.chunking_context,
            self.module_graph,
            self.additional_invalidation,
            self.debug,
            EnvVarTracking::WholeEnvTracked,
        )
    }

    /// Returns the stored arguments.
    fn args(&self) -> &[ResolvedVc<serde_json::Value>] {
        &self.args
    }

    /// Returns the stored working directory as a cell.
    fn cwd(&self) -> Vc<turbo_tasks_fs::FileSystemPath> {
        self.cwd.clone().cell()
    }

    /// Keeps the worker alive exactly when arguments were supplied; without
    /// arguments `custom_evaluate` calls `wait_or_kill` after the evaluation.
    fn keep_alive(&self) -> bool {
        !self.args.is_empty()
    }

    /// Emits an [`EvaluationIssue`] for `error`, attributed to
    /// `context_source_for_issue`.
    async fn emit_error(&self, error: StructuredError, pool: &EvaluatePool) -> Result<()> {
        EvaluationIssue {
            error,
            source: IssueSource::from_source_only(self.context_source_for_issue),
            assets_for_source_mapping: pool.assets_for_source_mapping,
            assets_root: pool.assets_root.clone(),
            root_path: self.chunking_context.root_path().owned().await?,
        }
        .resolved_cell()
        .emit();
        Ok(())
    }

    /// Always fails: info messages are not supported by this context.
    async fn info(
        &self,
        _state: &mut Self::State,
        _data: Self::InfoMessage,
        _pool: &EvaluatePool,
    ) -> Result<()> {
        bail!("BasicEvaluateContext does not support info messages")
    }

    /// Always fails: request messages are not supported by this context.
    async fn request(
        &self,
        _state: &mut Self::State,
        _data: Self::RequestMessage,
        _pool: &EvaluatePool,
    ) -> Result<Self::ResponseMessage> {
        bail!("BasicEvaluateContext does not support request messages")
    }

    /// Nothing to finalize.
    async fn finish(&self, _state: Self::State, _pool: &EvaluatePool) -> Result<()> {
        Ok(())
    }
}
684
/// Issue emitted when the evaluated code reports an error.
#[turbo_tasks::value(shared)]
pub struct EvaluationIssue {
    /// Source the issue is attributed to.
    pub source: IssueSource,
    /// The structured error reported by the worker.
    pub error: StructuredError,
    /// Assets passed to `StructuredError::print` when rendering the
    /// description.
    pub assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
    /// Root of the emitted assets, passed to `StructuredError::print`.
    pub assets_root: FileSystemPath,
    /// Root path, passed to `StructuredError::print`.
    pub root_path: FileSystemPath,
}
694
695#[turbo_tasks::value_impl]
696impl Issue for EvaluationIssue {
697 #[turbo_tasks::function]
698 fn title(&self) -> Vc<StyledString> {
699 StyledString::Text(rcstr!("Error evaluating Node.js code")).cell()
700 }
701
702 #[turbo_tasks::function]
703 fn stage(&self) -> Vc<IssueStage> {
704 IssueStage::Transform.cell()
705 }
706
707 #[turbo_tasks::function]
708 fn file_path(&self) -> Vc<FileSystemPath> {
709 self.source.file_path()
710 }
711
712 #[turbo_tasks::function]
713 async fn description(&self) -> Result<Vc<OptionStyledString>> {
714 Ok(Vc::cell(Some(
715 StyledString::Text(
716 self.error
717 .print(
718 *self.assets_for_source_mapping,
719 self.assets_root.clone(),
720 self.root_path.clone(),
721 FormattingMode::Plain,
722 )
723 .await?
724 .into(),
725 )
726 .resolved_cell(),
727 )))
728 }
729
730 #[turbo_tasks::function]
731 fn source(&self) -> Vc<OptionIssueSource> {
732 Vc::cell(Some(self.source))
733 }
734}