turbopack_dev_server/update/
stream.rs1use std::pin::Pin;
2
3use anyhow::Result;
4use futures::prelude::*;
5use tokio::sync::mpsc::Sender;
6use tokio_stream::wrappers::ReceiverStream;
7use tracing::Instrument;
8use turbo_rcstr::{RcStr, rcstr};
9use turbo_tasks::{
10 IntoTraitRef, NonLocalValue, OperationVc, ReadRef, ResolvedVc, TransientInstance, Vc,
11 trace::{TraceRawVcs, TraceRawVcsContext},
12};
13use turbo_tasks_fs::{FileSystem, FileSystemPath};
14use turbopack_core::{
15 error::PrettyPrintError,
16 issue::{
17 Issue, IssueDescriptionExt, IssueSeverity, IssueStage, OptionIssueProcessingPathItems,
18 OptionStyledString, PlainIssue, StyledString,
19 },
20 server_fs::ServerFileSystem,
21 version::{
22 NotFoundVersion, PartialUpdate, TotalUpdate, Update, Version, VersionState,
23 VersionedContent,
24 },
25};
26
27use crate::source::{ProxyResult, resolve::ResolveSourceRequestResult};
28
29struct TypedGetContentFn<C> {
30 capture: C,
31 func: for<'a> fn(&'a C) -> OperationVc<ResolveSourceRequestResult>,
32}
33
34unsafe impl<C: NonLocalValue> NonLocalValue for TypedGetContentFn<C> {}
37
38impl<C: TraceRawVcs> TraceRawVcs for TypedGetContentFn<C> {
40 fn trace_raw_vcs(&self, trace_context: &mut TraceRawVcsContext) {
41 self.capture.trace_raw_vcs(trace_context);
42 }
43}
44
45trait TypedGetContentFnTrait: NonLocalValue + TraceRawVcs {
46 fn call(&self) -> OperationVc<ResolveSourceRequestResult>;
47}
48
49impl<C> TypedGetContentFnTrait for TypedGetContentFn<C>
50where
51 C: NonLocalValue + TraceRawVcs,
52{
53 fn call(&self) -> OperationVc<ResolveSourceRequestResult> {
54 (self.func)(&self.capture)
55 }
56}
57
58#[derive(NonLocalValue, TraceRawVcs)]
64pub struct GetContentFn {
65 inner: Box<dyn TypedGetContentFnTrait + Send + Sync>,
66}
67
68impl GetContentFn {
69 pub fn new<C>(
72 capture: C,
73 func: for<'a> fn(&'a C) -> OperationVc<ResolveSourceRequestResult>,
74 ) -> Self
75 where
76 C: NonLocalValue + TraceRawVcs + Send + Sync + 'static,
77 {
78 Self {
79 inner: Box::new(TypedGetContentFn { capture, func }),
80 }
81 }
82}
83
84impl GetContentFn {
85 fn call(&self) -> OperationVc<ResolveSourceRequestResult> {
86 self.inner.call()
87 }
88}
89
90async fn peek_issues<T: Send>(source: OperationVc<T>) -> Result<Vec<ReadRef<PlainIssue>>> {
91 let captured = source.peek_issues_with_path().await?;
92
93 captured.get_plain_issues().await
94}
95
96fn extend_issues(issues: &mut Vec<ReadRef<PlainIssue>>, new_issues: Vec<ReadRef<PlainIssue>>) {
97 for issue in new_issues {
98 if issues.contains(&issue) {
99 continue;
100 }
101
102 issues.push(issue);
103 }
104}
105
106#[turbo_tasks::function(operation)]
107fn versioned_content_update_operation(
108 content: ResolvedVc<Box<dyn VersionedContent>>,
109 from: ResolvedVc<Box<dyn Version>>,
110) -> Vc<Update> {
111 content.update(*from)
112}
113
114#[turbo_tasks::function(operation)]
115async fn get_update_stream_item_operation(
116 resource: RcStr,
117 from: ResolvedVc<VersionState>,
118 get_content: TransientInstance<GetContentFn>,
119) -> Result<Vc<UpdateStreamItem>> {
120 let content_op = get_content.call();
121 let content_result = content_op.read_strongly_consistent().await;
122 let mut plain_issues = peek_issues(content_op).await?;
123
124 let content_value = match content_result {
125 Ok(content) => content,
126 Err(e) => {
127 plain_issues.push(
128 PlainIssue::from_issue(
129 Vc::upcast(
130 FatalStreamIssue {
131 resource,
132 description: StyledString::Text(
133 format!("{}", PrettyPrintError(&e)).into(),
134 )
135 .resolved_cell(),
136 }
137 .cell(),
138 ),
139 None,
140 OptionIssueProcessingPathItems::none(),
141 )
142 .await?,
143 );
144
145 let update = Update::Total(TotalUpdate {
146 to: Vc::upcast::<Box<dyn Version>>(NotFoundVersion::new())
147 .into_trait_ref()
148 .await?,
149 })
150 .cell();
151 return Ok(UpdateStreamItem::Found {
152 update: update.await?,
153 issues: plain_issues,
154 }
155 .cell());
156 }
157 };
158
159 match *content_value {
160 ResolveSourceRequestResult::Static(static_content_vc, _) => {
161 let static_content = static_content_vc.await?;
162
163 if static_content.status_code == 404 {
165 return Ok(UpdateStreamItem::NotFound.cell());
166 }
167
168 let resolved_content = static_content.content;
169 let from = from.get().to_resolved().await?;
170 let update_op = versioned_content_update_operation(resolved_content, from);
171
172 extend_issues(&mut plain_issues, peek_issues(update_op).await?);
173
174 Ok(UpdateStreamItem::Found {
175 update: update_op.connect().await?,
176 issues: plain_issues,
177 }
178 .cell())
179 }
180 ResolveSourceRequestResult::HttpProxy(proxy_result_op) => {
181 let proxy_result_vc = proxy_result_op.connect();
182 let proxy_result_value = proxy_result_vc.await?;
183
184 if proxy_result_value.status == 404 {
185 return Ok(UpdateStreamItem::NotFound.cell());
186 }
187
188 extend_issues(&mut plain_issues, peek_issues(proxy_result_op).await?);
189
190 let from = from.get();
191 if let Some(from) = Vc::try_resolve_downcast_type::<ProxyResult>(from).await?
192 && from.await? == proxy_result_value
193 {
194 return Ok(UpdateStreamItem::Found {
195 update: Update::None.cell().await?,
196 issues: plain_issues,
197 }
198 .cell());
199 }
200
201 Ok(UpdateStreamItem::Found {
202 update: Update::Total(TotalUpdate {
203 to: Vc::upcast::<Box<dyn Version>>(proxy_result_vc)
204 .into_trait_ref()
205 .await?,
206 })
207 .cell()
208 .await?,
209 issues: plain_issues,
210 }
211 .cell())
212 }
213 _ => {
214 let update = if plain_issues.is_empty() {
215 Update::Total(TotalUpdate {
220 to: Vc::upcast::<Box<dyn Version>>(NotFoundVersion::new())
221 .into_trait_ref()
222 .await?,
223 })
224 .cell()
225 } else {
226 Update::None.cell()
227 };
228
229 Ok(UpdateStreamItem::Found {
230 update: update.await?,
231 issues: plain_issues,
232 }
233 .cell())
234 }
235 }
236}
237
238#[derive(TraceRawVcs)]
239struct ComputeUpdateStreamSender(
240 #[turbo_tasks(trace_ignore)] Sender<Result<ReadRef<UpdateStreamItem>>>,
247);
248
249#[turbo_tasks::function]
252async fn compute_update_stream(
253 resource: RcStr,
254 from: ResolvedVc<VersionState>,
255 get_content: TransientInstance<GetContentFn>,
256 sender: TransientInstance<ComputeUpdateStreamSender>,
257) -> Vc<()> {
258 let item = get_update_stream_item_operation(resource, from, get_content)
259 .read_strongly_consistent()
260 .await;
261
262 let _ = sender.0.send(item).await;
264
265 Default::default()
266}
267
268pub(super) struct UpdateStream(
269 Pin<Box<dyn Stream<Item = Result<ReadRef<UpdateStreamItem>>> + Send + Sync>>,
270);
271
272impl UpdateStream {
273 #[tracing::instrument(skip(get_content), name = "UpdateStream::new")]
274 pub async fn new(
275 resource: RcStr,
276 get_content: TransientInstance<GetContentFn>,
277 ) -> Result<UpdateStream> {
278 let (sx, rx) = tokio::sync::mpsc::channel(32);
279
280 let content = get_content.call();
281 let version = match *content.connect().await? {
284 ResolveSourceRequestResult::Static(static_content, _) => {
285 static_content.await?.content.version()
286 }
287 ResolveSourceRequestResult::HttpProxy(proxy_result) => {
288 Vc::upcast(proxy_result.connect())
289 }
290 _ => Vc::upcast(NotFoundVersion::new()),
291 };
292 let version_state = VersionState::new(version.into_trait_ref().await?).await?;
293
294 let _ = compute_update_stream(
295 resource,
296 version_state,
297 get_content,
298 TransientInstance::new(ComputeUpdateStreamSender(sx)),
299 );
300
301 let mut last_had_issues = false;
302
303 let stream = ReceiverStream::new(rx).filter_map(move |item| {
304 {
305 let (has_issues, issues_changed) =
306 if let Ok(UpdateStreamItem::Found { issues, .. }) = item.as_deref() {
307 let has_issues = !issues.is_empty();
308 let issues_changed = has_issues != last_had_issues;
309 last_had_issues = has_issues;
310 (has_issues, issues_changed)
311 } else {
312 (false, false)
313 };
314
315 async move {
316 match item.as_deref() {
317 Ok(UpdateStreamItem::Found { update, .. }) => {
318 match &**update {
319 Update::Partial(PartialUpdate { to, .. })
320 | Update::Total(TotalUpdate { to }) => {
321 version_state
322 .set(to.clone())
323 .await
324 .expect("failed to update version");
325
326 Some(item)
327 }
328 Update::None | Update::Missing => {
330 if has_issues || issues_changed {
331 Some(item)
332 } else {
333 None
334 }
335 }
336 }
337 }
338 _ => {
339 Some(item)
341 }
342 }
343 }
344 .in_current_span()
345 }
346 .in_current_span()
347 });
348
349 Ok(UpdateStream(Box::pin(stream)))
350 }
351}
352
353impl Stream for UpdateStream {
354 type Item = Result<ReadRef<UpdateStreamItem>>;
355
356 fn poll_next(
357 self: Pin<&mut Self>,
358 cx: &mut std::task::Context<'_>,
359 ) -> std::task::Poll<Option<Self::Item>> {
360 Pin::new(&mut self.get_mut().0).poll_next(cx)
361 }
362}
363
364#[turbo_tasks::value(serialization = "none")]
365#[derive(Debug)]
366pub enum UpdateStreamItem {
367 NotFound,
368 Found {
369 update: ReadRef<Update>,
370 issues: Vec<ReadRef<PlainIssue>>,
371 },
372}
373
374#[turbo_tasks::value(serialization = "none")]
375struct FatalStreamIssue {
376 description: ResolvedVc<StyledString>,
377 resource: RcStr,
378}
379
380#[turbo_tasks::value_impl]
381impl Issue for FatalStreamIssue {
382 fn severity(&self) -> IssueSeverity {
383 IssueSeverity::Fatal
384 }
385
386 #[turbo_tasks::function]
387 fn stage(&self) -> Vc<IssueStage> {
388 IssueStage::Other("websocket".into()).cell()
389 }
390
391 #[turbo_tasks::function]
392 async fn file_path(&self) -> Result<Vc<FileSystemPath>> {
393 Ok(ServerFileSystem::new()
394 .root()
395 .await?
396 .join(&self.resource)?
397 .cell())
398 }
399
400 #[turbo_tasks::function]
401 fn title(&self) -> Vc<StyledString> {
402 StyledString::Text(rcstr!("Fatal error while getting content to stream")).cell()
403 }
404
405 #[turbo_tasks::function]
406 fn description(&self) -> Vc<OptionStyledString> {
407 Vc::cell(Some(self.description))
408 }
409}
410
411#[cfg(test)]
412pub mod test {
413 use std::sync::{
414 Arc,
415 atomic::{AtomicI32, Ordering},
416 };
417
418 use turbo_tasks::TurboTasks;
419 use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};
420
421 use super::*;
422
423 #[turbo_tasks::function(operation)]
424 pub fn noop_operation() -> Vc<ResolveSourceRequestResult> {
425 ResolveSourceRequestResult::NotFound.cell()
426 }
427
428 #[tokio::test]
429 async fn test_get_content_fn() {
430 crate::register();
431 let tt = TurboTasks::new(TurboTasksBackend::new(
432 BackendOptions::default(),
433 noop_backing_storage(),
434 ));
435 tt.run_once(async move {
436 let number = Arc::new(AtomicI32::new(0));
437 fn func(number: &Arc<AtomicI32>) -> OperationVc<ResolveSourceRequestResult> {
438 number.store(42, Ordering::SeqCst);
439 noop_operation()
440 }
441 let wrapped_func = GetContentFn::new(number.clone(), func);
442 let return_value = wrapped_func
443 .call()
444 .read_strongly_consistent()
445 .await
446 .unwrap();
447 assert_eq!(number.load(Ordering::SeqCst), 42);
448 assert!(*return_value == ResolveSourceRequestResult::NotFound);
450 Ok(())
451 })
452 .await
453 .unwrap();
454 }
455}