turbopack_node/
evaluate.rs

1use std::{borrow::Cow, iter, sync::Arc, thread::available_parallelism, time::Duration};
2
3use anyhow::{Result, bail};
4use bincode::{Decode, Encode};
5use futures_retry::{FutureRetry, RetryPolicy};
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use serde_json::Value as JsonValue;
8use turbo_rcstr::{RcStr, rcstr};
9use turbo_tasks::{
10    Completion, Effects, FxIndexMap, NonLocalValue, OperationVc, ReadRef, ResolvedVc, TaskInput,
11    TryJoinIterExt, Vc, duration_span, fxindexmap, get_effects, trace::TraceRawVcs,
12};
13use turbo_tasks_env::{EnvMap, ProcessEnv};
14use turbo_tasks_fs::{File, FileContent, FileSystemPath, to_sys_path};
15use turbopack_core::{
16    asset::AssetContent,
17    changed::content_changed,
18    chunk::{ChunkingContext, ChunkingContextExt, EvaluatableAsset, EvaluatableAssets},
19    context::AssetContext,
20    error::PrettyPrintError,
21    file_source::FileSource,
22    ident::AssetIdent,
23    issue::{
24        Issue, IssueExt, IssueSource, IssueStage, OptionIssueSource, OptionStyledString,
25        StyledString,
26    },
27    module::Module,
28    module_graph::{GraphEntries, ModuleGraph, chunk_group_info::ChunkGroupEntry},
29    output::{OutputAsset, OutputAssets},
30    reference_type::{InnerAssets, ReferenceType},
31    source::Source,
32    virtual_source::VirtualSource,
33};
34
35use crate::{
36    AssetsForSourceMapping,
37    embed_js::embed_file_path,
38    emit, emit_package_json, internal_assets_for_source_mapping,
39    pool::{FormattingMode, NodeJsOperation, NodeJsPool},
40    source_map::StructuredError,
41};
42
43#[derive(Serialize)]
44#[serde(tag = "type", rename_all = "camelCase")]
45enum EvalJavaScriptOutgoingMessage<'a> {
46    #[serde(rename_all = "camelCase")]
47    Evaluate { args: Vec<&'a JsonValue> },
48    Result {
49        id: u64,
50        data: Option<JsonValue>,
51        error: Option<String>,
52    },
53}
54
55#[derive(Deserialize, Debug)]
56#[serde(tag = "type", rename_all = "camelCase")]
57enum EvalJavaScriptIncomingMessage {
58    Info { data: JsonValue },
59    Request { id: u64, data: JsonValue },
60    End { data: Option<String> },
61    Error(StructuredError),
62}
63
64#[turbo_tasks::value]
65struct EmittedEvaluatePoolAssets {
66    bootstrap: ResolvedVc<Box<dyn OutputAsset>>,
67    output_root: FileSystemPath,
68    entrypoint: FileSystemPath,
69}
70
71#[turbo_tasks::function(operation)]
72async fn emit_evaluate_pool_assets_operation(
73    entries: ResolvedVc<EvaluateEntries>,
74    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
75    module_graph: ResolvedVc<ModuleGraph>,
76) -> Result<Vc<EmittedEvaluatePoolAssets>> {
77    let EvaluateEntries {
78        entries,
79        main_entry_ident,
80    } = &*entries.await?;
81
82    let module_path = main_entry_ident.path().await?;
83    let file_name = module_path.file_name();
84    let file_name = if file_name.ends_with(".js") {
85        Cow::Borrowed(file_name)
86    } else if let Some(file_name) = file_name.strip_suffix(".ts") {
87        Cow::Owned(format!("{file_name}.js"))
88    } else {
89        Cow::Owned(format!("{file_name}.js"))
90    };
91    let entrypoint = chunking_context.output_root().await?.join(&file_name)?;
92
93    let bootstrap = chunking_context.root_entry_chunk_group_asset(
94        entrypoint.clone(),
95        Vc::cell(entries.clone()),
96        *module_graph,
97        OutputAssets::empty(),
98        OutputAssets::empty(),
99    );
100
101    let output_root = chunking_context.output_root().owned().await?;
102    emit_package_json(output_root.clone())?
103        .as_side_effect()
104        .await?;
105    emit(bootstrap, output_root.clone())
106        .as_side_effect()
107        .await?;
108
109    Ok(EmittedEvaluatePoolAssets {
110        bootstrap: bootstrap.to_resolved().await?,
111        output_root,
112        entrypoint: entrypoint.clone(),
113    }
114    .cell())
115}
116
117#[turbo_tasks::value(serialization = "none")]
118struct EmittedEvaluatePoolAssetsWithEffects {
119    assets: ReadRef<EmittedEvaluatePoolAssets>,
120    effects: Arc<Effects>,
121}
122
123#[turbo_tasks::function(operation)]
124async fn emit_evaluate_pool_assets_with_effects_operation(
125    entries: ResolvedVc<EvaluateEntries>,
126    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
127    module_graph: ResolvedVc<ModuleGraph>,
128) -> Result<Vc<EmittedEvaluatePoolAssetsWithEffects>> {
129    let operation = emit_evaluate_pool_assets_operation(entries, chunking_context, module_graph);
130    let assets = operation.read_strongly_consistent().await?;
131    let effects = Arc::new(get_effects(operation).await?);
132    Ok(EmittedEvaluatePoolAssetsWithEffects { assets, effects }.cell())
133}
134
135#[derive(
136    Clone, Copy, Hash, Debug, PartialEq, Eq, TaskInput, NonLocalValue, TraceRawVcs, Encode, Decode,
137)]
138pub enum EnvVarTracking {
139    WholeEnvTracked,
140    Untracked,
141}
142
143#[turbo_tasks::function(operation)]
144/// Pass the file you cared as `runtime_entries` to invalidate and reload the
145/// evaluated result automatically.
146pub async fn get_evaluate_pool(
147    entries: ResolvedVc<EvaluateEntries>,
148    cwd: FileSystemPath,
149    env: ResolvedVc<Box<dyn ProcessEnv>>,
150    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
151    module_graph: ResolvedVc<ModuleGraph>,
152    additional_invalidation: ResolvedVc<Completion>,
153    debug: bool,
154    env_var_tracking: EnvVarTracking,
155) -> Result<Vc<NodeJsPool>> {
156    let operation =
157        emit_evaluate_pool_assets_with_effects_operation(entries, chunking_context, module_graph);
158    let EmittedEvaluatePoolAssetsWithEffects { assets, effects } =
159        &*operation.read_strongly_consistent().await?;
160    effects.apply().await?;
161
162    let EmittedEvaluatePoolAssets {
163        bootstrap,
164        output_root,
165        entrypoint,
166    } = &**assets;
167
168    let (Some(cwd), Some(entrypoint)) = (
169        to_sys_path(cwd.clone()).await?,
170        to_sys_path(entrypoint.clone()).await?,
171    ) else {
172        panic!("can only evaluate from a disk filesystem");
173    };
174
175    // Invalidate pool when code content changes
176    content_changed(Vc::upcast(**bootstrap)).await?;
177    let assets_for_source_mapping =
178        internal_assets_for_source_mapping(**bootstrap, output_root.clone())
179            .to_resolved()
180            .await?;
181    let env = match env_var_tracking {
182        EnvVarTracking::WholeEnvTracked => env.read_all().await?,
183        EnvVarTracking::Untracked => {
184            // We always depend on some known env vars that are used by Node.js
185            common_node_env(*env).await?;
186            for name in ["FORCE_COLOR", "NO_COLOR", "OPENSSL_CONF", "TZ"] {
187                env.read(name.into()).await?;
188            }
189
190            env.read_all().untracked().await?
191        }
192    };
193    let pool = NodeJsPool::new(
194        cwd,
195        entrypoint,
196        env.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
197        assets_for_source_mapping,
198        output_root.clone(),
199        chunking_context.root_path().owned().await?,
200        available_parallelism().map_or(1, |v| v.get()),
201        debug,
202    );
203    additional_invalidation.await?;
204    Ok(pool.cell())
205}
206
207#[turbo_tasks::function]
208async fn common_node_env(env: Vc<Box<dyn ProcessEnv>>) -> Result<Vc<EnvMap>> {
209    let mut filtered = FxIndexMap::default();
210    let env = env.read_all().await?;
211    for (key, value) in &*env {
212        let uppercase = key.to_uppercase();
213        for filter in &["NODE_", "UV_", "SSL_"] {
214            if uppercase.starts_with(filter) {
215                filtered.insert(key.clone(), value.clone());
216                break;
217            }
218        }
219    }
220    Ok(Vc::cell(filtered))
221}
222
223struct PoolErrorHandler;
224
225/// Number of attempts before we start slowing down the retry.
226const MAX_FAST_ATTEMPTS: usize = 5;
227/// Total number of attempts.
228const MAX_ATTEMPTS: usize = MAX_FAST_ATTEMPTS * 2;
229
230impl futures_retry::ErrorHandler<anyhow::Error> for PoolErrorHandler {
231    type OutError = anyhow::Error;
232
233    fn handle(&mut self, attempt: usize, err: anyhow::Error) -> RetryPolicy<Self::OutError> {
234        if attempt >= MAX_ATTEMPTS {
235            RetryPolicy::ForwardError(err)
236        } else if attempt >= MAX_FAST_ATTEMPTS {
237            RetryPolicy::WaitRetry(Duration::from_secs(1))
238        } else {
239            RetryPolicy::Repeat
240        }
241    }
242}
243
244pub trait EvaluateContext {
245    type InfoMessage: DeserializeOwned;
246    type RequestMessage: DeserializeOwned;
247    type ResponseMessage: Serialize;
248    type State: Default;
249
250    fn pool(&self) -> OperationVc<NodeJsPool>;
251    fn keep_alive(&self) -> bool {
252        false
253    }
254    fn args(&self) -> &[ResolvedVc<JsonValue>];
255    fn cwd(&self) -> Vc<FileSystemPath>;
256    fn emit_error(
257        &self,
258        error: StructuredError,
259        pool: &NodeJsPool,
260    ) -> impl Future<Output = Result<()>> + Send;
261    fn info(
262        &self,
263        state: &mut Self::State,
264        data: Self::InfoMessage,
265        pool: &NodeJsPool,
266    ) -> impl Future<Output = Result<()>> + Send;
267    fn request(
268        &self,
269        state: &mut Self::State,
270        data: Self::RequestMessage,
271        pool: &NodeJsPool,
272    ) -> impl Future<Output = Result<Self::ResponseMessage>> + Send;
273    fn finish(
274        &self,
275        state: Self::State,
276        pool: &NodeJsPool,
277    ) -> impl Future<Output = Result<()>> + Send;
278}
279
280pub async fn custom_evaluate(evaluate_context: impl EvaluateContext) -> Result<Vc<Option<RcStr>>> {
281    let pool_op = evaluate_context.pool();
282    let mut state = Default::default();
283
284    // Read this strongly consistent, since we don't want to run inconsistent
285    // node.js code.
286    let pool = pool_op.read_strongly_consistent().await?;
287
288    let args = evaluate_context.args().iter().try_join().await?;
289    // Assume this is a one-off operation, so we can kill the process
290    // TODO use a better way to decide that.
291    let kill = !evaluate_context.keep_alive();
292
293    // Workers in the pool could be in a bad state that we didn't detect yet.
294    // The bad state might even be unnoticeable until we actually send the job to the
295    // worker. So we retry picking workers from the pools until we succeed
296    // sending the job.
297
298    let (mut operation, _) = FutureRetry::new(
299        || async {
300            let mut operation = pool.operation().await?;
301            operation
302                .send(EvalJavaScriptOutgoingMessage::Evaluate {
303                    args: args.iter().map(|v| &**v).collect(),
304                })
305                .await?;
306            Ok(operation)
307        },
308        PoolErrorHandler,
309    )
310    .await
311    .map_err(|(e, _)| e)?;
312
313    // The evaluation sent an initial intermediate value without completing. We'll
314    // need to spawn a new thread to continually pull data out of the process,
315    // and ferry that along.
316    let result = pull_operation(&mut operation, &pool, &evaluate_context, &mut state).await?;
317
318    evaluate_context.finish(state, &pool).await?;
319
320    if kill {
321        operation.wait_or_kill().await?;
322    }
323
324    Ok(Vc::cell(result.map(RcStr::from)))
325}
326
327#[turbo_tasks::value]
328pub struct EvaluateEntries {
329    entries: Vec<ResolvedVc<Box<dyn EvaluatableAsset + 'static>>>,
330    main_entry_ident: ResolvedVc<AssetIdent>,
331}
332
333#[turbo_tasks::value_impl]
334impl EvaluateEntries {
335    #[turbo_tasks::function]
336    pub async fn graph_entries(self: Vc<Self>) -> Result<Vc<GraphEntries>> {
337        Ok(Vc::cell(vec![ChunkGroupEntry::Entry(
338            self.await?
339                .entries
340                .iter()
341                .cloned()
342                .map(ResolvedVc::upcast)
343                .collect(),
344        )]))
345    }
346}
347
348#[turbo_tasks::function]
349pub async fn get_evaluate_entries(
350    module_asset: ResolvedVc<Box<dyn Module>>,
351    asset_context: ResolvedVc<Box<dyn AssetContext>>,
352    runtime_entries: Option<ResolvedVc<EvaluatableAssets>>,
353) -> Result<Vc<EvaluateEntries>> {
354    let runtime_asset = asset_context
355        .process(
356            Vc::upcast(FileSource::new(
357                embed_file_path(rcstr!("ipc/evaluate.ts")).owned().await?,
358            )),
359            ReferenceType::Internal(InnerAssets::empty().to_resolved().await?),
360        )
361        .module()
362        .to_resolved()
363        .await?;
364
365    let entry_module = asset_context
366        .process(
367            Vc::upcast(VirtualSource::new(
368                runtime_asset.ident().path().await?.join("evaluate.js")?,
369                AssetContent::file(
370                    FileContent::Content(File::from(
371                        "import { run } from 'RUNTIME'; run(() => import('INNER'))",
372                    ))
373                    .cell(),
374                ),
375            )),
376            ReferenceType::Internal(ResolvedVc::cell(fxindexmap! {
377                rcstr!("INNER") => module_asset,
378                rcstr!("RUNTIME") => runtime_asset
379            })),
380        )
381        .module()
382        .to_resolved()
383        .await?;
384
385    let runtime_entries = {
386        let globals_module = asset_context
387            .process(
388                Vc::upcast(FileSource::new(
389                    embed_file_path(rcstr!("globals.ts")).owned().await?,
390                )),
391                ReferenceType::Internal(InnerAssets::empty().to_resolved().await?),
392            )
393            .module();
394
395        let Some(globals_module) =
396            Vc::try_resolve_sidecast::<Box<dyn EvaluatableAsset>>(globals_module).await?
397        else {
398            bail!("Internal module is not evaluatable");
399        };
400
401        let mut entries = vec![globals_module.to_resolved().await?];
402        if let Some(runtime_entries) = runtime_entries {
403            for &entry in &*runtime_entries.await? {
404                entries.push(entry)
405            }
406        }
407        entries
408    };
409
410    Ok(EvaluateEntries {
411        entries: runtime_entries
412            .iter()
413            .copied()
414            .chain(iter::once(ResolvedVc::try_downcast(entry_module).unwrap()))
415            .collect(),
416        main_entry_ident: module_asset.ident().to_resolved().await?,
417    }
418    .cell())
419}
420
421/// Pass the file you cared as `runtime_entries` to invalidate and reload the
422/// evaluated result automatically.
423#[turbo_tasks::function]
424pub async fn evaluate(
425    entries: ResolvedVc<EvaluateEntries>,
426    cwd: FileSystemPath,
427    env: ResolvedVc<Box<dyn ProcessEnv>>,
428    context_source_for_issue: ResolvedVc<Box<dyn Source>>,
429    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
430    module_graph: ResolvedVc<ModuleGraph>,
431    args: Vec<ResolvedVc<JsonValue>>,
432    additional_invalidation: ResolvedVc<Completion>,
433    debug: bool,
434) -> Result<Vc<Option<RcStr>>> {
435    custom_evaluate(BasicEvaluateContext {
436        entries,
437        cwd,
438        env,
439        context_source_for_issue,
440        chunking_context,
441        module_graph,
442        args,
443        additional_invalidation,
444        debug,
445    })
446    .await
447}
448
449/// Repeatedly pulls from the NodeJsOperation until we receive a
450/// value/error/end.
451async fn pull_operation<T: EvaluateContext>(
452    operation: &mut NodeJsOperation,
453    pool: &NodeJsPool,
454    evaluate_context: &T,
455    state: &mut T::State,
456) -> Result<Option<String>> {
457    let _guard = duration_span!("Node.js evaluation");
458
459    loop {
460        match operation.recv().await? {
461            EvalJavaScriptIncomingMessage::Error(error) => {
462                evaluate_context.emit_error(error, pool).await?;
463                // Do not reuse the process in case of error
464                operation.disallow_reuse();
465                // Issue emitted, we want to break but don't want to return an error
466                return Ok(None);
467            }
468            EvalJavaScriptIncomingMessage::End { data } => return Ok(data),
469            EvalJavaScriptIncomingMessage::Info { data } => {
470                evaluate_context
471                    .info(state, serde_json::from_value(data)?, pool)
472                    .await?;
473            }
474            EvalJavaScriptIncomingMessage::Request { id, data } => {
475                match evaluate_context
476                    .request(state, serde_json::from_value(data)?, pool)
477                    .await
478                {
479                    Ok(response) => {
480                        operation
481                            .send(EvalJavaScriptOutgoingMessage::Result {
482                                id,
483                                error: None,
484                                data: Some(serde_json::to_value(response)?),
485                            })
486                            .await?;
487                    }
488                    Err(e) => {
489                        operation
490                            .send(EvalJavaScriptOutgoingMessage::Result {
491                                id,
492                                error: Some(PrettyPrintError(&e).to_string()),
493                                data: None,
494                            })
495                            .await?;
496                    }
497                }
498            }
499        }
500    }
501}
502
503struct BasicEvaluateContext {
504    entries: ResolvedVc<EvaluateEntries>,
505    cwd: FileSystemPath,
506    env: ResolvedVc<Box<dyn ProcessEnv>>,
507    context_source_for_issue: ResolvedVc<Box<dyn Source>>,
508    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
509    module_graph: ResolvedVc<ModuleGraph>,
510    args: Vec<ResolvedVc<JsonValue>>,
511    additional_invalidation: ResolvedVc<Completion>,
512    debug: bool,
513}
514
515impl EvaluateContext for BasicEvaluateContext {
516    type InfoMessage = ();
517    type RequestMessage = ();
518    type ResponseMessage = ();
519    type State = ();
520
521    fn pool(&self) -> OperationVc<crate::pool::NodeJsPool> {
522        get_evaluate_pool(
523            self.entries,
524            self.cwd.clone(),
525            self.env,
526            self.chunking_context,
527            self.module_graph,
528            self.additional_invalidation,
529            self.debug,
530            EnvVarTracking::WholeEnvTracked,
531        )
532    }
533
534    fn args(&self) -> &[ResolvedVc<serde_json::Value>] {
535        &self.args
536    }
537
538    fn cwd(&self) -> Vc<turbo_tasks_fs::FileSystemPath> {
539        self.cwd.clone().cell()
540    }
541
542    fn keep_alive(&self) -> bool {
543        !self.args.is_empty()
544    }
545
546    async fn emit_error(&self, error: StructuredError, pool: &NodeJsPool) -> Result<()> {
547        EvaluationIssue {
548            error,
549            source: IssueSource::from_source_only(self.context_source_for_issue),
550            assets_for_source_mapping: pool.assets_for_source_mapping,
551            assets_root: pool.assets_root.clone(),
552            root_path: self.chunking_context.root_path().owned().await?,
553        }
554        .resolved_cell()
555        .emit();
556        Ok(())
557    }
558
559    async fn info(
560        &self,
561        _state: &mut Self::State,
562        _data: Self::InfoMessage,
563        _pool: &NodeJsPool,
564    ) -> Result<()> {
565        bail!("BasicEvaluateContext does not support info messages")
566    }
567
568    async fn request(
569        &self,
570        _state: &mut Self::State,
571        _data: Self::RequestMessage,
572        _pool: &NodeJsPool,
573    ) -> Result<Self::ResponseMessage> {
574        bail!("BasicEvaluateContext does not support request messages")
575    }
576
577    async fn finish(&self, _state: Self::State, _pool: &NodeJsPool) -> Result<()> {
578        Ok(())
579    }
580}
581
582pub fn scale_zero() {
583    NodeJsPool::scale_zero();
584}
585
586pub fn scale_down() {
587    NodeJsPool::scale_down();
588}
589
590/// An issue that occurred while evaluating node code.
591#[turbo_tasks::value(shared)]
592pub struct EvaluationIssue {
593    pub source: IssueSource,
594    pub error: StructuredError,
595    pub assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
596    pub assets_root: FileSystemPath,
597    pub root_path: FileSystemPath,
598}
599
600#[turbo_tasks::value_impl]
601impl Issue for EvaluationIssue {
602    #[turbo_tasks::function]
603    fn title(&self) -> Vc<StyledString> {
604        StyledString::Text(rcstr!("Error evaluating Node.js code")).cell()
605    }
606
607    #[turbo_tasks::function]
608    fn stage(&self) -> Vc<IssueStage> {
609        IssueStage::Transform.cell()
610    }
611
612    #[turbo_tasks::function]
613    fn file_path(&self) -> Vc<FileSystemPath> {
614        self.source.file_path()
615    }
616
617    #[turbo_tasks::function]
618    async fn description(&self) -> Result<Vc<OptionStyledString>> {
619        Ok(Vc::cell(Some(
620            StyledString::Text(
621                self.error
622                    .print(
623                        *self.assets_for_source_mapping,
624                        self.assets_root.clone(),
625                        self.root_path.clone(),
626                        FormattingMode::Plain,
627                    )
628                    .await?
629                    .into(),
630            )
631            .resolved_cell(),
632        )))
633    }
634
635    #[turbo_tasks::function]
636    fn source(&self) -> Vc<OptionIssueSource> {
637        Vc::cell(Some(self.source))
638    }
639}