1use anyhow::{Result, anyhow, bail};
2use async_stream::try_stream as generator;
3use futures::{
4 SinkExt, StreamExt, TryStreamExt,
5 channel::mpsc::{UnboundedSender, unbounded},
6 pin_mut,
7};
8use parking_lot::Mutex;
9use serde::{Deserialize, Serialize};
10use turbo_rcstr::{RcStr, rcstr};
11use turbo_tasks::{
12 RawVc, ResolvedVc, TaskInput, ValueToString, Vc, VcValueType, duration_span, mark_finished,
13 prevent_gc, trace::TraceRawVcs, util::SharedError,
14};
15use turbo_tasks_bytes::{Bytes, Stream};
16use turbo_tasks_env::ProcessEnv;
17use turbo_tasks_fs::FileSystemPath;
18use turbopack_core::{
19 chunk::{ChunkingContext, EvaluatableAsset, EvaluatableAssets},
20 error::PrettyPrintError,
21 issue::{IssueExt, StyledString},
22 module::Module,
23};
24use turbopack_dev_server::source::{Body, ProxyResult};
25
26use super::{
27 RenderData, RenderProxyIncomingMessage, RenderProxyOutgoingMessage, ResponseHeaders,
28 issue::RenderingIssue,
29};
30use crate::{
31 get_intermediate_asset, get_renderer_pool_operation, pool::NodeJsOperation,
32 render::error_page::error_html, source_map::trace_stack,
33};
34
35#[turbo_tasks::function(operation)]
37pub async fn render_proxy_operation(
38 cwd: FileSystemPath,
39 env: ResolvedVc<Box<dyn ProcessEnv>>,
40 path: FileSystemPath,
41 module: ResolvedVc<Box<dyn EvaluatableAsset>>,
42 runtime_entries: ResolvedVc<EvaluatableAssets>,
43 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
44 intermediate_output_path: FileSystemPath,
45 output_root: FileSystemPath,
46 project_dir: FileSystemPath,
47 data: ResolvedVc<RenderData>,
48 body: ResolvedVc<Body>,
49 debug: bool,
50) -> Result<Vc<ProxyResult>> {
51 let render = render_stream(RenderStreamOptions {
52 cwd,
53 env,
54 path,
55 module,
56 runtime_entries,
57 chunking_context,
58 intermediate_output_path,
59 output_root,
60 project_dir,
61 data,
62 body,
63 debug,
64 })
65 .await?;
66
67 let mut stream = render.read();
68 let first = match stream.try_next().await? {
69 Some(f) => f,
70 None => {
71 bail!("did not receive response from render");
74 }
75 };
76
77 let RenderItem::Headers(data) = first else {
78 bail!("did not receive headers from render");
79 };
80
81 let body = Body::from_stream(stream.map(|item| match item {
82 Ok(RenderItem::BodyChunk(b)) => Ok(b),
83 Ok(v) => Err(SharedError::new(anyhow!(
84 "unexpected render item: {:#?}",
85 v
86 ))),
87 Err(e) => Err(e),
88 }));
89 let result = ProxyResult {
90 status: data.status,
91 headers: data.headers,
92 body,
93 };
94
95 Ok(result.cell())
96}
97
98async fn proxy_error(
99 path: FileSystemPath,
100 error: anyhow::Error,
101 operation: Option<NodeJsOperation>,
102) -> Result<(u16, RcStr)> {
103 let message = format!("{}", PrettyPrintError(&error));
104
105 let status = match operation {
106 Some(operation) => Some(operation.wait_or_kill().await?),
107 None => None,
108 };
109
110 let mut details = vec![];
111 if let Some(status) = status {
112 details.push(format!("status: {status}"));
113 }
114
115 let status_code = 500;
116 let body = error_html(
117 status_code,
118 rcstr!("An error occurred while proxying the request to Node.js"),
119 format!("{message}\n\n{}", details.join("\n")).into(),
120 )
121 .owned()
122 .await?;
123
124 RenderingIssue {
125 file_path: path,
126 message: StyledString::Text(message.into()).resolved_cell(),
127 status: status.and_then(|status| status.code()),
128 }
129 .resolved_cell()
130 .emit();
131
132 Ok((status_code, body))
133}
134
/// A single item of the render stream.
///
/// `render_proxy_operation` expects exactly one `Headers` item first and only
/// `BodyChunk` items after it.
#[derive(Clone, Debug)]
#[turbo_tasks::value]
enum RenderItem {
    /// Status and headers of the response.
    Headers(ResponseHeaders),
    /// One chunk of the response body.
    BodyChunk(Bytes),
}
141
/// Item type carried by the render stream: a [`RenderItem`] or the error that
/// interrupted rendering.
type RenderItemResult = Result<RenderItem, SharedError>;
143
/// Hands out the sending half of the channel that feeds a [`RenderStream`].
///
/// Created in `render_stream` and consumed in `render_stream_internal`.
#[turbo_tasks::value(eq = "manual", cell = "new", serialization = "none")]
struct RenderStreamSender {
    /// Returns the sender that render items should be pushed into. The
    /// closure built in `render_stream` hands out the initially created
    /// sender on its first call and opens a fresh channel (updating the
    /// stream cell) on every later call.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    get: Box<dyn Fn() -> UnboundedSender<RenderItemResult> + Send + Sync>,
}
149
/// Transparent value wrapping the stream of [`RenderItemResult`]s produced by
/// a render.
#[turbo_tasks::value(transparent)]
struct RenderStream(#[turbo_tasks(trace_ignore)] Stream<RenderItemResult>);
152
/// Bundles the arguments of `render_proxy_operation` into a single task input
/// that is passed to `render_stream` and `render_stream_internal`.
#[derive(Clone, Debug, TaskInput, PartialEq, Eq, Hash, Serialize, Deserialize, TraceRawVcs)]
struct RenderStreamOptions {
    // Forwarded to `get_renderer_pool_operation`.
    cwd: FileSystemPath,
    // Forwarded to `get_renderer_pool_operation`.
    env: ResolvedVc<Box<dyn ProcessEnv>>,
    // Used as the file path of the `RenderingIssue` emitted on render errors.
    path: FileSystemPath,
    // Module the intermediate asset is built from; its ident also labels the
    // execution span.
    module: ResolvedVc<Box<dyn EvaluatableAsset>>,
    // Forwarded to `get_intermediate_asset`.
    runtime_entries: ResolvedVc<EvaluatableAssets>,
    // Forwarded to `get_intermediate_asset`.
    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
    // Forwarded to `get_renderer_pool_operation` and `trace_stack`.
    intermediate_output_path: FileSystemPath,
    // Forwarded to `get_renderer_pool_operation`.
    output_root: FileSystemPath,
    // Forwarded to `get_renderer_pool_operation` and `trace_stack`.
    project_dir: FileSystemPath,
    // Sent to the Node.js process as the `Headers` message.
    data: ResolvedVc<RenderData>,
    // Request body, sent to the Node.js process chunk by chunk.
    body: ResolvedVc<Body>,
    // Forwarded to `get_renderer_pool_operation`.
    debug: bool,
}
168
/// Starts a render and returns the [`RenderStream`] its items will arrive on.
///
/// The returned `Vc` points at a cell owned by this task. The cell is filled
/// here with an open, initially empty stream; the items themselves are pushed
/// by `render_stream_internal`, which is invoked as a side effect and whose
/// result is deliberately ignored.
#[turbo_tasks::function]
fn render_stream(options: RenderStreamOptions) -> Vc<RenderStream> {
    // NOTE(review): presumably required because invoking
    // `render_stream_internal` purely as a side effect is not GC-safe —
    // confirm against the turbo_tasks documentation.
    prevent_gc();

    // Look up the cell for the `RenderStream` value type. The cell handle is
    // used both inside the closure below and for the return value.
    let cell = turbo_tasks::macro_helpers::find_cell_by_type(
        <RenderStream as VcValueType>::get_value_type_id(),
    );

    // Initialize the cell with an open stream that has no items yet. The
    // matching sender is parked in `initial` until it is requested.
    let (sender, receiver) = unbounded();
    cell.update(RenderStream(Stream::new_open(vec![], Box::new(receiver))));
    let initial = Mutex::new(Some(sender));

    // Kick off the actual rendering as a side effect; only the sender handle
    // connects it to the cell above.
    let _ = render_stream_internal(
        options,
        RenderStreamSender {
            get: Box::new(move || {
                if let Some(sender) = initial.lock().take() {
                    // First request: hand out the sender paired with the
                    // stream that is already stored in the cell.
                    sender
                } else {
                    // The initial sender is gone, so open a new channel and
                    // replace the stream in the cell with one backed by the
                    // new receiver.
                    let (sender, receiver) = unbounded();
                    cell.update(RenderStream(Stream::new_open(vec![], Box::new(receiver))));
                    sender
                }
            }),
        }
        .cell(),
    );

    // Return the cell itself as the `Vc<RenderStream>`.
    let raw: RawVc = cell.into();
    raw.into()
}
212
213#[turbo_tasks::function]
214async fn render_stream_internal(
215 options: RenderStreamOptions,
216 sender: Vc<RenderStreamSender>,
217) -> Result<Vc<()>> {
218 let RenderStreamOptions {
219 cwd,
220 env,
221 path,
222 module,
223 runtime_entries,
224 chunking_context,
225 intermediate_output_path,
226 output_root,
227 project_dir,
228 data,
229 body,
230 debug,
231 } = options;
232
233 mark_finished();
234 let Ok(sender) = sender.await else {
235 return Ok(Default::default());
237 };
238
239 let stream = generator! {
240 let intermediate_asset = get_intermediate_asset(
241 *chunking_context,
242 *module,
243 *runtime_entries,
244 ).to_resolved().await?;
245 let pool_op = get_renderer_pool_operation(
246 cwd,
247 env,
248 intermediate_asset,
249 intermediate_output_path.clone(),
250 output_root,
251 project_dir.clone(),
252 debug,
253 );
254
255 let pool = pool_op.read_strongly_consistent().await?;
258 let data = data.await?;
259 let mut operation = pool.operation().await?;
260
261 operation
263 .send(RenderProxyOutgoingMessage::Headers { data: &data })
264 .await?;
265 let mut body = body.await?.read();
267 while let Some(data) = body.next().await {
268 operation
269 .send(RenderProxyOutgoingMessage::BodyChunk { data: &data.unwrap() })
270 .await?;
271 }
272 operation.send(RenderProxyOutgoingMessage::BodyEnd).await?;
273
274 let entry = module.ident().to_string().await?;
275 let guard = duration_span!("Node.js api execution", entry = display(entry));
276
277 match operation.recv().await? {
278 RenderProxyIncomingMessage::Headers { data } => yield RenderItem::Headers(data),
279 RenderProxyIncomingMessage::Error(error) => {
280 drop(guard);
281 let trace = trace_stack(
284 error,
285 *intermediate_asset,
286 intermediate_output_path.clone(),
287 project_dir.clone()
288 )
289 .await?;
290 let (status, body) = proxy_error(path, anyhow!("error rendering: {}", trace), Some(operation)).await?;
291 yield RenderItem::Headers(ResponseHeaders {
292 status,
293 headers: vec![(
294 rcstr!("content-type"),
295 rcstr!("text/html; charset=utf-8"),
296 )],
297 });
298 yield RenderItem::BodyChunk(body.into_owned().into_bytes().into());
299 return;
300 }
301 v => {
302 drop(guard);
303 Err(anyhow!("unexpected message during rendering: {:#?}", v))?;
304 return;
305 },
306 };
307
308 loop {
309 match operation.recv().await? {
310 RenderProxyIncomingMessage::BodyChunk { data } => {
311 yield RenderItem::BodyChunk(data.into());
312 }
313 RenderProxyIncomingMessage::BodyEnd => break,
314 RenderProxyIncomingMessage::Error(error) => {
315 drop(guard);
316 operation.disallow_reuse();
319 let trace =
320 trace_stack(error, *intermediate_asset, intermediate_output_path.clone(), project_dir.clone()).await?;
321 Err(anyhow!("error during streaming render: {}", trace))?;
322 return;
323 }
324 v => {
325 drop(guard);
326 Err(anyhow!("unexpected message during rendering: {:#?}", v))?;
327 return;
328 },
329 }
330 }
331 drop(guard);
332 };
333
334 let mut sender = (sender.get)();
335 pin_mut!(stream);
336 while let Some(value) = stream.next().await {
337 if sender.send(value).await.is_err() {
338 return Ok(Default::default());
339 }
340 if sender.flush().await.is_err() {
341 return Ok(Default::default());
342 }
343 }
344
345 Ok(Default::default())
346}