turbopack_node/render/
render_static.rs

1use anyhow::{Context, Result, anyhow, bail};
2use async_stream::try_stream as generator;
3use futures::{
4    SinkExt, StreamExt, TryStreamExt,
5    channel::mpsc::{UnboundedSender, unbounded},
6    pin_mut,
7};
8use parking_lot::Mutex;
9use serde::{Deserialize, Serialize};
10use turbo_rcstr::rcstr;
11use turbo_tasks::{
12    RawVc, ResolvedVc, TaskInput, ValueToString, Vc, VcValueType, duration_span, mark_finished,
13    prevent_gc, trace::TraceRawVcs, util::SharedError,
14};
15use turbo_tasks_bytes::{Bytes, Stream};
16use turbo_tasks_env::ProcessEnv;
17use turbo_tasks_fs::{File, FileSystemPath};
18use turbopack_core::{
19    asset::{Asset, AssetContent},
20    chunk::{ChunkingContext, EvaluatableAsset, EvaluatableAssets},
21    error::PrettyPrintError,
22    issue::{IssueExt, StyledString},
23    module::Module,
24};
25use turbopack_dev_server::{
26    html::DevHtmlAsset,
27    source::{Body, HeaderList, Rewrite, RewriteBuilder},
28};
29
30use super::{
31    RenderData, RenderStaticIncomingMessage, RenderStaticOutgoingMessage, issue::RenderingIssue,
32};
33use crate::{
34    ResponseHeaders, get_intermediate_asset, get_renderer_pool_operation, pool::NodeJsOperation,
35    render::error_page::error_html_body, source_map::trace_stack,
36};
37
/// The outcome of a static render, as returned by [`render_static_operation`].
#[derive(Clone, Debug)]
#[turbo_tasks::value]
pub enum StaticResult {
    /// A complete response whose content is available as a whole.
    Content {
        /// The response content.
        content: ResolvedVc<AssetContent>,
        /// The HTTP status code of the response.
        status_code: u16,
        /// The response headers.
        headers: ResolvedVc<HeaderList>,
    },
    /// A response whose headers are known while the body is delivered as a
    /// stream.
    StreamedContent {
        /// The HTTP status code of the response.
        status: u16,
        /// The response headers.
        headers: ResolvedVc<HeaderList>,
        /// The streamed response body.
        body: Body,
    },
    /// The renderer asked for a rewrite instead of producing content.
    Rewrite(ResolvedVc<Rewrite>),
}
53
54#[turbo_tasks::value_impl]
55impl StaticResult {
56    #[turbo_tasks::function]
57    pub fn content(
58        content: ResolvedVc<AssetContent>,
59        status_code: u16,
60        headers: ResolvedVc<HeaderList>,
61    ) -> Vc<Self> {
62        StaticResult::Content {
63            content,
64            status_code,
65            headers,
66        }
67        .cell()
68    }
69
70    #[turbo_tasks::function]
71    pub fn rewrite(rewrite: ResolvedVc<Rewrite>) -> Vc<Self> {
72        StaticResult::Rewrite(rewrite).cell()
73    }
74}
75
76/// Renders a module as static HTML in a node.js process.
77#[turbo_tasks::function(operation)]
78pub async fn render_static_operation(
79    cwd: FileSystemPath,
80    env: ResolvedVc<Box<dyn ProcessEnv>>,
81    path: FileSystemPath,
82    module: ResolvedVc<Box<dyn EvaluatableAsset>>,
83    runtime_entries: ResolvedVc<EvaluatableAssets>,
84    fallback_page: ResolvedVc<DevHtmlAsset>,
85    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
86    intermediate_output_path: FileSystemPath,
87    output_root: FileSystemPath,
88    project_dir: FileSystemPath,
89    data: ResolvedVc<RenderData>,
90    debug: bool,
91) -> Result<Vc<StaticResult>> {
92    let render = render_stream(RenderStreamOptions {
93        cwd,
94        env,
95        path,
96        module,
97        runtime_entries,
98        fallback_page,
99        chunking_context,
100        intermediate_output_path,
101        output_root,
102        project_dir,
103        data,
104        debug,
105    })
106    .await?;
107
108    let mut stream = render.read();
109    let first = match stream.try_next().await? {
110        Some(f) => f,
111        None => {
112            // If an Error was received first, then it would have been
113            // transformed into a proxy err error response.
114            bail!("did not receive response from render");
115        }
116    };
117
118    Ok(match first {
119        RenderItem::Response(response) => *response,
120        RenderItem::Headers(data) => {
121            let body = stream.map(|item| match item {
122                Ok(RenderItem::BodyChunk(b)) => Ok(b),
123                Ok(v) => Err(SharedError::new(anyhow!(
124                    "unexpected render item: {:#?}",
125                    v
126                ))),
127                Err(e) => Err(e),
128            });
129            StaticResult::StreamedContent {
130                status: data.status,
131                headers: ResolvedVc::cell(data.headers),
132                body: Body::from_stream(body),
133            }
134            .cell()
135        }
136        v => bail!("unexpected render item: {:#?}", v),
137    })
138}
139
140async fn static_error(
141    path: FileSystemPath,
142    error: anyhow::Error,
143    operation: Option<NodeJsOperation>,
144    fallback_page: Vc<DevHtmlAsset>,
145) -> Result<Vc<AssetContent>> {
146    let status = match operation {
147        Some(operation) => Some(operation.wait_or_kill().await?),
148        None => None,
149    };
150
151    let error = format!("{}", PrettyPrintError(&error));
152    let mut message = error
153        // TODO this is pretty inefficient
154        .replace('&', "&amp;")
155        .replace('>', "&gt;")
156        .replace('<', "&lt;");
157
158    if let Some(status) = status {
159        message.push_str(&format!("\n\nStatus: {status}"));
160    }
161
162    let mut body = "<script id=\"__NEXT_DATA__\" type=\"application/json\">{ \"props\": {} \
163                    }</script>"
164        .to_string();
165
166    body.push_str(
167        error_html_body(500, rcstr!("Error rendering page"), message.into())
168            .await?
169            .as_str(),
170    );
171
172    let issue = RenderingIssue {
173        file_path: path,
174        message: StyledString::Text(error.into()).resolved_cell(),
175        status: status.and_then(|status| status.code()),
176    };
177
178    issue.resolved_cell().emit();
179
180    let html = fallback_page.with_body(body.into());
181
182    Ok(html.content())
183}
184
/// A single item produced by the render stream.
#[derive(Clone, Debug)]
#[turbo_tasks::value]
enum RenderItem {
    /// A complete result; no further items are expected after it.
    Response(ResolvedVc<StaticResult>),
    /// The response headers of a streamed response; body chunks follow.
    Headers(ResponseHeaders),
    /// One chunk of a streamed response body.
    BodyChunk(Bytes),
}
192
/// The element type of the render stream: either a [`RenderItem`] or a
/// [`SharedError`] describing why rendering failed.
type RenderItemResult = Result<RenderItem, SharedError>;
194
/// Wraps a callback that hands out the sending half of the channel feeding
/// the [`RenderStream`].
#[turbo_tasks::value(eq = "manual", cell = "new", serialization = "none")]
struct RenderStreamSender {
    /// Returns the sender that render items should be written to.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    get: Box<dyn Fn() -> UnboundedSender<RenderItemResult> + Send + Sync>,
}
200
/// The stream of [`RenderItemResult`]s produced by a render.
#[turbo_tasks::value(transparent)]
struct RenderStream(#[turbo_tasks(trace_ignore)] Stream<RenderItemResult>);
203
/// Bundles the arguments of [`render_static_operation`] so they can be passed
/// to [`render_stream`] and [`render_stream_internal`] as a single task input.
#[derive(Clone, Debug, TaskInput, PartialEq, Eq, Hash, Deserialize, Serialize, TraceRawVcs)]
struct RenderStreamOptions {
    /// Forwarded to `get_renderer_pool_operation`.
    cwd: FileSystemPath,
    /// Forwarded to `get_renderer_pool_operation`.
    env: ResolvedVc<Box<dyn ProcessEnv>>,
    /// Reported as the file path of the issue emitted when rendering fails.
    path: FileSystemPath,
    /// The module to render; used to build the intermediate asset.
    module: ResolvedVc<Box<dyn EvaluatableAsset>>,
    /// Used together with `module` to build the intermediate asset.
    runtime_entries: ResolvedVc<EvaluatableAssets>,
    /// The page used as the frame for the error page when rendering fails.
    fallback_page: ResolvedVc<DevHtmlAsset>,
    /// Used together with `module` to build the intermediate asset.
    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
    /// Forwarded to `get_renderer_pool_operation` and to stack tracing.
    intermediate_output_path: FileSystemPath,
    /// Forwarded to `get_renderer_pool_operation`.
    output_root: FileSystemPath,
    /// Forwarded to `get_renderer_pool_operation` and to stack tracing.
    project_dir: FileSystemPath,
    /// The render data sent to the node.js process with the first message.
    data: ResolvedVc<RenderData>,
    /// Forwarded to `get_renderer_pool_operation`.
    debug: bool,
}
219
/// Returns a [`RenderStream`] that is filled by [`render_stream_internal`],
/// which is started here as a side effect.
///
/// The returned `Vc` points at a cell of this task. The cell initially holds
/// an open, empty stream; the sending half of its channel is handed to
/// [`render_stream_internal`] through a [`RenderStreamSender`].
#[turbo_tasks::function]
fn render_stream(options: RenderStreamOptions) -> Vc<RenderStream> {
    // TODO: The way we invoke render_stream_internal as side effect is not
    // GC-safe, so we disable GC for this task.
    prevent_gc();

    // Note the following code uses some hacks to create a child task that produces
    // a stream that is returned by this task.

    // We create a new cell in this task, which will be updated from the
    // [render_stream_internal] task.
    let cell = turbo_tasks::macro_helpers::find_cell_by_type(
        <RenderStream as VcValueType>::get_value_type_id(),
    );

    // We initialize the cell with a stream that is open, but has no values.
    // The first [render_stream_internal] pipe call will pick up that stream.
    let (sender, receiver) = unbounded();
    cell.update(RenderStream(Stream::new_open(vec![], Box::new(receiver))));
    // The initial sender can be taken exactly once; the mutex allows taking it
    // from within the `Fn` closure below.
    let initial = Mutex::new(Some(sender));

    // run the evaluation as side effect
    let _ = render_stream_internal(
        options,
        RenderStreamSender {
            get: Box::new(move || {
                if let Some(sender) = initial.lock().take() {
                    sender
                } else {
                    // In cases when only [render_stream_internal] is (re)executed, we need to
                    // update the old stream with a new value.
                    let (sender, receiver) = unbounded();
                    cell.update(RenderStream(Stream::new_open(vec![], Box::new(receiver))));
                    sender
                }
            }),
        }
        .cell(),
    );

    // Return the cell itself as the result of this task.
    let raw: RawVc = cell.into();
    raw.into()
}
263
/// Runs the render in a node.js process and forwards every produced
/// [`RenderItemResult`] to the channel obtained from `sender`.
///
/// Message flow with the node.js process:
///
/// 1. A `Headers` message carrying the render data is sent.
/// 2. The first reply decides the outcome: `Headers` starts a streamed response, `Rewrite` and
///    `Response` each produce a single complete [`RenderItem::Response`], and `Error` is turned
///    into a 500 error page response. Any other message is an error.
/// 3. For a streamed response, `BodyChunk` messages are forwarded until `BodyEnd` arrives. An
///    `Error` or any other message at this point ends the stream with an error item.
///
/// Errors raised inside the generator are delivered as `Err` items of the
/// stream; this function itself returns `Ok` in all visible paths.
#[turbo_tasks::function]
async fn render_stream_internal(
    options: RenderStreamOptions,
    sender: Vc<RenderStreamSender>,
) -> Result<Vc<()>> {
    let RenderStreamOptions {
        cwd,
        env,
        path,
        module,
        runtime_entries,
        fallback_page,
        chunking_context,
        intermediate_output_path,
        output_root,
        project_dir,
        data,
        debug,
    } = options;

    // NOTE(review): presumably signals that this task's result is available
    // before the render below has completed — confirm against turbo_tasks.
    mark_finished();
    let Ok(sender) = sender.await else {
        // Impossible to handle the error in a good way.
        return Ok(Default::default());
    };

    let stream = generator! {
        let intermediate_asset = get_intermediate_asset(
            *chunking_context,
            *module,
            *runtime_entries,
        ).to_resolved().await?;
        let renderer_pool_op = get_renderer_pool_operation(
            cwd,
            env,
            intermediate_asset,
            intermediate_output_path.clone(),
            output_root,
            project_dir.clone(),
            debug,
        );

        // Read this strongly consistent, since we don't want to run inconsistent
        // node.js code.
        let pool = renderer_pool_op.read_strongly_consistent().await?;
        let data = data.await?;
        let mut operation = pool.operation().await?;

        operation
            .send(RenderStaticOutgoingMessage::Headers { data: &data })
            .await
            .context("sending headers to node.js process")?;

        let entry = module.ident().to_string().await?;
        // The span is dropped explicitly on every exit path below.
        let guard = duration_span!("Node.js rendering", entry = display(entry));

        match operation.recv().await? {
            RenderStaticIncomingMessage::Headers { data } => yield RenderItem::Headers(data),
            RenderStaticIncomingMessage::Rewrite { path } => {
                drop(guard);
                yield RenderItem::Response(
                    StaticResult::rewrite(RewriteBuilder::new(path).build()).to_resolved().await?
                );
                return;
            }
            RenderStaticIncomingMessage::Response {
                status_code,
                headers,
                body,
            } => {
                drop(guard);
                yield RenderItem::Response(
                    StaticResult::content(
                        AssetContent::file(File::from(body).into()),
                        status_code,
                        Vc::cell(headers),
                    ).to_resolved().await?
                );
                return;
            }
            RenderStaticIncomingMessage::Error(error) => {
                drop(guard);
                // If we don't get headers, then something is very wrong. Instead, we send down a
                // 500 proxy error as if it were the proper result.
                let trace = trace_stack(
                    error,
                    *intermediate_asset,
                    intermediate_output_path.clone(),
                    project_dir.clone(),
                )
                .await?;
                yield RenderItem::Response(
                    StaticResult::content(
                        static_error(path, anyhow!(trace), Some(operation), *fallback_page).await?,
                        500,
                        HeaderList::empty(),
                    ).to_resolved().await?
                );
                return;
            }
            v => {
                drop(guard);
                Err(anyhow!("unexpected message during rendering: {:#?}", v))?;
                return;
            },
        };

        // If we get here, then the first message was a Headers. Now we need to stream out the body
        // chunks.
        loop {
            match operation.recv().await? {
                RenderStaticIncomingMessage::BodyChunk { data } => {
                    yield RenderItem::BodyChunk(data.into());
                }
                RenderStaticIncomingMessage::BodyEnd => break,
                RenderStaticIncomingMessage::Error(error) => {
                    // We have already started to send a result, so we can't change the
                    // headers/body to a proxy error.
                    operation.disallow_reuse();
                    let trace =
                        trace_stack(error, *intermediate_asset, intermediate_output_path.clone(), project_dir.clone()).await?;
                        drop(guard);
                    Err(anyhow!("error during streaming render: {}", trace))?;
                    return;
                }
                v => {
                    drop(guard);
                    Err(anyhow!("unexpected message during rendering: {:#?}", v))?;
                    return;
                },
            }
        }
        drop(guard);
    };

    // Obtain the channel to write to and pump all items of the generator into
    // it. A failing send or flush stops the forwarding silently.
    let mut sender = (sender.get)();
    pin_mut!(stream);
    while let Some(value) = stream.next().await {
        if sender.send(value).await.is_err() {
            return Ok(Default::default());
        }
        if sender.flush().await.is_err() {
            return Ok(Default::default());
        }
    }

    Ok(Default::default())
}
411}