1use anyhow::{Context, Result, anyhow, bail};
2use async_stream::try_stream as generator;
3use futures::{
4 SinkExt, StreamExt, TryStreamExt,
5 channel::mpsc::{UnboundedSender, unbounded},
6 pin_mut,
7};
8use parking_lot::Mutex;
9use serde::{Deserialize, Serialize};
10use turbo_tasks::{
11 RawVc, ResolvedVc, TaskInput, ValueToString, Vc, duration_span, mark_finished, prevent_gc,
12 trace::TraceRawVcs, util::SharedError,
13};
14use turbo_tasks_bytes::{Bytes, Stream};
15use turbo_tasks_env::ProcessEnv;
16use turbo_tasks_fs::{File, FileSystemPath};
17use turbopack_core::{
18 asset::{Asset, AssetContent},
19 chunk::{ChunkingContext, EvaluatableAsset, EvaluatableAssets},
20 error::PrettyPrintError,
21 issue::{IssueExt, StyledString},
22 module::Module,
23};
24use turbopack_dev_server::{
25 html::DevHtmlAsset,
26 source::{Body, HeaderList, Rewrite, RewriteBuilder},
27};
28
29use super::{
30 RenderData, RenderStaticIncomingMessage, RenderStaticOutgoingMessage, issue::RenderingIssue,
31};
32use crate::{
33 ResponseHeaders, get_intermediate_asset, get_renderer_pool_operation, pool::NodeJsOperation,
34 render::error_page::error_html_body, source_map::trace_stack,
35};
36
/// The result of a static render, as returned by `render_static_operation`.
#[derive(Clone, Debug)]
#[turbo_tasks::value]
pub enum StaticResult {
    /// A complete response whose body is available as an [`AssetContent`].
    ///
    /// Built through [`StaticResult::content`].
    Content {
        /// The response body.
        content: ResolvedVc<AssetContent>,
        /// The status code of the response (the error page built in this file uses `500`).
        status_code: u16,
        /// The response headers.
        headers: ResolvedVc<HeaderList>,
    },
    /// A response whose status and headers are known up front while the body is delivered
    /// as a stream.
    ///
    /// Note that the status field is named `status` here but `status_code` in
    /// [`StaticResult::Content`].
    StreamedContent {
        /// The status of the response.
        status: u16,
        /// The response headers.
        headers: ResolvedVc<HeaderList>,
        /// The streamed response body.
        body: Body,
    },
    /// A rewrite instead of rendered content.
    ///
    /// Built through [`StaticResult::rewrite`].
    Rewrite(ResolvedVc<Rewrite>),
}
52
53#[turbo_tasks::value_impl]
54impl StaticResult {
55 #[turbo_tasks::function]
56 pub fn content(
57 content: ResolvedVc<AssetContent>,
58 status_code: u16,
59 headers: ResolvedVc<HeaderList>,
60 ) -> Vc<Self> {
61 StaticResult::Content {
62 content,
63 status_code,
64 headers,
65 }
66 .cell()
67 }
68
69 #[turbo_tasks::function]
70 pub fn rewrite(rewrite: ResolvedVc<Rewrite>) -> Vc<Self> {
71 StaticResult::Rewrite(rewrite).cell()
72 }
73}
74
75#[turbo_tasks::function(operation)]
77pub async fn render_static_operation(
78 cwd: ResolvedVc<FileSystemPath>,
79 env: ResolvedVc<Box<dyn ProcessEnv>>,
80 path: ResolvedVc<FileSystemPath>,
81 module: ResolvedVc<Box<dyn EvaluatableAsset>>,
82 runtime_entries: ResolvedVc<EvaluatableAssets>,
83 fallback_page: ResolvedVc<DevHtmlAsset>,
84 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
85 intermediate_output_path: ResolvedVc<FileSystemPath>,
86 output_root: ResolvedVc<FileSystemPath>,
87 project_dir: ResolvedVc<FileSystemPath>,
88 data: ResolvedVc<RenderData>,
89 debug: bool,
90) -> Result<Vc<StaticResult>> {
91 let render = render_stream(RenderStreamOptions {
92 cwd,
93 env,
94 path,
95 module,
96 runtime_entries,
97 fallback_page,
98 chunking_context,
99 intermediate_output_path,
100 output_root,
101 project_dir,
102 data,
103 debug,
104 })
105 .await?;
106
107 let mut stream = render.read();
108 let first = match stream.try_next().await? {
109 Some(f) => f,
110 None => {
111 bail!("did not receive response from render");
114 }
115 };
116
117 Ok(match first {
118 RenderItem::Response(response) => *response,
119 RenderItem::Headers(data) => {
120 let body = stream.map(|item| match item {
121 Ok(RenderItem::BodyChunk(b)) => Ok(b),
122 Ok(v) => Err(SharedError::new(anyhow!(
123 "unexpected render item: {:#?}",
124 v
125 ))),
126 Err(e) => Err(e),
127 });
128 StaticResult::StreamedContent {
129 status: data.status,
130 headers: ResolvedVc::cell(data.headers),
131 body: Body::from_stream(body),
132 }
133 .cell()
134 }
135 v => bail!("unexpected render item: {:#?}", v),
136 })
137}
138
139async fn static_error(
140 path: ResolvedVc<FileSystemPath>,
141 error: anyhow::Error,
142 operation: Option<NodeJsOperation>,
143 fallback_page: Vc<DevHtmlAsset>,
144) -> Result<Vc<AssetContent>> {
145 let status = match operation {
146 Some(operation) => Some(operation.wait_or_kill().await?),
147 None => None,
148 };
149
150 let error = format!("{}", PrettyPrintError(&error));
151 let mut message = error
152 .replace('&', "&")
154 .replace('>', ">")
155 .replace('<', "<");
156
157 if let Some(status) = status {
158 message.push_str(&format!("\n\nStatus: {status}"));
159 }
160
161 let mut body = "<script id=\"__NEXT_DATA__\" type=\"application/json\">{ \"props\": {} \
162 }</script>"
163 .to_string();
164
165 body.push_str(
166 error_html_body(500, "Error rendering page".into(), message.into())
167 .await?
168 .as_str(),
169 );
170
171 let issue = RenderingIssue {
172 file_path: path,
173 message: StyledString::Text(error.into()).resolved_cell(),
174 status: status.and_then(|status| status.code()),
175 };
176
177 issue.resolved_cell().emit();
178
179 let html = fallback_page.with_body(body.into());
180
181 Ok(html.content())
182}
183
/// A single item travelling through the render stream from `render_stream_internal` to
/// `render_static_operation`.
#[derive(Clone, Debug)]
#[turbo_tasks::value]
enum RenderItem {
    /// A complete result; `render_static_operation` returns it directly when it is the first
    /// item of the stream.
    Response(ResolvedVc<StaticResult>),
    /// Status and headers of a streamed response; the body follows as
    /// [`RenderItem::BodyChunk`] items.
    Headers(ResponseHeaders),
    /// One chunk of a streamed response body.
    BodyChunk(Bytes),
}
191
/// The element type of the render stream: a [`RenderItem`] or a shareable error.
type RenderItemResult = Result<RenderItem, SharedError>;
193
/// Hands `render_stream_internal` the sending half of the channel that feeds the render
/// stream.
#[turbo_tasks::value(eq = "manual", cell = "new", serialization = "none")]
struct RenderStreamSender {
    /// Returns the sender to push render items into.
    ///
    /// The closure installed by `render_stream` returns the initially created sender on its
    /// first call; on later calls it opens a new channel, stores the new receiving end in the
    /// stream cell and returns the new sender.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    get: Box<dyn Fn() -> UnboundedSender<RenderItemResult> + Send + Sync>,
}
199
/// Transparent wrapper around the stream of render items.
///
/// This is the value type of the cell that `render_stream` looks up, fills with an open
/// stream and returns.
#[turbo_tasks::value(transparent)]
struct RenderStream(#[turbo_tasks(trace_ignore)] Stream<RenderItemResult>);
202
/// The arguments of `render_static_operation`, bundled so they can be passed as a single
/// task input to `render_stream` and `render_stream_internal`.
#[derive(Clone, Debug, TaskInput, PartialEq, Eq, Hash, Deserialize, Serialize, TraceRawVcs)]
struct RenderStreamOptions {
    /// Forwarded to `get_renderer_pool_operation`.
    cwd: ResolvedVc<FileSystemPath>,
    /// Forwarded to `get_renderer_pool_operation`.
    env: ResolvedVc<Box<dyn ProcessEnv>>,
    /// Used as the `file_path` of the issue emitted when rendering fails.
    path: ResolvedVc<FileSystemPath>,
    /// The module to render; passed to `get_intermediate_asset` and used to label the
    /// rendering span.
    module: ResolvedVc<Box<dyn EvaluatableAsset>>,
    /// Passed to `get_intermediate_asset` together with `module`.
    runtime_entries: ResolvedVc<EvaluatableAssets>,
    /// The page whose body is replaced with the error markup when rendering fails.
    fallback_page: ResolvedVc<DevHtmlAsset>,
    /// Passed to `get_intermediate_asset` together with `module`.
    chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
    /// Forwarded to `get_renderer_pool_operation` and to `trace_stack`.
    intermediate_output_path: ResolvedVc<FileSystemPath>,
    /// Forwarded to `get_renderer_pool_operation`.
    output_root: ResolvedVc<FileSystemPath>,
    /// Forwarded to `get_renderer_pool_operation` and to `trace_stack`.
    project_dir: ResolvedVc<FileSystemPath>,
    /// Sent to the Node.js process as the initial `Headers` message.
    data: ResolvedVc<RenderData>,
    /// Forwarded to `get_renderer_pool_operation`.
    debug: bool,
}
218
219#[turbo_tasks::function]
220fn render_stream(options: RenderStreamOptions) -> Vc<RenderStream> {
221 prevent_gc();
224
225 let cell = turbo_tasks::macro_helpers::find_cell_by_type(*RENDERSTREAM_VALUE_TYPE_ID);
231
232 let (sender, receiver) = unbounded();
235 cell.update(RenderStream(Stream::new_open(vec![], Box::new(receiver))));
236 let initial = Mutex::new(Some(sender));
237
238 let _ = render_stream_internal(
240 options,
241 RenderStreamSender {
242 get: Box::new(move || {
243 if let Some(sender) = initial.lock().take() {
244 sender
245 } else {
246 let (sender, receiver) = unbounded();
249 cell.update(RenderStream(Stream::new_open(vec![], Box::new(receiver))));
250 sender
251 }
252 }),
253 }
254 .cell(),
255 );
256
257 let raw: RawVc = cell.into();
258 raw.into()
259}
260
/// Drives one render in a Node.js process and forwards everything it produces to the sender
/// obtained from `sender`.
///
/// The exchange with the Node.js operation is:
///
/// 1. Send a `Headers` message carrying the render data.
/// 2. Receive the first reply:
///    - `Headers`: forwarded as `RenderItem::Headers`, then body chunks are forwarded until
///      `BodyEnd` arrives.
///    - `Rewrite` / `Response`: forwarded as a single `RenderItem::Response`.
///    - `Error`: converted into a 500 error page via `static_error` and forwarded as a
///      `RenderItem::Response`.
///    - anything else: an error.
///
/// The function itself always returns `Ok` with a unit value once forwarding stops, either
/// because the generated stream ended or because sending/flushing failed.
#[turbo_tasks::function]
async fn render_stream_internal(
    options: RenderStreamOptions,
    sender: Vc<RenderStreamSender>,
) -> Result<Vc<()>> {
    let RenderStreamOptions {
        cwd,
        env,
        path,
        module,
        runtime_entries,
        fallback_page,
        chunking_context,
        intermediate_output_path,
        output_root,
        project_dir,
        data,
        debug,
    } = options;

    // NOTE(review): presumably marks this task as finished up front so that the streaming
    // below can continue afterwards — confirm against the turbo_tasks documentation.
    mark_finished();
    // Without a sender there is nowhere to report anything to, so just finish.
    let Ok(sender) = sender.await else {
        return Ok(Default::default());
    };

    // NOTE(review): per the `try_stream!` documentation, an error raised with `?` inside the
    // generator is expected to be emitted as an `Err` item, after which the stream ends.
    let stream = generator! {
        let intermediate_asset = get_intermediate_asset(
            *chunking_context,
            *module,
            *runtime_entries,
        ).to_resolved().await?;
        let renderer_pool_op = get_renderer_pool_operation(
            cwd,
            env,
            intermediate_asset,
            intermediate_output_path,
            output_root,
            project_dir,
            debug,
        );

        // The pool is read with strong consistency before an operation is taken from it.
        let pool = renderer_pool_op.read_strongly_consistent().await?;
        let data = data.await?;
        let mut operation = pool.operation().await?;

        // Kick off the render by sending the render data.
        operation
            .send(RenderStaticOutgoingMessage::Headers { data: &data })
            .await
            .context("sending headers to node.js process")?;

        // Span labelled with the module ident; it is dropped explicitly on every path below.
        let entry = module.ident().to_string().await?;
        let guard = duration_span!("Node.js rendering", entry = display(entry));

        // The first reply decides between a streamed response (`Headers`) and a one-shot
        // result (everything else, each of which ends the stream).
        match operation.recv().await? {
            RenderStaticIncomingMessage::Headers { data } => yield RenderItem::Headers(data),
            RenderStaticIncomingMessage::Rewrite { path } => {
                drop(guard);
                yield RenderItem::Response(
                    StaticResult::rewrite(RewriteBuilder::new(path).build()).to_resolved().await?
                );
                return;
            }
            RenderStaticIncomingMessage::Response {
                status_code,
                headers,
                body,
            } => {
                drop(guard);
                yield RenderItem::Response(
                    StaticResult::content(
                        AssetContent::file(File::from(body).into()),
                        status_code,
                        Vc::cell(headers),
                    ).to_resolved().await?
                );
                return;
            }
            RenderStaticIncomingMessage::Error(error) => {
                drop(guard);
                // An error before any headers is turned into a regular 500 response that
                // contains the traced error as an error page.
                let trace = trace_stack(
                    error,
                    *intermediate_asset,
                    *intermediate_output_path,
                    *project_dir,
                )
                .await?;
                yield RenderItem::Response(
                    StaticResult::content(
                        static_error(path, anyhow!(trace), Some(operation), *fallback_page).await?,
                        500,
                        HeaderList::empty(),
                    ).to_resolved().await?
                );
                return;
            }
            v => {
                drop(guard);
                Err(anyhow!("unexpected message during rendering: {:#?}", v))?;
                return;
            },
        };

        // Only reached after a `Headers` reply: forward body chunks until `BodyEnd`.
        loop {
            match operation.recv().await? {
                RenderStaticIncomingMessage::BodyChunk { data } => {
                    yield RenderItem::BodyChunk(data.into());
                }
                RenderStaticIncomingMessage::BodyEnd => break,
                RenderStaticIncomingMessage::Error(error) => {
                    // The headers were already forwarded, so the failure is reported as an
                    // error item of the stream instead of an error page.
                    // NOTE(review): `disallow_reuse` presumably keeps this process from
                    // being handed out again — confirm in the pool implementation.
                    operation.disallow_reuse();
                    let trace =
                        trace_stack(error, *intermediate_asset, *intermediate_output_path, *project_dir).await?;
                    drop(guard);
                    Err(anyhow!("error during streaming render: {}", trace))?;
                    return;
                }
                v => {
                    drop(guard);
                    Err(anyhow!("unexpected message during rendering: {:#?}", v))?;
                    return;
                },
            }
        }
        drop(guard);
    };

    // Forward every generated item. A failed send or flush stops forwarding silently
    // (presumably because the receiving side is gone).
    let mut sender = (sender.get)();
    pin_mut!(stream);
    while let Some(value) = stream.next().await {
        if sender.send(value).await.is_err() {
            return Ok(Default::default());
        }
        if sender.flush().await.is_err() {
            return Ok(Default::default());
        }
    }

    Ok(Default::default())
}