1use std::{borrow::Cow, iter, ops::ControlFlow, thread::available_parallelism, time::Duration};
2
3use anyhow::{Result, anyhow, bail};
4use async_stream::try_stream as generator;
5use async_trait::async_trait;
6use futures::{
7 SinkExt, StreamExt,
8 channel::mpsc::{UnboundedSender, unbounded},
9 pin_mut,
10};
11use futures_retry::{FutureRetry, RetryPolicy};
12use parking_lot::Mutex;
13use serde::{Deserialize, Serialize, de::DeserializeOwned};
14use serde_json::Value as JsonValue;
15use turbo_rcstr::rcstr;
16use turbo_tasks::{
17 Completion, FxIndexMap, NonLocalValue, OperationVc, RawVc, ResolvedVc, TaskInput,
18 TryJoinIterExt, Vc, VcValueType, apply_effects, duration_span, fxindexmap, mark_finished,
19 prevent_gc, trace::TraceRawVcs, util::SharedError,
20};
21use turbo_tasks_bytes::{Bytes, Stream};
22use turbo_tasks_env::{EnvMap, ProcessEnv};
23use turbo_tasks_fs::{File, FileSystemPath, to_sys_path};
24use turbopack_core::{
25 asset::AssetContent,
26 changed::content_changed,
27 chunk::{ChunkingContext, ChunkingContextExt, EvaluatableAsset, EvaluatableAssets},
28 context::AssetContext,
29 error::PrettyPrintError,
30 file_source::FileSource,
31 issue::{
32 Issue, IssueExt, IssueSource, IssueStage, OptionIssueSource, OptionStyledString,
33 StyledString,
34 },
35 module::Module,
36 module_graph::{ModuleGraph, chunk_group_info::ChunkGroupEntry},
37 output::{OutputAsset, OutputAssets},
38 reference_type::{InnerAssets, ReferenceType},
39 source::Source,
40 virtual_source::VirtualSource,
41};
42
43use crate::{
44 AssetsForSourceMapping,
45 embed_js::embed_file_path,
46 emit, emit_package_json, internal_assets_for_source_mapping,
47 pool::{FormattingMode, NodeJsOperation, NodeJsPool},
48 source_map::StructuredError,
49};
50
51#[derive(Serialize)]
52#[serde(tag = "type", rename_all = "camelCase")]
53enum EvalJavaScriptOutgoingMessage<'a> {
54 #[serde(rename_all = "camelCase")]
55 Evaluate { args: Vec<&'a JsonValue> },
56 Result {
57 id: u64,
58 data: Option<JsonValue>,
59 error: Option<String>,
60 },
61}
62
63#[derive(Deserialize, Debug)]
64#[serde(tag = "type", rename_all = "camelCase")]
65enum EvalJavaScriptIncomingMessage {
66 Info { data: JsonValue },
67 Request { id: u64, data: JsonValue },
68 End { data: Option<String> },
69 Error(StructuredError),
70}
71
72type LoopResult = ControlFlow<Result<Option<String>, StructuredError>, String>;
73
74type EvaluationItem = Result<Bytes, SharedError>;
75type JavaScriptStream = Stream<EvaluationItem>;
76
77#[turbo_tasks::value(eq = "manual", cell = "new", serialization = "none")]
78pub struct JavaScriptStreamSender {
79 #[turbo_tasks(trace_ignore, debug_ignore)]
80 get: Box<dyn Fn() -> UnboundedSender<Result<Bytes, SharedError>> + Send + Sync>,
81}
82
83#[turbo_tasks::value(transparent)]
84#[derive(Clone, Debug)]
85pub struct JavaScriptEvaluation(#[turbo_tasks(trace_ignore)] JavaScriptStream);
86
87#[turbo_tasks::value]
88struct EmittedEvaluatePoolAssets {
89 bootstrap: ResolvedVc<Box<dyn OutputAsset>>,
90 output_root: FileSystemPath,
91 entrypoint: FileSystemPath,
92}
93
94#[turbo_tasks::function(operation)]
95async fn emit_evaluate_pool_assets_operation(
96 module_asset: ResolvedVc<Box<dyn Module>>,
97 asset_context: ResolvedVc<Box<dyn AssetContext>>,
98 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
99 runtime_entries: Option<ResolvedVc<EvaluatableAssets>>,
100) -> Result<Vc<EmittedEvaluatePoolAssets>> {
101 let runtime_asset = asset_context
102 .process(
103 Vc::upcast(FileSource::new(
104 embed_file_path(rcstr!("ipc/evaluate.ts"))
105 .await?
106 .clone_value(),
107 )),
108 ReferenceType::Internal(InnerAssets::empty().to_resolved().await?),
109 )
110 .module()
111 .to_resolved()
112 .await?;
113
114 let module_path = module_asset.ident().path().await?;
115 let file_name = module_path.file_name();
116 let file_name = if file_name.ends_with(".js") {
117 Cow::Borrowed(file_name)
118 } else if let Some(file_name) = file_name.strip_suffix(".ts") {
119 Cow::Owned(format!("{file_name}.js"))
120 } else {
121 Cow::Owned(format!("{file_name}.js"))
122 };
123 let entrypoint = chunking_context.output_root().await?.join(&file_name)?;
124 let entry_module = asset_context
125 .process(
126 Vc::upcast(VirtualSource::new(
127 runtime_asset.ident().path().await?.join("evaluate.js")?,
128 AssetContent::file(
129 File::from("import { run } from 'RUNTIME'; run(() => import('INNER'))").into(),
130 ),
131 )),
132 ReferenceType::Internal(ResolvedVc::cell(fxindexmap! {
133 rcstr!("INNER") => module_asset,
134 rcstr!("RUNTIME") => runtime_asset
135 })),
136 )
137 .module()
138 .to_resolved()
139 .await?;
140
141 let runtime_entries = {
142 let globals_module = asset_context
143 .process(
144 Vc::upcast(FileSource::new(
145 embed_file_path(rcstr!("globals.ts")).await?.clone_value(),
146 )),
147 ReferenceType::Internal(InnerAssets::empty().to_resolved().await?),
148 )
149 .module();
150
151 let Some(globals_module) =
152 Vc::try_resolve_sidecast::<Box<dyn EvaluatableAsset>>(globals_module).await?
153 else {
154 bail!("Internal module is not evaluatable");
155 };
156
157 let mut entries = vec![globals_module.to_resolved().await?];
158 if let Some(runtime_entries) = runtime_entries {
159 for &entry in &*runtime_entries.await? {
160 entries.push(entry)
161 }
162 }
163 entries
164 };
165
166 let module_graph = ModuleGraph::from_modules(
167 Vc::cell(vec![ChunkGroupEntry::Entry(
168 iter::once(entry_module)
169 .chain(runtime_entries.iter().copied().map(ResolvedVc::upcast))
170 .collect(),
171 )]),
172 false,
173 );
174
175 let bootstrap = chunking_context.root_entry_chunk_group_asset(
176 entrypoint.clone(),
177 Vc::<EvaluatableAssets>::cell(runtime_entries)
178 .with_entry(*ResolvedVc::try_downcast(entry_module).unwrap()),
179 module_graph,
180 OutputAssets::empty(),
181 );
182
183 let output_root = chunking_context.output_root().await?.clone_value();
184 let _ = emit_package_json(output_root.clone())?.resolve().await?;
185 let _ = emit(bootstrap, output_root.clone()).resolve().await?;
186
187 Ok(EmittedEvaluatePoolAssets {
188 bootstrap: bootstrap.to_resolved().await?,
189 output_root,
190 entrypoint: entrypoint.clone(),
191 }
192 .cell())
193}
194
195#[turbo_tasks::function(operation)]
196async fn emit_evaluate_pool_assets_with_effects_operation(
197 module_asset: ResolvedVc<Box<dyn Module>>,
198 asset_context: ResolvedVc<Box<dyn AssetContext>>,
199 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
200 runtime_entries: Option<ResolvedVc<EvaluatableAssets>>,
201) -> Result<Vc<EmittedEvaluatePoolAssets>> {
202 let operation = emit_evaluate_pool_assets_operation(
203 module_asset,
204 asset_context,
205 chunking_context,
206 runtime_entries,
207 );
208 let result = operation.resolve_strongly_consistent().await?;
209 apply_effects(operation).await?;
210 Ok(*result)
211}
212
213#[derive(
214 Clone,
215 Copy,
216 Hash,
217 Debug,
218 PartialEq,
219 Eq,
220 Serialize,
221 Deserialize,
222 TaskInput,
223 NonLocalValue,
224 TraceRawVcs,
225)]
226pub enum EnvVarTracking {
227 WholeEnvTracked,
228 Untracked,
229}
230
231#[turbo_tasks::function(operation)]
232pub async fn get_evaluate_pool(
235 module_asset: ResolvedVc<Box<dyn Module>>,
236 cwd: FileSystemPath,
237 env: ResolvedVc<Box<dyn ProcessEnv>>,
238 asset_context: ResolvedVc<Box<dyn AssetContext>>,
239 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
240 runtime_entries: Option<ResolvedVc<EvaluatableAssets>>,
241 additional_invalidation: ResolvedVc<Completion>,
242 debug: bool,
243 env_var_tracking: EnvVarTracking,
244) -> Result<Vc<NodeJsPool>> {
245 let EmittedEvaluatePoolAssets {
246 bootstrap,
247 output_root,
248 entrypoint,
249 } = &*emit_evaluate_pool_assets_with_effects_operation(
250 module_asset,
251 asset_context,
252 chunking_context,
253 runtime_entries,
254 )
255 .read_strongly_consistent()
256 .await?;
257
258 let (Some(cwd), Some(entrypoint)) = (
259 to_sys_path(cwd.clone()).await?,
260 to_sys_path(entrypoint.clone()).await?,
261 ) else {
262 panic!("can only evaluate from a disk filesystem");
263 };
264
265 content_changed(Vc::upcast(**bootstrap)).await?;
267 let assets_for_source_mapping =
268 internal_assets_for_source_mapping(**bootstrap, output_root.clone())
269 .to_resolved()
270 .await?;
271 let env = match env_var_tracking {
272 EnvVarTracking::WholeEnvTracked => env.read_all().await?,
273 EnvVarTracking::Untracked => {
274 common_node_env(*env).await?;
276 for name in ["FORCE_COLOR", "NO_COLOR", "OPENSSL_CONF", "TZ"] {
277 env.read(name.into()).await?;
278 }
279
280 env.read_all().untracked().await?
281 }
282 };
283 let pool = NodeJsPool::new(
284 cwd,
285 entrypoint,
286 env.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
287 assets_for_source_mapping,
288 output_root.clone(),
289 chunking_context.root_path().await?.clone_value(),
290 available_parallelism().map_or(1, |v| v.get()),
291 debug,
292 );
293 additional_invalidation.await?;
294 Ok(pool.cell())
295}
296
297#[turbo_tasks::function]
298async fn common_node_env(env: Vc<Box<dyn ProcessEnv>>) -> Result<Vc<EnvMap>> {
299 let mut filtered = FxIndexMap::default();
300 let env = env.read_all().await?;
301 for (key, value) in &*env {
302 let uppercase = key.to_uppercase();
303 for filter in &["NODE_", "UV_", "SSL_"] {
304 if uppercase.starts_with(filter) {
305 filtered.insert(key.clone(), value.clone());
306 break;
307 }
308 }
309 }
310 Ok(Vc::cell(filtered))
311}
312
/// Retry policy used while acquiring a pool operation and sending it the initial message.
struct PoolErrorHandler;

