turbopack_node/render/
render_proxy.rs

1use anyhow::{Result, anyhow, bail};
2use async_stream::try_stream as generator;
3use futures::{
4    SinkExt, StreamExt, TryStreamExt,
5    channel::mpsc::{UnboundedSender, unbounded},
6    pin_mut,
7};
8use parking_lot::Mutex;
9use serde::{Deserialize, Serialize};
10use turbo_rcstr::RcStr;
11use turbo_tasks::{
12    RawVc, ResolvedVc, TaskInput, ValueToString, Vc, duration_span, mark_finished, prevent_gc,
13    trace::TraceRawVcs, util::SharedError,
14};
15use turbo_tasks_bytes::{Bytes, Stream};
16use turbo_tasks_env::ProcessEnv;
17use turbo_tasks_fs::FileSystemPath;
18use turbopack_core::{
19    chunk::{ChunkingContext, EvaluatableAsset, EvaluatableAssets},
20    error::PrettyPrintError,
21    issue::{IssueExt, StyledString},
22    module::Module,
23};
24use turbopack_dev_server::source::{Body, ProxyResult};
25
26use super::{
27    RenderData, RenderProxyIncomingMessage, RenderProxyOutgoingMessage, ResponseHeaders,
28    issue::RenderingIssue,
29};
30use crate::{
31    get_intermediate_asset, get_renderer_pool_operation, pool::NodeJsOperation,
32    render::error_page::error_html, source_map::trace_stack,
33};
34
35/// Renders a module as static HTML in a node.js process.
36#[turbo_tasks::function(operation)]
37pub async fn render_proxy_operation(
38    cwd: ResolvedVc<FileSystemPath>,
39    env: ResolvedVc<Box<dyn ProcessEnv>>,
40    path: ResolvedVc<FileSystemPath>,
41    module: ResolvedVc<Box<dyn EvaluatableAsset>>,
42    runtime_entries: ResolvedVc<EvaluatableAssets>,
43    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
44    intermediate_output_path: ResolvedVc<FileSystemPath>,
45    output_root: ResolvedVc<FileSystemPath>,
46    project_dir: ResolvedVc<FileSystemPath>,
47    data: ResolvedVc<RenderData>,
48    body: ResolvedVc<Body>,
49    debug: bool,
50) -> Result<Vc<ProxyResult>> {
51    let render = render_stream(RenderStreamOptions {
52        cwd,
53        env,
54        path,
55        module,
56        runtime_entries,
57        chunking_context,
58        intermediate_output_path,
59        output_root,
60        project_dir,
61        data,
62        body,
63        debug,
64    })
65    .await?;
66
67    let mut stream = render.read();
68    let first = match stream.try_next().await? {
69        Some(f) => f,
70        None => {
71            // If an Error was received first, then it would have been
72            // transformed into a proxy err error response.
73            bail!("did not receive response from render");
74        }
75    };
76
77    let RenderItem::Headers(data) = first else {
78        bail!("did not receive headers from render");
79    };
80
81    let body = Body::from_stream(stream.map(|item| match item {
82        Ok(RenderItem::BodyChunk(b)) => Ok(b),
83        Ok(v) => Err(SharedError::new(anyhow!(
84            "unexpected render item: {:#?}",
85            v
86        ))),
87        Err(e) => Err(e),
88    }));
89    let result = ProxyResult {
90        status: data.status,
91        headers: data.headers,
92        body,
93    };
94
95    Ok(result.cell())
96}
97
98async fn proxy_error(
99    path: ResolvedVc<FileSystemPath>,
100    error: anyhow::Error,
101    operation: Option<NodeJsOperation>,
102) -> Result<(u16, RcStr)> {
103    let message = format!("{}", PrettyPrintError(&error));
104
105    let status = match operation {
106        Some(operation) => Some(operation.wait_or_kill().await?),
107        None => None,
108    };
109
110    let mut details = vec![];
111    if let Some(status) = status {
112        details.push(format!("status: {status}"));
113    }
114
115    let status_code = 500;
116    let body = error_html(
117        status_code,
118        "An error occurred while proxying the request to Node.js".into(),
119        format!("{message}\n\n{}", details.join("\n")).into(),
120    )
121    .owned()
122    .await?;
123
124    RenderingIssue {
125        file_path: path,
126        message: StyledString::Text(message.into()).resolved_cell(),
127        status: status.and_then(|status| status.code()),
128    }
129    .resolved_cell()
130    .emit();
131
132    Ok((status_code, body))
133}
134
135#[derive(Clone, Debug)]
136#[turbo_tasks::value]
137enum RenderItem {
138    Headers(ResponseHeaders),
139    BodyChunk(Bytes),
140}
141
142type RenderItemResult = Result<RenderItem, SharedError>;
143
144#[turbo_tasks::value(eq = "manual", cell = "new", serialization = "none")]
145struct RenderStreamSender {
146    #[turbo_tasks(trace_ignore, debug_ignore)]
147    get: Box<dyn Fn() -> UnboundedSender<RenderItemResult> + Send + Sync>,
148}
149
150#[turbo_tasks::value(transparent)]
151struct RenderStream(#[turbo_tasks(trace_ignore)] Stream<RenderItemResult>);
152
153#[derive(Clone, Debug, TaskInput, PartialEq, Eq, Hash, Serialize, Deserialize, TraceRawVcs)]
154struct RenderStreamOptions {
155    cwd: ResolvedVc<FileSystemPath>,
156    env: ResolvedVc<Box<dyn ProcessEnv>>,
157    path: ResolvedVc<FileSystemPath>,
158    module: ResolvedVc<Box<dyn EvaluatableAsset>>,
159    runtime_entries: ResolvedVc<EvaluatableAssets>,
160    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
161    intermediate_output_path: ResolvedVc<FileSystemPath>,
162    output_root: ResolvedVc<FileSystemPath>,
163    project_dir: ResolvedVc<FileSystemPath>,
164    data: ResolvedVc<RenderData>,
165    body: ResolvedVc<Body>,
166    debug: bool,
167}
168
169#[turbo_tasks::function]
170fn render_stream(options: RenderStreamOptions) -> Vc<RenderStream> {
171    // TODO: The way we invoke render_stream_internal as side effect is not
172    // GC-safe, so we disable GC for this task.
173    prevent_gc();
174
175    // Note the following code uses some hacks to create a child task that produces
176    // a stream that is returned by this task.
177
178    // We create a new cell in this task, which will be updated from the
179    // [render_stream_internal] task.
180    let cell = turbo_tasks::macro_helpers::find_cell_by_type(*RENDERSTREAM_VALUE_TYPE_ID);
181
182    // We initialize the cell with a stream that is open, but has no values.
183    // The first [render_stream_internal] pipe call will pick up that stream.
184    let (sender, receiver) = unbounded();
185    cell.update(RenderStream(Stream::new_open(vec![], Box::new(receiver))));
186    let initial = Mutex::new(Some(sender));
187
188    // run the evaluation as side effect
189    let _ = render_stream_internal(
190        options,
191        RenderStreamSender {
192            get: Box::new(move || {
193                if let Some(sender) = initial.lock().take() {
194                    sender
195                } else {
196                    // In cases when only [render_stream_internal] is (re)executed, we need to
197                    // update the old stream with a new value.
198                    let (sender, receiver) = unbounded();
199                    cell.update(RenderStream(Stream::new_open(vec![], Box::new(receiver))));
200                    sender
201                }
202            }),
203        }
204        .cell(),
205    );
206
207    let raw: RawVc = cell.into();
208    raw.into()
209}
210
211#[turbo_tasks::function]
212async fn render_stream_internal(
213    options: RenderStreamOptions,
214    sender: Vc<RenderStreamSender>,
215) -> Result<Vc<()>> {
216    let RenderStreamOptions {
217        cwd,
218        env,
219        path,
220        module,
221        runtime_entries,
222        chunking_context,
223        intermediate_output_path,
224        output_root,
225        project_dir,
226        data,
227        body,
228        debug,
229    } = options;
230
231    mark_finished();
232    let Ok(sender) = sender.await else {
233        // Impossible to handle the error in a good way.
234        return Ok(Default::default());
235    };
236
237    let stream = generator! {
238        let intermediate_asset = get_intermediate_asset(
239            *chunking_context,
240            *module,
241            *runtime_entries,
242        ).to_resolved().await?;
243        let pool_op = get_renderer_pool_operation(
244            cwd,
245            env,
246            intermediate_asset,
247            intermediate_output_path,
248            output_root,
249            project_dir,
250            debug,
251        );
252
253        // Read this strongly consistent, since we don't want to run inconsistent
254        // node.js code.
255        let pool = pool_op.read_strongly_consistent().await?;
256        let data = data.await?;
257        let mut operation = pool.operation().await?;
258
259        // First, send the render data.
260        operation
261            .send(RenderProxyOutgoingMessage::Headers { data: &data })
262            .await?;
263        // Then, send the binary body in chunks.
264        let mut body = body.await?.read();
265        while let Some(data) = body.next().await {
266            operation
267                .send(RenderProxyOutgoingMessage::BodyChunk { data: &data.unwrap() })
268                .await?;
269        }
270        operation.send(RenderProxyOutgoingMessage::BodyEnd).await?;
271
272        let entry = module.ident().to_string().await?;
273        let guard = duration_span!("Node.js api execution", entry = display(entry));
274
275        match operation.recv().await? {
276            RenderProxyIncomingMessage::Headers { data } => yield RenderItem::Headers(data),
277            RenderProxyIncomingMessage::Error(error) => {
278                drop(guard);
279                // If we don't get headers, then something is very wrong. Instead, we send down a
280                // 500 proxy error as if it were the proper result.
281                let trace = trace_stack(
282                    error,
283                    *intermediate_asset,
284                    *intermediate_output_path,
285                    *project_dir
286                )
287                .await?;
288                let (status, body) =  proxy_error(path, anyhow!("error rendering: {}", trace), Some(operation)).await?;
289                yield RenderItem::Headers(ResponseHeaders {
290                    status,
291                    headers: vec![(
292                        "content-type".into(),
293                        "text/html; charset=utf-8".into(),
294                    )],
295                });
296                yield RenderItem::BodyChunk(body.into_owned().into_bytes().into());
297                return;
298            }
299            v => {
300                drop(guard);
301                Err(anyhow!("unexpected message during rendering: {:#?}", v))?;
302                return;
303            },
304        };
305
306        loop {
307            match operation.recv().await? {
308                RenderProxyIncomingMessage::BodyChunk { data } => {
309                    yield RenderItem::BodyChunk(data.into());
310                }
311                RenderProxyIncomingMessage::BodyEnd => break,
312                RenderProxyIncomingMessage::Error(error) => {
313                    drop(guard);
314                    // We have already started to send a result, so we can't change the
315                    // headers/body to a proxy error.
316                    operation.disallow_reuse();
317                    let trace =
318                        trace_stack(error, *intermediate_asset, *intermediate_output_path, *project_dir).await?;
319                    Err(anyhow!("error during streaming render: {}", trace))?;
320                    return;
321                }
322                v => {
323                    drop(guard);
324                    Err(anyhow!("unexpected message during rendering: {:#?}", v))?;
325                    return;
326                },
327            }
328        }
329        drop(guard);
330    };
331
332    let mut sender = (sender.get)();
333    pin_mut!(stream);
334    while let Some(value) = stream.next().await {
335        if sender.send(value).await.is_err() {
336            return Ok(Default::default());
337        }
338        if sender.flush().await.is_err() {
339            return Ok(Default::default());
340        }
341    }
342
343    Ok(Default::default())
344}