1use anyhow::{Context, Result, anyhow, bail};
2use async_stream::try_stream as generator;
3use futures::{
4 SinkExt, StreamExt, TryStreamExt,
5 channel::mpsc::{UnboundedSender, unbounded},
6 pin_mut,
7};
8use parking_lot::Mutex;
9use serde::{Deserialize, Serialize};
10use turbo_rcstr::rcstr;
11use turbo_tasks::{
12 RawVc, ResolvedVc, TaskInput, ValueToString, Vc, VcValueType, duration_span, mark_finished,
13 prevent_gc, trace::TraceRawVcs, util::SharedError,
14};
15use turbo_tasks_bytes::{Bytes, Stream};
16use turbo_tasks_env::ProcessEnv;
17use turbo_tasks_fs::{File, FileSystemPath};
18use turbopack_core::{
19 asset::{Asset, AssetContent},
20 chunk::{ChunkingContext, EvaluatableAsset, EvaluatableAssets},
21 error::PrettyPrintError,
22 issue::{IssueExt, StyledString},
23 module::Module,
24};
25use turbopack_dev_server::{
26 html::DevHtmlAsset,
27 source::{Body, HeaderList, Rewrite, RewriteBuilder},
28};
29
30use super::{
31 RenderData, RenderStaticIncomingMessage, RenderStaticOutgoingMessage, issue::RenderingIssue,
32};
33use crate::{
34 ResponseHeaders, get_intermediate_asset, get_renderer_pool_operation, pool::NodeJsOperation,
35 render::error_page::error_html_body, source_map::trace_stack,
36};
37
38#[derive(Clone, Debug)]
39#[turbo_tasks::value]
40pub enum StaticResult {
41 Content {
42 content: ResolvedVc<AssetContent>,
43 status_code: u16,
44 headers: ResolvedVc<HeaderList>,
45 },
46 StreamedContent {
47 status: u16,
48 headers: ResolvedVc<HeaderList>,
49 body: Body,
50 },
51 Rewrite(ResolvedVc<Rewrite>),
52}
53
54#[turbo_tasks::value_impl]
55impl StaticResult {
56 #[turbo_tasks::function]
57 pub fn content(
58 content: ResolvedVc<AssetContent>,
59 status_code: u16,
60 headers: ResolvedVc<HeaderList>,
61 ) -> Vc<Self> {
62 StaticResult::Content {
63 content,
64 status_code,
65 headers,
66 }
67 .cell()
68 }
69
70 #[turbo_tasks::function]
71 pub fn rewrite(rewrite: ResolvedVc<Rewrite>) -> Vc<Self> {
72 StaticResult::Rewrite(rewrite).cell()
73 }
74}
75
76#[turbo_tasks::function(operation)]
78pub async fn render_static_operation(
79 cwd: FileSystemPath,
80 env: ResolvedVc<Box<dyn ProcessEnv>>,
81 path: FileSystemPath,
82 module: ResolvedVc<Box<dyn EvaluatableAsset>>,
83 runtime_entries: ResolvedVc<EvaluatableAssets>,
84 fallback_page: ResolvedVc<DevHtmlAsset>,
85 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
86 intermediate_output_path: FileSystemPath,
87 output_root: FileSystemPath,
88 project_dir: FileSystemPath,
89 data: ResolvedVc<RenderData>,
90 debug: bool,
91) -> Result<Vc<StaticResult>> {
92 let render = render_stream(RenderStreamOptions {
93 cwd,
94 env,
95 path,
96 module,
97 runtime_entries,
98 fallback_page,
99 chunking_context,
100 intermediate_output_path,
101 output_root,
102 project_dir,
103 data,
104 debug,
105 })
106 .await?;
107
108 let mut stream = render.read();
109 let first = match stream.try_next().await? {
110 Some(f) => f,
111 None => {
112 bail!("did not receive response from render");
115 }
116 };
117
118 Ok(match first {
119 RenderItem::Response(response) => *response,
120 RenderItem::Headers(data) => {
121 let body = stream.map(|item| match item {
122 Ok(RenderItem::BodyChunk(b)) => Ok(b),
123 Ok(v) => Err(SharedError::new(anyhow!(
124 "unexpected render item: {:#?}",
125 v
126 ))),
127 Err(e) => Err(e),
128 });
129 StaticResult::StreamedContent {
130 status: data.status,
131 headers: ResolvedVc::cell(data.headers),
132 body: Body::from_stream(body),
133 }
134 .cell()
135 }
136 v => bail!("unexpected render item: {:#?}", v),
137 })
138}
139
140async fn static_error(
141 path: FileSystemPath,
142 error: anyhow::Error,
143 operation: Option<NodeJsOperation>,
144 fallback_page: Vc<DevHtmlAsset>,
145) -> Result<Vc<AssetContent>> {
146 let status = match operation {
147 Some(operation) => Some(operation.wait_or_kill().await?),
148 None => None,
149 };
150
151 let error = format!("{}", PrettyPrintError(&error));
152 let mut message = error
153 .replace('&', "&")
155 .replace('>', ">")
156 .replace('<', "<");
157
158 if let Some(status) = status {
159 message.push_str(&format!("\n\nStatus: {status}"));
160 }
161
162 let mut body = "<script id=\"__NEXT_DATA__\" type=\"application/json\">{ \"props\": {} \
163 }</script>"
164 .to_string();
165
166 body.push_str(
167 error_html_body(500, rcstr!("Error rendering page"), message.into())
168 .await?
169 .as_str(),
170 );
171
172 let issue = RenderingIssue {
173 file_path: path,
174 message: StyledString::Text(error.into()).resolved_cell(),
175 status: status.and_then(|status| status.code()),
176 };
177
178 issue.resolved_cell().emit();
179
180 let html = fallback_page.with_body(body.into());
181
182 Ok(html.content())
183}
184
185#[derive(Clone, Debug)]
186#[turbo_tasks::value]
187enum RenderItem {
188 Response(ResolvedVc<StaticResult>),
189 Headers(ResponseHeaders),
190 BodyChunk(Bytes),
191}
192
193type RenderItemResult = Result<RenderItem, SharedError>;
194
195#[turbo_tasks::value(eq = "manual", cell = "new", serialization = "none")]
196struct RenderStreamSender {
197 #[turbo_tasks(trace_ignore, debug_ignore)]
198 get: Box<dyn Fn() -> UnboundedSender<RenderItemResult> + Send + Sync>,
199}
200
201#[turbo_tasks::value(transparent)]
202struct RenderStream(#[turbo_tasks(trace_ignore)] Stream<RenderItemResult>);
203
204#[derive(Clone, Debug, TaskInput, PartialEq, Eq, Hash, Deserialize, Serialize, TraceRawVcs)]
205struct RenderStreamOptions {
206 cwd: FileSystemPath,
207 env: ResolvedVc<Box<dyn ProcessEnv>>,
208 path: FileSystemPath,
209 module: ResolvedVc<Box<dyn EvaluatableAsset>>,
210 runtime_entries: ResolvedVc<EvaluatableAssets>,
211 fallback_page: ResolvedVc<DevHtmlAsset>,
212 chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
213 intermediate_output_path: FileSystemPath,
214 output_root: FileSystemPath,
215 project_dir: FileSystemPath,
216 data: ResolvedVc<RenderData>,
217 debug: bool,
218}
219
220#[turbo_tasks::function]
221fn render_stream(options: RenderStreamOptions) -> Vc<RenderStream> {
222 prevent_gc();
225
226 let cell = turbo_tasks::macro_helpers::find_cell_by_type(
232 <RenderStream as VcValueType>::get_value_type_id(),
233 );
234
235 let (sender, receiver) = unbounded();
238 cell.update(RenderStream(Stream::new_open(vec![], Box::new(receiver))));
239 let initial = Mutex::new(Some(sender));
240
241 let _ = render_stream_internal(
243 options,
244 RenderStreamSender {
245 get: Box::new(move || {
246 if let Some(sender) = initial.lock().take() {
247 sender
248 } else {
249 let (sender, receiver) = unbounded();
252 cell.update(RenderStream(Stream::new_open(vec![], Box::new(receiver))));
253 sender
254 }
255 }),
256 }
257 .cell(),
258 );
259
260 let raw: RawVc = cell.into();
261 raw.into()
262}
263
264#[turbo_tasks::function]
265async fn render_stream_internal(
266 options: RenderStreamOptions,
267 sender: Vc<RenderStreamSender>,
268) -> Result<Vc<()>> {
269 let RenderStreamOptions {
270 cwd,
271 env,
272 path,
273 module,
274 runtime_entries,
275 fallback_page,
276 chunking_context,
277 intermediate_output_path,
278 output_root,
279 project_dir,
280 data,
281 debug,
282 } = options;
283
284 mark_finished();
285 let Ok(sender) = sender.await else {
286 return Ok(Default::default());
288 };
289
290 let stream = generator! {
291 let intermediate_asset = get_intermediate_asset(
292 *chunking_context,
293 *module,
294 *runtime_entries,
295 ).to_resolved().await?;
296 let renderer_pool_op = get_renderer_pool_operation(
297 cwd,
298 env,
299 intermediate_asset,
300 intermediate_output_path.clone(),
301 output_root,
302 project_dir.clone(),
303 debug,
304 );
305
306 let pool = renderer_pool_op.read_strongly_consistent().await?;
309 let data = data.await?;
310 let mut operation = pool.operation().await?;
311
312 operation
313 .send(RenderStaticOutgoingMessage::Headers { data: &data })
314 .await
315 .context("sending headers to node.js process")?;
316
317 let entry = module.ident().to_string().await?;
318 let guard = duration_span!("Node.js rendering", entry = display(entry));
319
320 match operation.recv().await? {
321 RenderStaticIncomingMessage::Headers { data } => yield RenderItem::Headers(data),
322 RenderStaticIncomingMessage::Rewrite { path } => {
323 drop(guard);
324 yield RenderItem::Response(
325 StaticResult::rewrite(RewriteBuilder::new(path).build()).to_resolved().await?
326 );
327 return;
328 }
329 RenderStaticIncomingMessage::Response {
330 status_code,
331 headers,
332 body,
333 } => {
334 drop(guard);
335 yield RenderItem::Response(
336 StaticResult::content(
337 AssetContent::file(File::from(body).into()),
338 status_code,
339 Vc::cell(headers),
340 ).to_resolved().await?
341 );
342 return;
343 }
344 RenderStaticIncomingMessage::Error(error) => {
345 drop(guard);
346 let trace = trace_stack(
349 error,
350 *intermediate_asset,
351 intermediate_output_path.clone(),
352 project_dir.clone(),
353 )
354 .await?;
355 yield RenderItem::Response(
356 StaticResult::content(
357 static_error(path, anyhow!(trace), Some(operation), *fallback_page).await?,
358 500,
359 HeaderList::empty(),
360 ).to_resolved().await?
361 );
362 return;
363 }
364 v => {
365 drop(guard);
366 Err(anyhow!("unexpected message during rendering: {:#?}", v))?;
367 return;
368 },
369 };
370
371 loop {
374 match operation.recv().await? {
375 RenderStaticIncomingMessage::BodyChunk { data } => {
376 yield RenderItem::BodyChunk(data.into());
377 }
378 RenderStaticIncomingMessage::BodyEnd => break,
379 RenderStaticIncomingMessage::Error(error) => {
380 operation.disallow_reuse();
383 let trace =
384 trace_stack(error, *intermediate_asset, intermediate_output_path.clone(), project_dir.clone()).await?;
385 drop(guard);
386 Err(anyhow!("error during streaming render: {}", trace))?;
387 return;
388 }
389 v => {
390 drop(guard);
391 Err(anyhow!("unexpected message during rendering: {:#?}", v))?;
392 return;
393 },
394 }
395 }
396 drop(guard);
397 };
398
399 let mut sender = (sender.get)();
400 pin_mut!(stream);
401 while let Some(value) = stream.next().await {
402 if sender.send(value).await.is_err() {
403 return Ok(Default::default());
404 }
405 if sender.flush().await.is_err() {
406 return Ok(Default::default());
407 }
408 }
409
410 Ok(Default::default())
411}