// turbopack_node/render/render_static.rs

1use anyhow::{Context, Result, anyhow, bail};
2use async_stream::try_stream as generator;
3use futures::{
4    SinkExt, StreamExt, TryStreamExt,
5    channel::mpsc::{UnboundedSender, unbounded},
6    pin_mut,
7};
8use parking_lot::Mutex;
9use serde::{Deserialize, Serialize};
10use turbo_tasks::{
11    RawVc, ResolvedVc, TaskInput, ValueToString, Vc, duration_span, mark_finished, prevent_gc,
12    trace::TraceRawVcs, util::SharedError,
13};
14use turbo_tasks_bytes::{Bytes, Stream};
15use turbo_tasks_env::ProcessEnv;
16use turbo_tasks_fs::{File, FileSystemPath};
17use turbopack_core::{
18    asset::{Asset, AssetContent},
19    chunk::{ChunkingContext, EvaluatableAsset, EvaluatableAssets},
20    error::PrettyPrintError,
21    issue::{IssueExt, StyledString},
22    module::Module,
23};
24use turbopack_dev_server::{
25    html::DevHtmlAsset,
26    source::{Body, HeaderList, Rewrite, RewriteBuilder},
27};
28
29use super::{
30    RenderData, RenderStaticIncomingMessage, RenderStaticOutgoingMessage, issue::RenderingIssue,
31};
32use crate::{
33    ResponseHeaders, get_intermediate_asset, get_renderer_pool_operation, pool::NodeJsOperation,
34    render::error_page::error_html_body, source_map::trace_stack,
35};
36
37#[derive(Clone, Debug)]
38#[turbo_tasks::value]
39pub enum StaticResult {
40    Content {
41        content: ResolvedVc<AssetContent>,
42        status_code: u16,
43        headers: ResolvedVc<HeaderList>,
44    },
45    StreamedContent {
46        status: u16,
47        headers: ResolvedVc<HeaderList>,
48        body: Body,
49    },
50    Rewrite(ResolvedVc<Rewrite>),
51}
52
53#[turbo_tasks::value_impl]
54impl StaticResult {
55    #[turbo_tasks::function]
56    pub fn content(
57        content: ResolvedVc<AssetContent>,
58        status_code: u16,
59        headers: ResolvedVc<HeaderList>,
60    ) -> Vc<Self> {
61        StaticResult::Content {
62            content,
63            status_code,
64            headers,
65        }
66        .cell()
67    }
68
69    #[turbo_tasks::function]
70    pub fn rewrite(rewrite: ResolvedVc<Rewrite>) -> Vc<Self> {
71        StaticResult::Rewrite(rewrite).cell()
72    }
73}
74
75/// Renders a module as static HTML in a node.js process.
76#[turbo_tasks::function(operation)]
77pub async fn render_static_operation(
78    cwd: ResolvedVc<FileSystemPath>,
79    env: ResolvedVc<Box<dyn ProcessEnv>>,
80    path: ResolvedVc<FileSystemPath>,
81    module: ResolvedVc<Box<dyn EvaluatableAsset>>,
82    runtime_entries: ResolvedVc<EvaluatableAssets>,
83    fallback_page: ResolvedVc<DevHtmlAsset>,
84    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
85    intermediate_output_path: ResolvedVc<FileSystemPath>,
86    output_root: ResolvedVc<FileSystemPath>,
87    project_dir: ResolvedVc<FileSystemPath>,
88    data: ResolvedVc<RenderData>,
89    debug: bool,
90) -> Result<Vc<StaticResult>> {
91    let render = render_stream(RenderStreamOptions {
92        cwd,
93        env,
94        path,
95        module,
96        runtime_entries,
97        fallback_page,
98        chunking_context,
99        intermediate_output_path,
100        output_root,
101        project_dir,
102        data,
103        debug,
104    })
105    .await?;
106
107    let mut stream = render.read();
108    let first = match stream.try_next().await? {
109        Some(f) => f,
110        None => {
111            // If an Error was received first, then it would have been
112            // transformed into a proxy err error response.
113            bail!("did not receive response from render");
114        }
115    };
116
117    Ok(match first {
118        RenderItem::Response(response) => *response,
119        RenderItem::Headers(data) => {
120            let body = stream.map(|item| match item {
121                Ok(RenderItem::BodyChunk(b)) => Ok(b),
122                Ok(v) => Err(SharedError::new(anyhow!(
123                    "unexpected render item: {:#?}",
124                    v
125                ))),
126                Err(e) => Err(e),
127            });
128            StaticResult::StreamedContent {
129                status: data.status,
130                headers: ResolvedVc::cell(data.headers),
131                body: Body::from_stream(body),
132            }
133            .cell()
134        }
135        v => bail!("unexpected render item: {:#?}", v),
136    })
137}
138
139async fn static_error(
140    path: ResolvedVc<FileSystemPath>,
141    error: anyhow::Error,
142    operation: Option<NodeJsOperation>,
143    fallback_page: Vc<DevHtmlAsset>,
144) -> Result<Vc<AssetContent>> {
145    let status = match operation {
146        Some(operation) => Some(operation.wait_or_kill().await?),
147        None => None,
148    };
149
150    let error = format!("{}", PrettyPrintError(&error));
151    let mut message = error
152        // TODO this is pretty inefficient
153        .replace('&', "&amp;")
154        .replace('>', "&gt;")
155        .replace('<', "&lt;");
156
157    if let Some(status) = status {
158        message.push_str(&format!("\n\nStatus: {status}"));
159    }
160
161    let mut body = "<script id=\"__NEXT_DATA__\" type=\"application/json\">{ \"props\": {} \
162                    }</script>"
163        .to_string();
164
165    body.push_str(
166        error_html_body(500, "Error rendering page".into(), message.into())
167            .await?
168            .as_str(),
169    );
170
171    let issue = RenderingIssue {
172        file_path: path,
173        message: StyledString::Text(error.into()).resolved_cell(),
174        status: status.and_then(|status| status.code()),
175    };
176
177    issue.resolved_cell().emit();
178
179    let html = fallback_page.with_body(body.into());
180
181    Ok(html.content())
182}
183
184#[derive(Clone, Debug)]
185#[turbo_tasks::value]
186enum RenderItem {
187    Response(ResolvedVc<StaticResult>),
188    Headers(ResponseHeaders),
189    BodyChunk(Bytes),
190}
191
192type RenderItemResult = Result<RenderItem, SharedError>;
193
194#[turbo_tasks::value(eq = "manual", cell = "new", serialization = "none")]
195struct RenderStreamSender {
196    #[turbo_tasks(trace_ignore, debug_ignore)]
197    get: Box<dyn Fn() -> UnboundedSender<RenderItemResult> + Send + Sync>,
198}
199
200#[turbo_tasks::value(transparent)]
201struct RenderStream(#[turbo_tasks(trace_ignore)] Stream<RenderItemResult>);
202
203#[derive(Clone, Debug, TaskInput, PartialEq, Eq, Hash, Deserialize, Serialize, TraceRawVcs)]
204struct RenderStreamOptions {
205    cwd: ResolvedVc<FileSystemPath>,
206    env: ResolvedVc<Box<dyn ProcessEnv>>,
207    path: ResolvedVc<FileSystemPath>,
208    module: ResolvedVc<Box<dyn EvaluatableAsset>>,
209    runtime_entries: ResolvedVc<EvaluatableAssets>,
210    fallback_page: ResolvedVc<DevHtmlAsset>,
211    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
212    intermediate_output_path: ResolvedVc<FileSystemPath>,
213    output_root: ResolvedVc<FileSystemPath>,
214    project_dir: ResolvedVc<FileSystemPath>,
215    data: ResolvedVc<RenderData>,
216    debug: bool,
217}
218
219#[turbo_tasks::function]
220fn render_stream(options: RenderStreamOptions) -> Vc<RenderStream> {
221    // TODO: The way we invoke render_stream_internal as side effect is not
222    // GC-safe, so we disable GC for this task.
223    prevent_gc();
224
225    // Note the following code uses some hacks to create a child task that produces
226    // a stream that is returned by this task.
227
228    // We create a new cell in this task, which will be updated from the
229    // [render_stream_internal] task.
230    let cell = turbo_tasks::macro_helpers::find_cell_by_type(*RENDERSTREAM_VALUE_TYPE_ID);
231
232    // We initialize the cell with a stream that is open, but has no values.
233    // The first [render_stream_internal] pipe call will pick up that stream.
234    let (sender, receiver) = unbounded();
235    cell.update(RenderStream(Stream::new_open(vec![], Box::new(receiver))));
236    let initial = Mutex::new(Some(sender));
237
238    // run the evaluation as side effect
239    let _ = render_stream_internal(
240        options,
241        RenderStreamSender {
242            get: Box::new(move || {
243                if let Some(sender) = initial.lock().take() {
244                    sender
245                } else {
246                    // In cases when only [render_stream_internal] is (re)executed, we need to
247                    // update the old stream with a new value.
248                    let (sender, receiver) = unbounded();
249                    cell.update(RenderStream(Stream::new_open(vec![], Box::new(receiver))));
250                    sender
251                }
252            }),
253        }
254        .cell(),
255    );
256
257    let raw: RawVc = cell.into();
258    raw.into()
259}
260
261#[turbo_tasks::function]
262async fn render_stream_internal(
263    options: RenderStreamOptions,
264    sender: Vc<RenderStreamSender>,
265) -> Result<Vc<()>> {
266    let RenderStreamOptions {
267        cwd,
268        env,
269        path,
270        module,
271        runtime_entries,
272        fallback_page,
273        chunking_context,
274        intermediate_output_path,
275        output_root,
276        project_dir,
277        data,
278        debug,
279    } = options;
280
281    mark_finished();
282    let Ok(sender) = sender.await else {
283        // Impossible to handle the error in a good way.
284        return Ok(Default::default());
285    };
286
287    let stream = generator! {
288        let intermediate_asset = get_intermediate_asset(
289            *chunking_context,
290            *module,
291            *runtime_entries,
292        ).to_resolved().await?;
293        let renderer_pool_op = get_renderer_pool_operation(
294            cwd,
295            env,
296            intermediate_asset,
297            intermediate_output_path,
298            output_root,
299            project_dir,
300            debug,
301        );
302
303        // Read this strongly consistent, since we don't want to run inconsistent
304        // node.js code.
305        let pool = renderer_pool_op.read_strongly_consistent().await?;
306        let data = data.await?;
307        let mut operation = pool.operation().await?;
308
309        operation
310            .send(RenderStaticOutgoingMessage::Headers { data: &data })
311            .await
312            .context("sending headers to node.js process")?;
313
314        let entry = module.ident().to_string().await?;
315        let guard = duration_span!("Node.js rendering", entry = display(entry));
316
317        match operation.recv().await? {
318            RenderStaticIncomingMessage::Headers { data } => yield RenderItem::Headers(data),
319            RenderStaticIncomingMessage::Rewrite { path } => {
320                drop(guard);
321                yield RenderItem::Response(
322                    StaticResult::rewrite(RewriteBuilder::new(path).build()).to_resolved().await?
323                );
324                return;
325            }
326            RenderStaticIncomingMessage::Response {
327                status_code,
328                headers,
329                body,
330            } => {
331                drop(guard);
332                yield RenderItem::Response(
333                    StaticResult::content(
334                        AssetContent::file(File::from(body).into()),
335                        status_code,
336                        Vc::cell(headers),
337                    ).to_resolved().await?
338                );
339                return;
340            }
341            RenderStaticIncomingMessage::Error(error) => {
342                drop(guard);
343                // If we don't get headers, then something is very wrong. Instead, we send down a
344                // 500 proxy error as if it were the proper result.
345                let trace = trace_stack(
346                    error,
347                    *intermediate_asset,
348                    *intermediate_output_path,
349                    *project_dir,
350                )
351                .await?;
352                yield RenderItem::Response(
353                    StaticResult::content(
354                        static_error(path, anyhow!(trace), Some(operation), *fallback_page).await?,
355                        500,
356                        HeaderList::empty(),
357                    ).to_resolved().await?
358                );
359                return;
360            }
361            v => {
362                drop(guard);
363                Err(anyhow!("unexpected message during rendering: {:#?}", v))?;
364                return;
365            },
366        };
367
368        // If we get here, then the first message was a Headers. Now we need to stream out the body
369        // chunks.
370        loop {
371            match operation.recv().await? {
372                RenderStaticIncomingMessage::BodyChunk { data } => {
373                    yield RenderItem::BodyChunk(data.into());
374                }
375                RenderStaticIncomingMessage::BodyEnd => break,
376                RenderStaticIncomingMessage::Error(error) => {
377                    // We have already started to send a result, so we can't change the
378                    // headers/body to a proxy error.
379                    operation.disallow_reuse();
380                    let trace =
381                        trace_stack(error, *intermediate_asset, *intermediate_output_path, *project_dir).await?;
382                        drop(guard);
383                    Err(anyhow!("error during streaming render: {}", trace))?;
384                    return;
385                }
386                v => {
387                    drop(guard);
388                    Err(anyhow!("unexpected message during rendering: {:#?}", v))?;
389                    return;
390                },
391            }
392        }
393        drop(guard);
394    };
395
396    let mut sender = (sender.get)();
397    pin_mut!(stream);
398    while let Some(value) = stream.next().await {
399        if sender.send(value).await.is_err() {
400            return Ok(Default::default());
401        }
402        if sender.flush().await.is_err() {
403            return Ok(Default::default());
404        }
405    }
406
407    Ok(Default::default())
408}