turbopack_node/render/
render_proxy.rs

1use anyhow::{Result, anyhow, bail};
2use async_stream::try_stream as generator;
3use futures::{
4    SinkExt, StreamExt, TryStreamExt,
5    channel::mpsc::{UnboundedSender, unbounded},
6    pin_mut,
7};
8use parking_lot::Mutex;
9use serde::{Deserialize, Serialize};
10use turbo_rcstr::{RcStr, rcstr};
11use turbo_tasks::{
12    RawVc, ResolvedVc, TaskInput, ValueToString, Vc, VcValueType, duration_span, mark_finished,
13    prevent_gc, trace::TraceRawVcs, util::SharedError,
14};
15use turbo_tasks_bytes::{Bytes, Stream};
16use turbo_tasks_env::ProcessEnv;
17use turbo_tasks_fs::FileSystemPath;
18use turbopack_core::{
19    chunk::{ChunkingContext, EvaluatableAsset, EvaluatableAssets},
20    error::PrettyPrintError,
21    issue::{IssueExt, StyledString},
22    module::Module,
23};
24use turbopack_dev_server::source::{Body, ProxyResult};
25
26use super::{
27    RenderData, RenderProxyIncomingMessage, RenderProxyOutgoingMessage, ResponseHeaders,
28    issue::RenderingIssue,
29};
30use crate::{
31    get_intermediate_asset, get_renderer_pool_operation, pool::NodeJsOperation,
32    render::error_page::error_html, source_map::trace_stack,
33};
34
35/// Renders a module as static HTML in a node.js process.
36#[turbo_tasks::function(operation)]
37pub async fn render_proxy_operation(
38    cwd: FileSystemPath,
39    env: ResolvedVc<Box<dyn ProcessEnv>>,
40    path: FileSystemPath,
41    module: ResolvedVc<Box<dyn EvaluatableAsset>>,
42    runtime_entries: ResolvedVc<EvaluatableAssets>,
43    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
44    intermediate_output_path: FileSystemPath,
45    output_root: FileSystemPath,
46    project_dir: FileSystemPath,
47    data: ResolvedVc<RenderData>,
48    body: ResolvedVc<Body>,
49    debug: bool,
50) -> Result<Vc<ProxyResult>> {
51    let render = render_stream(RenderStreamOptions {
52        cwd,
53        env,
54        path,
55        module,
56        runtime_entries,
57        chunking_context,
58        intermediate_output_path,
59        output_root,
60        project_dir,
61        data,
62        body,
63        debug,
64    })
65    .await?;
66
67    let mut stream = render.read();
68    let first = match stream.try_next().await? {
69        Some(f) => f,
70        None => {
71            // If an Error was received first, then it would have been
72            // transformed into a proxy err error response.
73            bail!("did not receive response from render");
74        }
75    };
76
77    let RenderItem::Headers(data) = first else {
78        bail!("did not receive headers from render");
79    };
80
81    let body = Body::from_stream(stream.map(|item| match item {
82        Ok(RenderItem::BodyChunk(b)) => Ok(b),
83        Ok(v) => Err(SharedError::new(anyhow!(
84            "unexpected render item: {:#?}",
85            v
86        ))),
87        Err(e) => Err(e),
88    }));
89    let result = ProxyResult {
90        status: data.status,
91        headers: data.headers,
92        body,
93    };
94
95    Ok(result.cell())
96}
97
98async fn proxy_error(
99    path: FileSystemPath,
100    error: anyhow::Error,
101    operation: Option<NodeJsOperation>,
102) -> Result<(u16, RcStr)> {
103    let message = format!("{}", PrettyPrintError(&error));
104
105    let status = match operation {
106        Some(operation) => Some(operation.wait_or_kill().await?),
107        None => None,
108    };
109
110    let mut details = vec![];
111    if let Some(status) = status {
112        details.push(format!("status: {status}"));
113    }
114
115    let status_code = 500;
116    let body = error_html(
117        status_code,
118        rcstr!("An error occurred while proxying the request to Node.js"),
119        format!("{message}\n\n{}", details.join("\n")).into(),
120    )
121    .owned()
122    .await?;
123
124    RenderingIssue {
125        file_path: path,
126        message: StyledString::Text(message.into()).resolved_cell(),
127        status: status.and_then(|status| status.code()),
128    }
129    .resolved_cell()
130    .emit();
131
132    Ok((status_code, body))
133}
134
135#[derive(Clone, Debug)]
136#[turbo_tasks::value]
137enum RenderItem {
138    Headers(ResponseHeaders),
139    BodyChunk(Bytes),
140}
141
142type RenderItemResult = Result<RenderItem, SharedError>;
143
144#[turbo_tasks::value(eq = "manual", cell = "new", serialization = "none")]
145struct RenderStreamSender {
146    #[turbo_tasks(trace_ignore, debug_ignore)]
147    get: Box<dyn Fn() -> UnboundedSender<RenderItemResult> + Send + Sync>,
148}
149
150#[turbo_tasks::value(transparent)]
151struct RenderStream(#[turbo_tasks(trace_ignore)] Stream<RenderItemResult>);
152
153#[derive(Clone, Debug, TaskInput, PartialEq, Eq, Hash, Serialize, Deserialize, TraceRawVcs)]
154struct RenderStreamOptions {
155    cwd: FileSystemPath,
156    env: ResolvedVc<Box<dyn ProcessEnv>>,
157    path: FileSystemPath,
158    module: ResolvedVc<Box<dyn EvaluatableAsset>>,
159    runtime_entries: ResolvedVc<EvaluatableAssets>,
160    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
161    intermediate_output_path: FileSystemPath,
162    output_root: FileSystemPath,
163    project_dir: FileSystemPath,
164    data: ResolvedVc<RenderData>,
165    body: ResolvedVc<Body>,
166    debug: bool,
167}
168
169#[turbo_tasks::function]
170fn render_stream(options: RenderStreamOptions) -> Vc<RenderStream> {
171    // TODO: The way we invoke render_stream_internal as side effect is not
172    // GC-safe, so we disable GC for this task.
173    prevent_gc();
174
175    // Note the following code uses some hacks to create a child task that produces
176    // a stream that is returned by this task.
177
178    // We create a new cell in this task, which will be updated from the
179    // [render_stream_internal] task.
180    let cell = turbo_tasks::macro_helpers::find_cell_by_type(
181        <RenderStream as VcValueType>::get_value_type_id(),
182    );
183
184    // We initialize the cell with a stream that is open, but has no values.
185    // The first [render_stream_internal] pipe call will pick up that stream.
186    let (sender, receiver) = unbounded();
187    cell.update(RenderStream(Stream::new_open(vec![], Box::new(receiver))));
188    let initial = Mutex::new(Some(sender));
189
190    // run the evaluation as side effect
191    let _ = render_stream_internal(
192        options,
193        RenderStreamSender {
194            get: Box::new(move || {
195                if let Some(sender) = initial.lock().take() {
196                    sender
197                } else {
198                    // In cases when only [render_stream_internal] is (re)executed, we need to
199                    // update the old stream with a new value.
200                    let (sender, receiver) = unbounded();
201                    cell.update(RenderStream(Stream::new_open(vec![], Box::new(receiver))));
202                    sender
203                }
204            }),
205        }
206        .cell(),
207    );
208
209    let raw: RawVc = cell.into();
210    raw.into()
211}
212
213#[turbo_tasks::function]
214async fn render_stream_internal(
215    options: RenderStreamOptions,
216    sender: Vc<RenderStreamSender>,
217) -> Result<Vc<()>> {
218    let RenderStreamOptions {
219        cwd,
220        env,
221        path,
222        module,
223        runtime_entries,
224        chunking_context,
225        intermediate_output_path,
226        output_root,
227        project_dir,
228        data,
229        body,
230        debug,
231    } = options;
232
233    mark_finished();
234    let Ok(sender) = sender.await else {
235        // Impossible to handle the error in a good way.
236        return Ok(Default::default());
237    };
238
239    let stream = generator! {
240        let intermediate_asset = get_intermediate_asset(
241            *chunking_context,
242            *module,
243            *runtime_entries,
244        ).to_resolved().await?;
245        let pool_op = get_renderer_pool_operation(
246            cwd,
247            env,
248            intermediate_asset,
249            intermediate_output_path.clone(),
250            output_root,
251            project_dir.clone(),
252            debug,
253        );
254
255        // Read this strongly consistent, since we don't want to run inconsistent
256        // node.js code.
257        let pool = pool_op.read_strongly_consistent().await?;
258        let data = data.await?;
259        let mut operation = pool.operation().await?;
260
261        // First, send the render data.
262        operation
263            .send(RenderProxyOutgoingMessage::Headers { data: &data })
264            .await?;
265        // Then, send the binary body in chunks.
266        let mut body = body.await?.read();
267        while let Some(data) = body.next().await {
268            operation
269                .send(RenderProxyOutgoingMessage::BodyChunk { data: &data.unwrap() })
270                .await?;
271        }
272        operation.send(RenderProxyOutgoingMessage::BodyEnd).await?;
273
274        let entry = module.ident().to_string().await?;
275        let guard = duration_span!("Node.js api execution", entry = display(entry));
276
277        match operation.recv().await? {
278            RenderProxyIncomingMessage::Headers { data } => yield RenderItem::Headers(data),
279            RenderProxyIncomingMessage::Error(error) => {
280                drop(guard);
281                // If we don't get headers, then something is very wrong. Instead, we send down a
282                // 500 proxy error as if it were the proper result.
283                let trace = trace_stack(
284                    error,
285                    *intermediate_asset,
286                    intermediate_output_path.clone(),
287                    project_dir.clone()
288                )
289                .await?;
290                let (status, body) =  proxy_error(path, anyhow!("error rendering: {}", trace), Some(operation)).await?;
291                yield RenderItem::Headers(ResponseHeaders {
292                    status,
293                    headers: vec![(
294                        rcstr!("content-type"),
295                        rcstr!("text/html; charset=utf-8"),
296                    )],
297                });
298                yield RenderItem::BodyChunk(body.into_owned().into_bytes().into());
299                return;
300            }
301            v => {
302                drop(guard);
303                Err(anyhow!("unexpected message during rendering: {:#?}", v))?;
304                return;
305            },
306        };
307
308        loop {
309            match operation.recv().await? {
310                RenderProxyIncomingMessage::BodyChunk { data } => {
311                    yield RenderItem::BodyChunk(data.into());
312                }
313                RenderProxyIncomingMessage::BodyEnd => break,
314                RenderProxyIncomingMessage::Error(error) => {
315                    drop(guard);
316                    // We have already started to send a result, so we can't change the
317                    // headers/body to a proxy error.
318                    operation.disallow_reuse();
319                    let trace =
320                        trace_stack(error, *intermediate_asset, intermediate_output_path.clone(), project_dir.clone()).await?;
321                    Err(anyhow!("error during streaming render: {}", trace))?;
322                    return;
323                }
324                v => {
325                    drop(guard);
326                    Err(anyhow!("unexpected message during rendering: {:#?}", v))?;
327                    return;
328                },
329            }
330        }
331        drop(guard);
332    };
333
334    let mut sender = (sender.get)();
335    pin_mut!(stream);
336    while let Some(value) = stream.next().await {
337        if sender.send(value).await.is_err() {
338            return Ok(Default::default());
339        }
340        if sender.flush().await.is_err() {
341            return Ok(Default::default());
342        }
343    }
344
345    Ok(Default::default())
346}