/// Number of attempts that are retried immediately, without waiting.
const MAX_FAST_ATTEMPTS: usize = 5;
/// Total number of attempts before the error is forwarded to the caller.
const MAX_ATTEMPTS: usize = 2 * MAX_FAST_ATTEMPTS;
319
320impl futures_retry::ErrorHandler<anyhow::Error> for PoolErrorHandler {
321 type OutError = anyhow::Error;
322
323 fn handle(&mut self, attempt: usize, err: anyhow::Error) -> RetryPolicy<Self::OutError> {
324 if attempt >= MAX_ATTEMPTS {
325 RetryPolicy::ForwardError(err)
326 } else if attempt >= MAX_FAST_ATTEMPTS {
327 RetryPolicy::WaitRetry(Duration::from_secs(1))
328 } else {
329 RetryPolicy::Repeat
330 }
331 }
332}
333
334#[async_trait]
335pub trait EvaluateContext {
336 type InfoMessage: DeserializeOwned;
337 type RequestMessage: DeserializeOwned;
338 type ResponseMessage: Serialize;
339 type State: Default;
340
341 fn compute(self, sender: Vc<JavaScriptStreamSender>);
342 fn pool(&self) -> OperationVc<NodeJsPool>;
343 fn keep_alive(&self) -> bool {
344 false
345 }
346 fn args(&self) -> &[ResolvedVc<JsonValue>];
347 fn cwd(&self) -> Vc<FileSystemPath>;
348 async fn emit_error(&self, error: StructuredError, pool: &NodeJsPool) -> Result<()>;
349 async fn info(
350 &self,
351 state: &mut Self::State,
352 data: Self::InfoMessage,
353 pool: &NodeJsPool,
354 ) -> Result<()>;
355 async fn request(
356 &self,
357 state: &mut Self::State,
358 data: Self::RequestMessage,
359 pool: &NodeJsPool,
360 ) -> Result<Self::ResponseMessage>;
361 async fn finish(&self, _state: Self::State, _pool: &NodeJsPool) -> Result<()>;
362}
363
364pub fn custom_evaluate(evaluate_context: impl EvaluateContext) -> Vc<JavaScriptEvaluation> {
365 prevent_gc();
368
369 let cell = turbo_tasks::macro_helpers::find_cell_by_type(
375 <JavaScriptEvaluation as VcValueType>::get_value_type_id(),
376 );
377
378 let (sender, receiver) = unbounded();
381 cell.update(JavaScriptEvaluation(JavaScriptStream::new_open(
382 vec![],
383 Box::new(receiver),
384 )));
385 let initial = Mutex::new(Some(sender));
386
387 evaluate_context.compute(
389 JavaScriptStreamSender {
390 get: Box::new(move || {
391 if let Some(sender) = initial.lock().take() {
392 sender
393 } else {
394 let (sender, receiver) = unbounded();
397 cell.update(JavaScriptEvaluation(JavaScriptStream::new_open(
398 vec![],
399 Box::new(receiver),
400 )));
401 sender
402 }
403 }),
404 }
405 .cell(),
406 );
407
408 let raw: RawVc = cell.into();
409 raw.into()
410}
411
412#[turbo_tasks::function]
415pub fn evaluate(
416 module_asset: ResolvedVc<Box<dyn Module>>,
417 cwd: FileSystemPath,
418 env: ResolvedVc<Box<dyn ProcessEnv>>,
419 context_source_for_issue: ResolvedVc<Box<dyn Source>>,
420 asset_context: ResolvedVc<Box<dyn AssetContext>>,
421 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
422 runtime_entries: Option<ResolvedVc<EvaluatableAssets>>,
423 args: Vec<ResolvedVc<JsonValue>>,
424 additional_invalidation: ResolvedVc<Completion>,
425 debug: bool,
426) -> Vc<JavaScriptEvaluation> {
427 custom_evaluate(BasicEvaluateContext {
428 module_asset,
429 cwd,
430 env,
431 context_source_for_issue,
432 asset_context,
433 chunking_context,
434 runtime_entries,
435 args,
436 additional_invalidation,
437 debug,
438 })
439}
440
441pub async fn compute(
442 evaluate_context: impl EvaluateContext,
443 sender: Vc<JavaScriptStreamSender>,
444) -> Result<Vc<()>> {
445 mark_finished();
446 let Ok(sender) = sender.await else {
447 return Ok(Default::default());
449 };
450
451 let stream = generator! {
452 let pool_op = evaluate_context.pool();
453 let mut state = Default::default();
454
455 let pool = pool_op.read_strongly_consistent().await?;
458
459 let args = evaluate_context.args().iter().try_join().await?;
460 let kill = !evaluate_context.keep_alive();
463
464 let (mut operation, _) = FutureRetry::new(
470 || async {
471 let mut operation = pool.operation().await?;
472 operation
473 .send(EvalJavaScriptOutgoingMessage::Evaluate {
474 args: args.iter().map(|v| &**v).collect(),
475 })
476 .await?;
477 Ok(operation)
478 },
479 PoolErrorHandler,
480 )
481 .await
482 .map_err(|(e, _)| e)?;
483
484 loop {
488 let output = pull_operation(&mut operation, &pool, &evaluate_context, &mut state).await?;
489
490 match output {
491 LoopResult::Continue(data) => {
492 yield data.into();
493 }
494 LoopResult::Break(Ok(Some(data))) => {
495 yield data.into();
496 break;
497 }
498 LoopResult::Break(Err(e)) => {
499 let error = print_error(e, &pool, &evaluate_context).await?;
500 Err(anyhow!("Node.js evaluation failed: {}", error))?;
501 break;
502 }
503 LoopResult::Break(Ok(None)) => {
504 break;
505 }
506 }
507 }
508
509 evaluate_context.finish(state, &pool).await?;
510
511 if kill {
512 operation.wait_or_kill().await?;
513 }
514 };
515
516 let mut sender = (sender.get)();
517 pin_mut!(stream);
518 while let Some(value) = stream.next().await {
519 if sender.send(value).await.is_err() {
520 return Ok(Default::default());
521 }
522 if sender.flush().await.is_err() {
523 return Ok(Default::default());
524 }
525 }
526
527 Ok(Default::default())
528}
529
530async fn pull_operation<T: EvaluateContext>(
533 operation: &mut NodeJsOperation,
534 pool: &NodeJsPool,
535 evaluate_context: &T,
536 state: &mut T::State,
537) -> Result<LoopResult> {
538 let guard = duration_span!("Node.js evaluation");
539
540 let output = loop {
541 match operation.recv().await? {
542 EvalJavaScriptIncomingMessage::Error(error) => {
543 evaluate_context.emit_error(error, pool).await?;
544 operation.disallow_reuse();
546 break ControlFlow::Break(Ok(None));
548 }
549 EvalJavaScriptIncomingMessage::End { data } => break ControlFlow::Break(Ok(data)),
550 EvalJavaScriptIncomingMessage::Info { data } => {
551 evaluate_context
552 .info(state, serde_json::from_value(data)?, pool)
553 .await?;
554 }
555 EvalJavaScriptIncomingMessage::Request { id, data } => {
556 match evaluate_context
557 .request(state, serde_json::from_value(data)?, pool)
558 .await
559 {
560 Ok(response) => {
561 operation
562 .send(EvalJavaScriptOutgoingMessage::Result {
563 id,
564 error: None,
565 data: Some(serde_json::to_value(response)?),
566 })
567 .await?;
568 }
569 Err(e) => {
570 operation
571 .send(EvalJavaScriptOutgoingMessage::Result {
572 id,
573 error: Some(PrettyPrintError(&e).to_string()),
574 data: None,
575 })
576 .await?;
577 }
578 }
579 }
580 }
581 };
582 drop(guard);
583
584 Ok(output)
585}
586
587#[turbo_tasks::function]
588async fn basic_compute(
589 evaluate_context: BasicEvaluateContext,
590 sender: Vc<JavaScriptStreamSender>,
591) -> Result<Vc<()>> {
592 compute(evaluate_context, sender).await
593}
594
595#[derive(Clone, PartialEq, Eq, Hash, TaskInput, Debug, Serialize, Deserialize, TraceRawVcs)]
596struct BasicEvaluateContext {
597 module_asset: ResolvedVc<Box<dyn Module>>,
598 cwd: FileSystemPath,
599 env: ResolvedVc<Box<dyn ProcessEnv>>,
600 context_source_for_issue: ResolvedVc<Box<dyn Source>>,
601 asset_context: ResolvedVc<Box<dyn AssetContext>>,
602 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
603 runtime_entries: Option<ResolvedVc<EvaluatableAssets>>,
604 args: Vec<ResolvedVc<JsonValue>>,
605 additional_invalidation: ResolvedVc<Completion>,
606 debug: bool,
607}
608
609#[async_trait]
610impl EvaluateContext for BasicEvaluateContext {
611 type InfoMessage = ();
612 type RequestMessage = ();
613 type ResponseMessage = ();
614 type State = ();
615
616 fn compute(self, sender: Vc<JavaScriptStreamSender>) {
617 let _ = basic_compute(self, sender);
618 }
619
620 fn pool(&self) -> OperationVc<crate::pool::NodeJsPool> {
621 get_evaluate_pool(
622 self.module_asset,
623 self.cwd.clone(),
624 self.env,
625 self.asset_context,
626 self.chunking_context,
627 self.runtime_entries,
628 self.additional_invalidation,
629 self.debug,
630 EnvVarTracking::WholeEnvTracked,
631 )
632 }
633
634 fn args(&self) -> &[ResolvedVc<serde_json::Value>] {
635 &self.args
636 }
637
638 fn cwd(&self) -> Vc<turbo_tasks_fs::FileSystemPath> {
639 self.cwd.clone().cell()
640 }
641
642 fn keep_alive(&self) -> bool {
643 !self.args.is_empty()
644 }
645
646 async fn emit_error(&self, error: StructuredError, pool: &NodeJsPool) -> Result<()> {
647 EvaluationIssue {
648 error,
649 source: IssueSource::from_source_only(self.context_source_for_issue),
650 assets_for_source_mapping: pool.assets_for_source_mapping,
651 assets_root: pool.assets_root.clone(),
652 root_path: self.chunking_context.root_path().await?.clone_value(),
653 }
654 .resolved_cell()
655 .emit();
656 Ok(())
657 }
658
659 async fn info(
660 &self,
661 _state: &mut Self::State,
662 _data: Self::InfoMessage,
663 _pool: &NodeJsPool,
664 ) -> Result<()> {
665 bail!("BasicEvaluateContext does not support info messages")
666 }
667
668 async fn request(
669 &self,
670 _state: &mut Self::State,
671 _data: Self::RequestMessage,
672 _pool: &NodeJsPool,
673 ) -> Result<Self::ResponseMessage> {
674 bail!("BasicEvaluateContext does not support request messages")
675 }
676
677 async fn finish(&self, _state: Self::State, _pool: &NodeJsPool) -> Result<()> {
678 Ok(())
679 }
680}
681
682pub fn scale_zero() {
683 NodeJsPool::scale_zero();
684}
685
686pub fn scale_down() {
687 NodeJsPool::scale_down();
688}
689
690async fn print_error(
691 error: StructuredError,
692 pool: &NodeJsPool,
693 evaluate_context: &impl EvaluateContext,
694) -> Result<String> {
695 error
696 .print(
697 *pool.assets_for_source_mapping,
698 pool.assets_root.clone(),
699 evaluate_context.cwd().await?.clone_value(),
700 FormattingMode::Plain,
701 )
702 .await
703}
704#[turbo_tasks::value(shared)]
706pub struct EvaluationIssue {
707 pub source: IssueSource,
708 pub error: StructuredError,
709 pub assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
710 pub assets_root: FileSystemPath,
711 pub root_path: FileSystemPath,
712}
713
714#[turbo_tasks::value_impl]
715impl Issue for EvaluationIssue {
716 #[turbo_tasks::function]
717 fn title(&self) -> Vc<StyledString> {
718 StyledString::Text(rcstr!("Error evaluating Node.js code")).cell()
719 }
720
721 #[turbo_tasks::function]
722 fn stage(&self) -> Vc<IssueStage> {
723 IssueStage::Transform.into()
724 }
725
726 #[turbo_tasks::function]
727 fn file_path(&self) -> Vc<FileSystemPath> {
728 self.source.file_path()
729 }
730
731 #[turbo_tasks::function]
732 async fn description(&self) -> Result<Vc<OptionStyledString>> {
733 Ok(Vc::cell(Some(
734 StyledString::Text(
735 self.error
736 .print(
737 *self.assets_for_source_mapping,
738 self.assets_root.clone(),
739 self.root_path.clone(),
740 FormattingMode::Plain,
741 )
742 .await?
743 .into(),
744 )
745 .resolved_cell(),
746 )))
747 }
748
749 #[turbo_tasks::function]
750 fn source(&self) -> Vc<OptionIssueSource> {
751 Vc::cell(Some(self.source))
752 }
753}