1use anyhow::{Result, anyhow, bail};
2use async_stream::try_stream as generator;
3use futures::{
4 SinkExt, StreamExt, TryStreamExt,
5 channel::mpsc::{UnboundedSender, unbounded},
6 pin_mut,
7};
8use parking_lot::Mutex;
9use serde::{Deserialize, Serialize};
10use turbo_rcstr::RcStr;
11use turbo_tasks::{
12 RawVc, ResolvedVc, TaskInput, ValueToString, Vc, duration_span, mark_finished, prevent_gc,
13 trace::TraceRawVcs, util::SharedError,
14};
15use turbo_tasks_bytes::{Bytes, Stream};
16use turbo_tasks_env::ProcessEnv;
17use turbo_tasks_fs::FileSystemPath;
18use turbopack_core::{
19 chunk::{ChunkingContext, EvaluatableAsset, EvaluatableAssets},
20 error::PrettyPrintError,
21 issue::{IssueExt, StyledString},
22 module::Module,
23};
24use turbopack_dev_server::source::{Body, ProxyResult};
25
26use super::{
27 RenderData, RenderProxyIncomingMessage, RenderProxyOutgoingMessage, ResponseHeaders,
28 issue::RenderingIssue,
29};
30use crate::{
31 get_intermediate_asset, get_renderer_pool_operation, pool::NodeJsOperation,
32 render::error_page::error_html, source_map::trace_stack,
33};
34
35#[turbo_tasks::function(operation)]
37pub async fn render_proxy_operation(
38 cwd: ResolvedVc<FileSystemPath>,
39 env: ResolvedVc<Box<dyn ProcessEnv>>,
40 path: ResolvedVc<FileSystemPath>,
41 module: ResolvedVc<Box<dyn EvaluatableAsset>>,
42 runtime_entries: ResolvedVc<EvaluatableAssets>,
43 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
44 intermediate_output_path: ResolvedVc<FileSystemPath>,
45 output_root: ResolvedVc<FileSystemPath>,
46 project_dir: ResolvedVc<FileSystemPath>,
47 data: ResolvedVc<RenderData>,
48 body: ResolvedVc<Body>,
49 debug: bool,
50) -> Result<Vc<ProxyResult>> {
51 let render = render_stream(RenderStreamOptions {
52 cwd,
53 env,
54 path,
55 module,
56 runtime_entries,
57 chunking_context,
58 intermediate_output_path,
59 output_root,
60 project_dir,
61 data,
62 body,
63 debug,
64 })
65 .await?;
66
67 let mut stream = render.read();
68 let first = match stream.try_next().await? {
69 Some(f) => f,
70 None => {
71 bail!("did not receive response from render");
74 }
75 };
76
77 let RenderItem::Headers(data) = first else {
78 bail!("did not receive headers from render");
79 };
80
81 let body = Body::from_stream(stream.map(|item| match item {
82 Ok(RenderItem::BodyChunk(b)) => Ok(b),
83 Ok(v) => Err(SharedError::new(anyhow!(
84 "unexpected render item: {:#?}",
85 v
86 ))),
87 Err(e) => Err(e),
88 }));
89 let result = ProxyResult {
90 status: data.status,
91 headers: data.headers,
92 body,
93 };
94
95 Ok(result.cell())
96}
97
98async fn proxy_error(
99 path: ResolvedVc<FileSystemPath>,
100 error: anyhow::Error,
101 operation: Option<NodeJsOperation>,
102) -> Result<(u16, RcStr)> {
103 let message = format!("{}", PrettyPrintError(&error));
104
105 let status = match operation {
106 Some(operation) => Some(operation.wait_or_kill().await?),
107 None => None,
108 };
109
110 let mut details = vec![];
111 if let Some(status) = status {
112 details.push(format!("status: {status}"));
113 }
114
115 let status_code = 500;
116 let body = error_html(
117 status_code,
118 "An error occurred while proxying the request to Node.js".into(),
119 format!("{message}\n\n{}", details.join("\n")).into(),
120 )
121 .owned()
122 .await?;
123
124 RenderingIssue {
125 file_path: path,
126 message: StyledString::Text(message.into()).resolved_cell(),
127 status: status.and_then(|status| status.code()),
128 }
129 .resolved_cell()
130 .emit();
131
132 Ok((status_code, body))
133}
134
/// A single item of the render stream: the response headers or one chunk of
/// the response body.
///
/// `render_proxy_operation` expects a `Headers` item first and only
/// `BodyChunk` items after it.
#[derive(Clone, Debug)]
#[turbo_tasks::value]
enum RenderItem {
    /// Status and headers of the response.
    Headers(ResponseHeaders),
    /// One chunk of the response body.
    BodyChunk(Bytes),
}
141
/// What travels over the render stream: a [`RenderItem`] or a [`SharedError`].
type RenderItemResult = Result<RenderItem, SharedError>;
143
/// Wraps the closure that `render_stream_internal` calls to obtain the channel
/// it writes its render items to.
#[turbo_tasks::value(eq = "manual", cell = "new", serialization = "none")]
struct RenderStreamSender {
    /// Returns the sender to use for one run of `render_stream_internal`.
    ///
    /// The closure built in `render_stream` hands out the initially created
    /// sender on the first call and opens a new stream on every later call.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    get: Box<dyn Fn() -> UnboundedSender<RenderItemResult> + Send + Sync>,
}
149
/// The stream of [`RenderItemResult`]s returned by `render_stream` and read by
/// `render_proxy_operation`.
#[turbo_tasks::value(transparent)]
struct RenderStream(#[turbo_tasks(trace_ignore)] Stream<RenderItemResult>);
152
/// Bundles the arguments of `render_proxy_operation` so they can be passed to
/// `render_stream` and `render_stream_internal` as a single task input.
#[derive(Clone, Debug, TaskInput, PartialEq, Eq, Hash, Serialize, Deserialize, TraceRawVcs)]
struct RenderStreamOptions {
    /// Passed to `get_renderer_pool_operation`.
    cwd: ResolvedVc<FileSystemPath>,
    /// Passed to `get_renderer_pool_operation`.
    env: ResolvedVc<Box<dyn ProcessEnv>>,
    /// File path reported on the `RenderingIssue` emitted when rendering fails.
    path: ResolvedVc<FileSystemPath>,
    /// Module the intermediate asset is built from; its ident also labels the
    /// "Node.js api execution" span.
    module: ResolvedVc<Box<dyn EvaluatableAsset>>,
    /// Passed to `get_intermediate_asset` together with `module`.
    runtime_entries: ResolvedVc<EvaluatableAssets>,
    /// Passed to `get_intermediate_asset` together with `module`.
    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
    /// Passed to `get_renderer_pool_operation` and to `trace_stack`.
    intermediate_output_path: ResolvedVc<FileSystemPath>,
    /// Passed to `get_renderer_pool_operation`.
    output_root: ResolvedVc<FileSystemPath>,
    /// Passed to `get_renderer_pool_operation` and to `trace_stack`.
    project_dir: ResolvedVc<FileSystemPath>,
    /// Render data sent to Node.js as the initial `Headers` message.
    data: ResolvedVc<RenderData>,
    /// Request body, forwarded to Node.js chunk by chunk.
    body: ResolvedVc<Body>,
    /// Passed to `get_renderer_pool_operation`.
    debug: bool,
}
168
169#[turbo_tasks::function]
170fn render_stream(options: RenderStreamOptions) -> Vc<RenderStream> {
171 prevent_gc();
174
175 let cell = turbo_tasks::macro_helpers::find_cell_by_type(*RENDERSTREAM_VALUE_TYPE_ID);
181
182 let (sender, receiver) = unbounded();
185 cell.update(RenderStream(Stream::new_open(vec![], Box::new(receiver))));
186 let initial = Mutex::new(Some(sender));
187
188 let _ = render_stream_internal(
190 options,
191 RenderStreamSender {
192 get: Box::new(move || {
193 if let Some(sender) = initial.lock().take() {
194 sender
195 } else {
196 let (sender, receiver) = unbounded();
199 cell.update(RenderStream(Stream::new_open(vec![], Box::new(receiver))));
200 sender
201 }
202 }),
203 }
204 .cell(),
205 );
206
207 let raw: RawVc = cell.into();
208 raw.into()
209}
210
211#[turbo_tasks::function]
212async fn render_stream_internal(
213 options: RenderStreamOptions,
214 sender: Vc<RenderStreamSender>,
215) -> Result<Vc<()>> {
216 let RenderStreamOptions {
217 cwd,
218 env,
219 path,
220 module,
221 runtime_entries,
222 chunking_context,
223 intermediate_output_path,
224 output_root,
225 project_dir,
226 data,
227 body,
228 debug,
229 } = options;
230
231 mark_finished();
232 let Ok(sender) = sender.await else {
233 return Ok(Default::default());
235 };
236
237 let stream = generator! {
238 let intermediate_asset = get_intermediate_asset(
239 *chunking_context,
240 *module,
241 *runtime_entries,
242 ).to_resolved().await?;
243 let pool_op = get_renderer_pool_operation(
244 cwd,
245 env,
246 intermediate_asset,
247 intermediate_output_path,
248 output_root,
249 project_dir,
250 debug,
251 );
252
253 let pool = pool_op.read_strongly_consistent().await?;
256 let data = data.await?;
257 let mut operation = pool.operation().await?;
258
259 operation
261 .send(RenderProxyOutgoingMessage::Headers { data: &data })
262 .await?;
263 let mut body = body.await?.read();
265 while let Some(data) = body.next().await {
266 operation
267 .send(RenderProxyOutgoingMessage::BodyChunk { data: &data.unwrap() })
268 .await?;
269 }
270 operation.send(RenderProxyOutgoingMessage::BodyEnd).await?;
271
272 let entry = module.ident().to_string().await?;
273 let guard = duration_span!("Node.js api execution", entry = display(entry));
274
275 match operation.recv().await? {
276 RenderProxyIncomingMessage::Headers { data } => yield RenderItem::Headers(data),
277 RenderProxyIncomingMessage::Error(error) => {
278 drop(guard);
279 let trace = trace_stack(
282 error,
283 *intermediate_asset,
284 *intermediate_output_path,
285 *project_dir
286 )
287 .await?;
288 let (status, body) = proxy_error(path, anyhow!("error rendering: {}", trace), Some(operation)).await?;
289 yield RenderItem::Headers(ResponseHeaders {
290 status,
291 headers: vec![(
292 "content-type".into(),
293 "text/html; charset=utf-8".into(),
294 )],
295 });
296 yield RenderItem::BodyChunk(body.into_owned().into_bytes().into());
297 return;
298 }
299 v => {
300 drop(guard);
301 Err(anyhow!("unexpected message during rendering: {:#?}", v))?;
302 return;
303 },
304 };
305
306 loop {
307 match operation.recv().await? {
308 RenderProxyIncomingMessage::BodyChunk { data } => {
309 yield RenderItem::BodyChunk(data.into());
310 }
311 RenderProxyIncomingMessage::BodyEnd => break,
312 RenderProxyIncomingMessage::Error(error) => {
313 drop(guard);
314 operation.disallow_reuse();
317 let trace =
318 trace_stack(error, *intermediate_asset, *intermediate_output_path, *project_dir).await?;
319 Err(anyhow!("error during streaming render: {}", trace))?;
320 return;
321 }
322 v => {
323 drop(guard);
324 Err(anyhow!("unexpected message during rendering: {:#?}", v))?;
325 return;
326 },
327 }
328 }
329 drop(guard);
330 };
331
332 let mut sender = (sender.get)();
333 pin_mut!(stream);
334 while let Some(value) = stream.next().await {
335 if sender.send(value).await.is_err() {
336 return Ok(Default::default());
337 }
338 if sender.flush().await.is_err() {
339 return Ok(Default::default());
340 }
341 }
342
343 Ok(Default::default())
344}