1use std::{borrow::Cow, iter, sync::Arc, thread::available_parallelism, time::Duration};
2
3use anyhow::{Result, bail};
4use bincode::{Decode, Encode};
5use futures_retry::{FutureRetry, RetryPolicy};
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use serde_json::Value as JsonValue;
8use turbo_rcstr::{RcStr, rcstr};
9use turbo_tasks::{
10 Completion, Effects, FxIndexMap, NonLocalValue, OperationVc, ReadRef, ResolvedVc, TaskInput,
11 TryJoinIterExt, Vc, duration_span, fxindexmap, get_effects, trace::TraceRawVcs,
12};
13use turbo_tasks_env::{EnvMap, ProcessEnv};
14use turbo_tasks_fs::{File, FileContent, FileSystemPath, to_sys_path};
15use turbopack_core::{
16 asset::AssetContent,
17 changed::content_changed,
18 chunk::{ChunkingContext, ChunkingContextExt, EvaluatableAsset, EvaluatableAssets},
19 context::AssetContext,
20 error::PrettyPrintError,
21 file_source::FileSource,
22 ident::AssetIdent,
23 issue::{
24 Issue, IssueExt, IssueSource, IssueStage, OptionIssueSource, OptionStyledString,
25 StyledString,
26 },
27 module::Module,
28 module_graph::{GraphEntries, ModuleGraph, chunk_group_info::ChunkGroupEntry},
29 output::{OutputAsset, OutputAssets},
30 reference_type::{InnerAssets, ReferenceType},
31 source::Source,
32 virtual_source::VirtualSource,
33};
34
35use crate::{
36 AssetsForSourceMapping,
37 embed_js::embed_file_path,
38 emit, emit_package_json, internal_assets_for_source_mapping,
39 pool::{FormattingMode, NodeJsOperation, NodeJsPool},
40 source_map::StructuredError,
41};
42
43#[derive(Serialize)]
44#[serde(tag = "type", rename_all = "camelCase")]
45enum EvalJavaScriptOutgoingMessage<'a> {
46 #[serde(rename_all = "camelCase")]
47 Evaluate { args: Vec<&'a JsonValue> },
48 Result {
49 id: u64,
50 data: Option<JsonValue>,
51 error: Option<String>,
52 },
53}
54
55#[derive(Deserialize, Debug)]
56#[serde(tag = "type", rename_all = "camelCase")]
57enum EvalJavaScriptIncomingMessage {
58 Info { data: JsonValue },
59 Request { id: u64, data: JsonValue },
60 End { data: Option<String> },
61 Error(StructuredError),
62}
63
64#[turbo_tasks::value]
65struct EmittedEvaluatePoolAssets {
66 bootstrap: ResolvedVc<Box<dyn OutputAsset>>,
67 output_root: FileSystemPath,
68 entrypoint: FileSystemPath,
69}
70
71#[turbo_tasks::function(operation)]
72async fn emit_evaluate_pool_assets_operation(
73 entries: ResolvedVc<EvaluateEntries>,
74 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
75 module_graph: ResolvedVc<ModuleGraph>,
76) -> Result<Vc<EmittedEvaluatePoolAssets>> {
77 let EvaluateEntries {
78 entries,
79 main_entry_ident,
80 } = &*entries.await?;
81
82 let module_path = main_entry_ident.path().await?;
83 let file_name = module_path.file_name();
84 let file_name = if file_name.ends_with(".js") {
85 Cow::Borrowed(file_name)
86 } else if let Some(file_name) = file_name.strip_suffix(".ts") {
87 Cow::Owned(format!("{file_name}.js"))
88 } else {
89 Cow::Owned(format!("{file_name}.js"))
90 };
91 let entrypoint = chunking_context.output_root().await?.join(&file_name)?;
92
93 let bootstrap = chunking_context.root_entry_chunk_group_asset(
94 entrypoint.clone(),
95 Vc::cell(entries.clone()),
96 *module_graph,
97 OutputAssets::empty(),
98 OutputAssets::empty(),
99 );
100
101 let output_root = chunking_context.output_root().owned().await?;
102 emit_package_json(output_root.clone())?
103 .as_side_effect()
104 .await?;
105 emit(bootstrap, output_root.clone())
106 .as_side_effect()
107 .await?;
108
109 Ok(EmittedEvaluatePoolAssets {
110 bootstrap: bootstrap.to_resolved().await?,
111 output_root,
112 entrypoint: entrypoint.clone(),
113 }
114 .cell())
115}
116
117#[turbo_tasks::value(serialization = "none")]
118struct EmittedEvaluatePoolAssetsWithEffects {
119 assets: ReadRef<EmittedEvaluatePoolAssets>,
120 effects: Arc<Effects>,
121}
122
123#[turbo_tasks::function(operation)]
124async fn emit_evaluate_pool_assets_with_effects_operation(
125 entries: ResolvedVc<EvaluateEntries>,
126 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
127 module_graph: ResolvedVc<ModuleGraph>,
128) -> Result<Vc<EmittedEvaluatePoolAssetsWithEffects>> {
129 let operation = emit_evaluate_pool_assets_operation(entries, chunking_context, module_graph);
130 let assets = operation.read_strongly_consistent().await?;
131 let effects = Arc::new(get_effects(operation).await?);
132 Ok(EmittedEvaluatePoolAssetsWithEffects { assets, effects }.cell())
133}
134
135#[derive(
136 Clone, Copy, Hash, Debug, PartialEq, Eq, TaskInput, NonLocalValue, TraceRawVcs, Encode, Decode,
137)]
138pub enum EnvVarTracking {
139 WholeEnvTracked,
140 Untracked,
141}
142
143#[turbo_tasks::function(operation)]
144pub async fn get_evaluate_pool(
147 entries: ResolvedVc<EvaluateEntries>,
148 cwd: FileSystemPath,
149 env: ResolvedVc<Box<dyn ProcessEnv>>,
150 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
151 module_graph: ResolvedVc<ModuleGraph>,
152 additional_invalidation: ResolvedVc<Completion>,
153 debug: bool,
154 env_var_tracking: EnvVarTracking,
155) -> Result<Vc<NodeJsPool>> {
156 let operation =
157 emit_evaluate_pool_assets_with_effects_operation(entries, chunking_context, module_graph);
158 let EmittedEvaluatePoolAssetsWithEffects { assets, effects } =
159 &*operation.read_strongly_consistent().await?;
160 effects.apply().await?;
161
162 let EmittedEvaluatePoolAssets {
163 bootstrap,
164 output_root,
165 entrypoint,
166 } = &**assets;
167
168 let (Some(cwd), Some(entrypoint)) = (
169 to_sys_path(cwd.clone()).await?,
170 to_sys_path(entrypoint.clone()).await?,
171 ) else {
172 panic!("can only evaluate from a disk filesystem");
173 };
174
175 content_changed(Vc::upcast(**bootstrap)).await?;
177 let assets_for_source_mapping =
178 internal_assets_for_source_mapping(**bootstrap, output_root.clone())
179 .to_resolved()
180 .await?;
181 let env = match env_var_tracking {
182 EnvVarTracking::WholeEnvTracked => env.read_all().await?,
183 EnvVarTracking::Untracked => {
184 common_node_env(*env).await?;
186 for name in ["FORCE_COLOR", "NO_COLOR", "OPENSSL_CONF", "TZ"] {
187 env.read(name.into()).await?;
188 }
189
190 env.read_all().untracked().await?
191 }
192 };
193 let pool = NodeJsPool::new(
194 cwd,
195 entrypoint,
196 env.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
197 assets_for_source_mapping,
198 output_root.clone(),
199 chunking_context.root_path().owned().await?,
200 available_parallelism().map_or(1, |v| v.get()),
201 debug,
202 );
203 additional_invalidation.await?;
204 Ok(pool.cell())
205}
206
207#[turbo_tasks::function]
208async fn common_node_env(env: Vc<Box<dyn ProcessEnv>>) -> Result<Vc<EnvMap>> {
209 let mut filtered = FxIndexMap::default();
210 let env = env.read_all().await?;
211 for (key, value) in &*env {
212 let uppercase = key.to_uppercase();
213 for filter in &["NODE_", "UV_", "SSL_"] {
214 if uppercase.starts_with(filter) {
215 filtered.insert(key.clone(), value.clone());
216 break;
217 }
218 }
219 }
220 Ok(Vc::cell(filtered))
221}
222
223struct PoolErrorHandler;
224
225const MAX_FAST_ATTEMPTS: usize = 5;
227const MAX_ATTEMPTS: usize = MAX_FAST_ATTEMPTS * 2;
229
230impl futures_retry::ErrorHandler<anyhow::Error> for PoolErrorHandler {
231 type OutError = anyhow::Error;
232
233 fn handle(&mut self, attempt: usize, err: anyhow::Error) -> RetryPolicy<Self::OutError> {
234 if attempt >= MAX_ATTEMPTS {
235 RetryPolicy::ForwardError(err)
236 } else if attempt >= MAX_FAST_ATTEMPTS {
237 RetryPolicy::WaitRetry(Duration::from_secs(1))
238 } else {
239 RetryPolicy::Repeat
240 }
241 }
242}
243
244pub trait EvaluateContext {
245 type InfoMessage: DeserializeOwned;
246 type RequestMessage: DeserializeOwned;
247 type ResponseMessage: Serialize;
248 type State: Default;
249
250 fn pool(&self) -> OperationVc<NodeJsPool>;
251 fn keep_alive(&self) -> bool {
252 false
253 }
254 fn args(&self) -> &[ResolvedVc<JsonValue>];
255 fn cwd(&self) -> Vc<FileSystemPath>;
256 fn emit_error(
257 &self,
258 error: StructuredError,
259 pool: &NodeJsPool,
260 ) -> impl Future<Output = Result<()>> + Send;
261 fn info(
262 &self,
263 state: &mut Self::State,
264 data: Self::InfoMessage,
265 pool: &NodeJsPool,
266 ) -> impl Future<Output = Result<()>> + Send;
267 fn request(
268 &self,
269 state: &mut Self::State,
270 data: Self::RequestMessage,
271 pool: &NodeJsPool,
272 ) -> impl Future<Output = Result<Self::ResponseMessage>> + Send;
273 fn finish(
274 &self,
275 state: Self::State,
276 pool: &NodeJsPool,
277 ) -> impl Future<Output = Result<()>> + Send;
278}
279
280pub async fn custom_evaluate(evaluate_context: impl EvaluateContext) -> Result<Vc<Option<RcStr>>> {
281 let pool_op = evaluate_context.pool();
282 let mut state = Default::default();
283
284 let pool = pool_op.read_strongly_consistent().await?;
287
288 let args = evaluate_context.args().iter().try_join().await?;
289 let kill = !evaluate_context.keep_alive();
292
293 let (mut operation, _) = FutureRetry::new(
299 || async {
300 let mut operation = pool.operation().await?;
301 operation
302 .send(EvalJavaScriptOutgoingMessage::Evaluate {
303 args: args.iter().map(|v| &**v).collect(),
304 })
305 .await?;
306 Ok(operation)
307 },
308 PoolErrorHandler,
309 )
310 .await
311 .map_err(|(e, _)| e)?;
312
313 let result = pull_operation(&mut operation, &pool, &evaluate_context, &mut state).await?;
317
318 evaluate_context.finish(state, &pool).await?;
319
320 if kill {
321 operation.wait_or_kill().await?;
322 }
323
324 Ok(Vc::cell(result.map(RcStr::from)))
325}
326
327#[turbo_tasks::value]
328pub struct EvaluateEntries {
329 entries: Vec<ResolvedVc<Box<dyn EvaluatableAsset + 'static>>>,
330 main_entry_ident: ResolvedVc<AssetIdent>,
331}
332
333#[turbo_tasks::value_impl]
334impl EvaluateEntries {
335 #[turbo_tasks::function]
336 pub async fn graph_entries(self: Vc<Self>) -> Result<Vc<GraphEntries>> {
337 Ok(Vc::cell(vec![ChunkGroupEntry::Entry(
338 self.await?
339 .entries
340 .iter()
341 .cloned()
342 .map(ResolvedVc::upcast)
343 .collect(),
344 )]))
345 }
346}
347
348#[turbo_tasks::function]
349pub async fn get_evaluate_entries(
350 module_asset: ResolvedVc<Box<dyn Module>>,
351 asset_context: ResolvedVc<Box<dyn AssetContext>>,
352 runtime_entries: Option<ResolvedVc<EvaluatableAssets>>,
353) -> Result<Vc<EvaluateEntries>> {
354 let runtime_asset = asset_context
355 .process(
356 Vc::upcast(FileSource::new(
357 embed_file_path(rcstr!("ipc/evaluate.ts")).owned().await?,
358 )),
359 ReferenceType::Internal(InnerAssets::empty().to_resolved().await?),
360 )
361 .module()
362 .to_resolved()
363 .await?;
364
365 let entry_module = asset_context
366 .process(
367 Vc::upcast(VirtualSource::new(
368 runtime_asset.ident().path().await?.join("evaluate.js")?,
369 AssetContent::file(
370 FileContent::Content(File::from(
371 "import { run } from 'RUNTIME'; run(() => import('INNER'))",
372 ))
373 .cell(),
374 ),
375 )),
376 ReferenceType::Internal(ResolvedVc::cell(fxindexmap! {
377 rcstr!("INNER") => module_asset,
378 rcstr!("RUNTIME") => runtime_asset
379 })),
380 )
381 .module()
382 .to_resolved()
383 .await?;
384
385 let runtime_entries = {
386 let globals_module = asset_context
387 .process(
388 Vc::upcast(FileSource::new(
389 embed_file_path(rcstr!("globals.ts")).owned().await?,
390 )),
391 ReferenceType::Internal(InnerAssets::empty().to_resolved().await?),
392 )
393 .module();
394
395 let Some(globals_module) =
396 Vc::try_resolve_sidecast::<Box<dyn EvaluatableAsset>>(globals_module).await?
397 else {
398 bail!("Internal module is not evaluatable");
399 };
400
401 let mut entries = vec![globals_module.to_resolved().await?];
402 if let Some(runtime_entries) = runtime_entries {
403 for &entry in &*runtime_entries.await? {
404 entries.push(entry)
405 }
406 }
407 entries
408 };
409
410 Ok(EvaluateEntries {
411 entries: runtime_entries
412 .iter()
413 .copied()
414 .chain(iter::once(ResolvedVc::try_downcast(entry_module).unwrap()))
415 .collect(),
416 main_entry_ident: module_asset.ident().to_resolved().await?,
417 }
418 .cell())
419}
420
421#[turbo_tasks::function]
424pub async fn evaluate(
425 entries: ResolvedVc<EvaluateEntries>,
426 cwd: FileSystemPath,
427 env: ResolvedVc<Box<dyn ProcessEnv>>,
428 context_source_for_issue: ResolvedVc<Box<dyn Source>>,
429 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
430 module_graph: ResolvedVc<ModuleGraph>,
431 args: Vec<ResolvedVc<JsonValue>>,
432 additional_invalidation: ResolvedVc<Completion>,
433 debug: bool,
434) -> Result<Vc<Option<RcStr>>> {
435 custom_evaluate(BasicEvaluateContext {
436 entries,
437 cwd,
438 env,
439 context_source_for_issue,
440 chunking_context,
441 module_graph,
442 args,
443 additional_invalidation,
444 debug,
445 })
446 .await
447}
448
449async fn pull_operation<T: EvaluateContext>(
452 operation: &mut NodeJsOperation,
453 pool: &NodeJsPool,
454 evaluate_context: &T,
455 state: &mut T::State,
456) -> Result<Option<String>> {
457 let _guard = duration_span!("Node.js evaluation");
458
459 loop {
460 match operation.recv().await? {
461 EvalJavaScriptIncomingMessage::Error(error) => {
462 evaluate_context.emit_error(error, pool).await?;
463 operation.disallow_reuse();
465 return Ok(None);
467 }
468 EvalJavaScriptIncomingMessage::End { data } => return Ok(data),
469 EvalJavaScriptIncomingMessage::Info { data } => {
470 evaluate_context
471 .info(state, serde_json::from_value(data)?, pool)
472 .await?;
473 }
474 EvalJavaScriptIncomingMessage::Request { id, data } => {
475 match evaluate_context
476 .request(state, serde_json::from_value(data)?, pool)
477 .await
478 {
479 Ok(response) => {
480 operation
481 .send(EvalJavaScriptOutgoingMessage::Result {
482 id,
483 error: None,
484 data: Some(serde_json::to_value(response)?),
485 })
486 .await?;
487 }
488 Err(e) => {
489 operation
490 .send(EvalJavaScriptOutgoingMessage::Result {
491 id,
492 error: Some(PrettyPrintError(&e).to_string()),
493 data: None,
494 })
495 .await?;
496 }
497 }
498 }
499 }
500 }
501}
502
503struct BasicEvaluateContext {
504 entries: ResolvedVc<EvaluateEntries>,
505 cwd: FileSystemPath,
506 env: ResolvedVc<Box<dyn ProcessEnv>>,
507 context_source_for_issue: ResolvedVc<Box<dyn Source>>,
508 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
509 module_graph: ResolvedVc<ModuleGraph>,
510 args: Vec<ResolvedVc<JsonValue>>,
511 additional_invalidation: ResolvedVc<Completion>,
512 debug: bool,
513}
514
515impl EvaluateContext for BasicEvaluateContext {
516 type InfoMessage = ();
517 type RequestMessage = ();
518 type ResponseMessage = ();
519 type State = ();
520
521 fn pool(&self) -> OperationVc<crate::pool::NodeJsPool> {
522 get_evaluate_pool(
523 self.entries,
524 self.cwd.clone(),
525 self.env,
526 self.chunking_context,
527 self.module_graph,
528 self.additional_invalidation,
529 self.debug,
530 EnvVarTracking::WholeEnvTracked,
531 )
532 }
533
534 fn args(&self) -> &[ResolvedVc<serde_json::Value>] {
535 &self.args
536 }
537
538 fn cwd(&self) -> Vc<turbo_tasks_fs::FileSystemPath> {
539 self.cwd.clone().cell()
540 }
541
542 fn keep_alive(&self) -> bool {
543 !self.args.is_empty()
544 }
545
546 async fn emit_error(&self, error: StructuredError, pool: &NodeJsPool) -> Result<()> {
547 EvaluationIssue {
548 error,
549 source: IssueSource::from_source_only(self.context_source_for_issue),
550 assets_for_source_mapping: pool.assets_for_source_mapping,
551 assets_root: pool.assets_root.clone(),
552 root_path: self.chunking_context.root_path().owned().await?,
553 }
554 .resolved_cell()
555 .emit();
556 Ok(())
557 }
558
559 async fn info(
560 &self,
561 _state: &mut Self::State,
562 _data: Self::InfoMessage,
563 _pool: &NodeJsPool,
564 ) -> Result<()> {
565 bail!("BasicEvaluateContext does not support info messages")
566 }
567
568 async fn request(
569 &self,
570 _state: &mut Self::State,
571 _data: Self::RequestMessage,
572 _pool: &NodeJsPool,
573 ) -> Result<Self::ResponseMessage> {
574 bail!("BasicEvaluateContext does not support request messages")
575 }
576
577 async fn finish(&self, _state: Self::State, _pool: &NodeJsPool) -> Result<()> {
578 Ok(())
579 }
580}
581
582pub fn scale_zero() {
583 NodeJsPool::scale_zero();
584}
585
586pub fn scale_down() {
587 NodeJsPool::scale_down();
588}
589
590#[turbo_tasks::value(shared)]
592pub struct EvaluationIssue {
593 pub source: IssueSource,
594 pub error: StructuredError,
595 pub assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
596 pub assets_root: FileSystemPath,
597 pub root_path: FileSystemPath,
598}
599
600#[turbo_tasks::value_impl]
601impl Issue for EvaluationIssue {
602 #[turbo_tasks::function]
603 fn title(&self) -> Vc<StyledString> {
604 StyledString::Text(rcstr!("Error evaluating Node.js code")).cell()
605 }
606
607 #[turbo_tasks::function]
608 fn stage(&self) -> Vc<IssueStage> {
609 IssueStage::Transform.cell()
610 }
611
612 #[turbo_tasks::function]
613 fn file_path(&self) -> Vc<FileSystemPath> {
614 self.source.file_path()
615 }
616
617 #[turbo_tasks::function]
618 async fn description(&self) -> Result<Vc<OptionStyledString>> {
619 Ok(Vc::cell(Some(
620 StyledString::Text(
621 self.error
622 .print(
623 *self.assets_for_source_mapping,
624 self.assets_root.clone(),
625 self.root_path.clone(),
626 FormattingMode::Plain,
627 )
628 .await?
629 .into(),
630 )
631 .resolved_cell(),
632 )))
633 }
634
635 #[turbo_tasks::function]
636 fn source(&self) -> Vc<OptionIssueSource> {
637 Vc::cell(Some(self.source))
638 }
639}