turbopack_dev_server/update/
stream.rs

1use std::pin::Pin;
2
3use anyhow::Result;
4use futures::prelude::*;
5use tokio::sync::mpsc::Sender;
6use tokio_stream::wrappers::ReceiverStream;
7use tracing::Instrument;
8use turbo_rcstr::{RcStr, rcstr};
9use turbo_tasks::{
10    IntoTraitRef, NonLocalValue, OperationVc, ReadRef, ResolvedVc, TransientInstance, Vc,
11    trace::{TraceRawVcs, TraceRawVcsContext},
12};
13use turbo_tasks_fs::{FileSystem, FileSystemPath};
14use turbopack_core::{
15    error::PrettyPrintError,
16    issue::{
17        Issue, IssueDescriptionExt, IssueSeverity, IssueStage, OptionIssueProcessingPathItems,
18        OptionStyledString, PlainIssue, StyledString,
19    },
20    server_fs::ServerFileSystem,
21    version::{
22        NotFoundVersion, PartialUpdate, TotalUpdate, Update, Version, VersionState,
23        VersionedContent,
24    },
25};
26
27use crate::source::{ProxyResult, resolve::ResolveSourceRequestResult};
28
29struct TypedGetContentFn<C> {
30    capture: C,
31    func: for<'a> fn(&'a C) -> OperationVc<ResolveSourceRequestResult>,
32}
33
34// Manual (non-derive) impl required due to: https://github.com/rust-lang/rust/issues/70263
35// Safety: `capture` is `NonLocalValue`, `func` stores no data (is a static pointer to code)
36unsafe impl<C: NonLocalValue> NonLocalValue for TypedGetContentFn<C> {}
37
38// Manual (non-derive) impl required due to: https://github.com/rust-lang/rust/issues/70263
39impl<C: TraceRawVcs> TraceRawVcs for TypedGetContentFn<C> {
40    fn trace_raw_vcs(&self, trace_context: &mut TraceRawVcsContext) {
41        self.capture.trace_raw_vcs(trace_context);
42    }
43}
44
45trait TypedGetContentFnTrait: NonLocalValue + TraceRawVcs {
46    fn call(&self) -> OperationVc<ResolveSourceRequestResult>;
47}
48
49impl<C> TypedGetContentFnTrait for TypedGetContentFn<C>
50where
51    C: NonLocalValue + TraceRawVcs,
52{
53    fn call(&self) -> OperationVc<ResolveSourceRequestResult> {
54        (self.func)(&self.capture)
55    }
56}
57
58/// A wrapper type returning [`OperationVc<ResolveSourceRequestResult>`][ResolveSourceRequestResult]
59/// that implements [`NonLocalValue`] and [`TraceRawVcs`].
60///
61/// The capture (e.g. moved values in a closure) and function pointer are stored separately to allow
62/// safe implementation of these desired traits.
63#[derive(NonLocalValue, TraceRawVcs)]
64pub struct GetContentFn {
65    inner: Box<dyn TypedGetContentFnTrait + Send + Sync>,
66}
67
68impl GetContentFn {
69    /// Wrap a function and an optional capture variable (used to simulate a closure) in
70    /// `GetContentFn`.
71    pub fn new<C>(
72        capture: C,
73        func: for<'a> fn(&'a C) -> OperationVc<ResolveSourceRequestResult>,
74    ) -> Self
75    where
76        C: NonLocalValue + TraceRawVcs + Send + Sync + 'static,
77    {
78        Self {
79            inner: Box::new(TypedGetContentFn { capture, func }),
80        }
81    }
82}
83
84impl GetContentFn {
85    fn call(&self) -> OperationVc<ResolveSourceRequestResult> {
86        self.inner.call()
87    }
88}
89
90async fn peek_issues<T: Send>(source: OperationVc<T>) -> Result<Vec<ReadRef<PlainIssue>>> {
91    let captured = source.peek_issues_with_path().await?;
92
93    captured.get_plain_issues().await
94}
95
96fn extend_issues(issues: &mut Vec<ReadRef<PlainIssue>>, new_issues: Vec<ReadRef<PlainIssue>>) {
97    for issue in new_issues {
98        if issues.contains(&issue) {
99            continue;
100        }
101
102        issues.push(issue);
103    }
104}
105
106#[turbo_tasks::function(operation)]
107fn versioned_content_update_operation(
108    content: ResolvedVc<Box<dyn VersionedContent>>,
109    from: ResolvedVc<Box<dyn Version>>,
110) -> Vc<Update> {
111    content.update(*from)
112}
113
114#[turbo_tasks::function(operation)]
115async fn get_update_stream_item_operation(
116    resource: RcStr,
117    from: ResolvedVc<VersionState>,
118    get_content: TransientInstance<GetContentFn>,
119) -> Result<Vc<UpdateStreamItem>> {
120    let content_op = get_content.call();
121    let content_result = content_op.read_strongly_consistent().await;
122    let mut plain_issues = peek_issues(content_op).await?;
123
124    let content_value = match content_result {
125        Ok(content) => content,
126        Err(e) => {
127            plain_issues.push(
128                PlainIssue::from_issue(
129                    Vc::upcast(
130                        FatalStreamIssue {
131                            resource,
132                            description: StyledString::Text(
133                                format!("{}", PrettyPrintError(&e)).into(),
134                            )
135                            .resolved_cell(),
136                        }
137                        .cell(),
138                    ),
139                    None,
140                    OptionIssueProcessingPathItems::none(),
141                )
142                .await?,
143            );
144
145            let update = Update::Total(TotalUpdate {
146                to: Vc::upcast::<Box<dyn Version>>(NotFoundVersion::new())
147                    .into_trait_ref()
148                    .await?,
149            })
150            .cell();
151            return Ok(UpdateStreamItem::Found {
152                update: update.await?,
153                issues: plain_issues,
154            }
155            .cell());
156        }
157    };
158
159    match *content_value {
160        ResolveSourceRequestResult::Static(static_content_vc, _) => {
161            let static_content = static_content_vc.await?;
162
163            // This can happen when a chunk is removed from the asset graph.
164            if static_content.status_code == 404 {
165                return Ok(UpdateStreamItem::NotFound.cell());
166            }
167
168            let resolved_content = static_content.content;
169            let from = from.get().to_resolved().await?;
170            let update_op = versioned_content_update_operation(resolved_content, from);
171
172            extend_issues(&mut plain_issues, peek_issues(update_op).await?);
173
174            Ok(UpdateStreamItem::Found {
175                update: update_op.connect().await?,
176                issues: plain_issues,
177            }
178            .cell())
179        }
180        ResolveSourceRequestResult::HttpProxy(proxy_result_op) => {
181            let proxy_result_vc = proxy_result_op.connect();
182            let proxy_result_value = proxy_result_vc.await?;
183
184            if proxy_result_value.status == 404 {
185                return Ok(UpdateStreamItem::NotFound.cell());
186            }
187
188            extend_issues(&mut plain_issues, peek_issues(proxy_result_op).await?);
189
190            let from = from.get();
191            if let Some(from) = Vc::try_resolve_downcast_type::<ProxyResult>(from).await?
192                && from.await? == proxy_result_value
193            {
194                return Ok(UpdateStreamItem::Found {
195                    update: Update::None.cell().await?,
196                    issues: plain_issues,
197                }
198                .cell());
199            }
200
201            Ok(UpdateStreamItem::Found {
202                update: Update::Total(TotalUpdate {
203                    to: Vc::upcast::<Box<dyn Version>>(proxy_result_vc)
204                        .into_trait_ref()
205                        .await?,
206                })
207                .cell()
208                .await?,
209                issues: plain_issues,
210            }
211            .cell())
212        }
213        _ => {
214            let update = if plain_issues.is_empty() {
215                // Client requested a non-existing asset
216                // It might be removed in meantime, reload client
217                // TODO add special instructions for removed assets to handled it in a better
218                // way
219                Update::Total(TotalUpdate {
220                    to: Vc::upcast::<Box<dyn Version>>(NotFoundVersion::new())
221                        .into_trait_ref()
222                        .await?,
223                })
224                .cell()
225            } else {
226                Update::None.cell()
227            };
228
229            Ok(UpdateStreamItem::Found {
230                update: update.await?,
231                issues: plain_issues,
232            }
233            .cell())
234        }
235    }
236}
237
238#[derive(TraceRawVcs)]
239struct ComputeUpdateStreamSender(
240    // HACK: `trace_ignore`: It's not correct or safe to send `Vc`s across this mpsc channel, but
241    // (without nightly auto traits) there's no easy way for us to statically assert that
242    // `UpdateStreamItem` does not contain a `RawVc`.
243    //
244    // It could be safe (at least for the GC use-case) if we had some way of wrapping arbitrary
245    // objects in a GC root container.
246    #[turbo_tasks(trace_ignore)] Sender<Result<ReadRef<UpdateStreamItem>>>,
247);
248
249/// This function sends an [`UpdateStreamItem`] to `sender` every time it gets recomputed by
250/// turbo-tasks due to invalidation.
251#[turbo_tasks::function]
252async fn compute_update_stream(
253    resource: RcStr,
254    from: ResolvedVc<VersionState>,
255    get_content: TransientInstance<GetContentFn>,
256    sender: TransientInstance<ComputeUpdateStreamSender>,
257) -> Vc<()> {
258    let item = get_update_stream_item_operation(resource, from, get_content)
259        .read_strongly_consistent()
260        .await;
261
262    // Send update. Ignore channel closed error.
263    let _ = sender.0.send(item).await;
264
265    Default::default()
266}
267
268pub(super) struct UpdateStream(
269    Pin<Box<dyn Stream<Item = Result<ReadRef<UpdateStreamItem>>> + Send + Sync>>,
270);
271
272impl UpdateStream {
273    #[tracing::instrument(skip(get_content), name = "UpdateStream::new")]
274    pub async fn new(
275        resource: RcStr,
276        get_content: TransientInstance<GetContentFn>,
277    ) -> Result<UpdateStream> {
278        let (sx, rx) = tokio::sync::mpsc::channel(32);
279
280        let content = get_content.call();
281        // We can ignore issues reported in content here since [compute_update_stream]
282        // will handle them
283        let version = match *content.connect().await? {
284            ResolveSourceRequestResult::Static(static_content, _) => {
285                static_content.await?.content.version()
286            }
287            ResolveSourceRequestResult::HttpProxy(proxy_result) => {
288                Vc::upcast(proxy_result.connect())
289            }
290            _ => Vc::upcast(NotFoundVersion::new()),
291        };
292        let version_state = VersionState::new(version.into_trait_ref().await?).await?;
293
294        let _ = compute_update_stream(
295            resource,
296            version_state,
297            get_content,
298            TransientInstance::new(ComputeUpdateStreamSender(sx)),
299        );
300
301        let mut last_had_issues = false;
302
303        let stream = ReceiverStream::new(rx).filter_map(move |item| {
304            {
305                let (has_issues, issues_changed) =
306                    if let Ok(UpdateStreamItem::Found { issues, .. }) = item.as_deref() {
307                        let has_issues = !issues.is_empty();
308                        let issues_changed = has_issues != last_had_issues;
309                        last_had_issues = has_issues;
310                        (has_issues, issues_changed)
311                    } else {
312                        (false, false)
313                    };
314
315                async move {
316                    match item.as_deref() {
317                        Ok(UpdateStreamItem::Found { update, .. }) => {
318                            match &**update {
319                                Update::Partial(PartialUpdate { to, .. })
320                                | Update::Total(TotalUpdate { to }) => {
321                                    version_state
322                                        .set(to.clone())
323                                        .await
324                                        .expect("failed to update version");
325
326                                    Some(item)
327                                }
328                                // Do not propagate empty updates.
329                                Update::None | Update::Missing => {
330                                    if has_issues || issues_changed {
331                                        Some(item)
332                                    } else {
333                                        None
334                                    }
335                                }
336                            }
337                        }
338                        _ => {
339                            // Propagate other updates
340                            Some(item)
341                        }
342                    }
343                }
344                .in_current_span()
345            }
346            .in_current_span()
347        });
348
349        Ok(UpdateStream(Box::pin(stream)))
350    }
351}
352
353impl Stream for UpdateStream {
354    type Item = Result<ReadRef<UpdateStreamItem>>;
355
356    fn poll_next(
357        self: Pin<&mut Self>,
358        cx: &mut std::task::Context<'_>,
359    ) -> std::task::Poll<Option<Self::Item>> {
360        Pin::new(&mut self.get_mut().0).poll_next(cx)
361    }
362}
363
364#[turbo_tasks::value(serialization = "none")]
365#[derive(Debug)]
366pub enum UpdateStreamItem {
367    NotFound,
368    Found {
369        update: ReadRef<Update>,
370        issues: Vec<ReadRef<PlainIssue>>,
371    },
372}
373
374#[turbo_tasks::value(serialization = "none")]
375struct FatalStreamIssue {
376    description: ResolvedVc<StyledString>,
377    resource: RcStr,
378}
379
380#[turbo_tasks::value_impl]
381impl Issue for FatalStreamIssue {
382    fn severity(&self) -> IssueSeverity {
383        IssueSeverity::Fatal
384    }
385
386    #[turbo_tasks::function]
387    fn stage(&self) -> Vc<IssueStage> {
388        IssueStage::Other("websocket".into()).cell()
389    }
390
391    #[turbo_tasks::function]
392    async fn file_path(&self) -> Result<Vc<FileSystemPath>> {
393        Ok(ServerFileSystem::new()
394            .root()
395            .await?
396            .join(&self.resource)?
397            .cell())
398    }
399
400    #[turbo_tasks::function]
401    fn title(&self) -> Vc<StyledString> {
402        StyledString::Text(rcstr!("Fatal error while getting content to stream")).cell()
403    }
404
405    #[turbo_tasks::function]
406    fn description(&self) -> Vc<OptionStyledString> {
407        Vc::cell(Some(self.description))
408    }
409}
410
411#[cfg(test)]
412pub mod test {
413    use std::sync::{
414        Arc,
415        atomic::{AtomicI32, Ordering},
416    };
417
418    use turbo_tasks::TurboTasks;
419    use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};
420
421    use super::*;
422
423    #[turbo_tasks::function(operation)]
424    pub fn noop_operation() -> Vc<ResolveSourceRequestResult> {
425        ResolveSourceRequestResult::NotFound.cell()
426    }
427
428    #[tokio::test]
429    async fn test_get_content_fn() {
430        crate::register();
431        let tt = TurboTasks::new(TurboTasksBackend::new(
432            BackendOptions::default(),
433            noop_backing_storage(),
434        ));
435        tt.run_once(async move {
436            let number = Arc::new(AtomicI32::new(0));
437            fn func(number: &Arc<AtomicI32>) -> OperationVc<ResolveSourceRequestResult> {
438                number.store(42, Ordering::SeqCst);
439                noop_operation()
440            }
441            let wrapped_func = GetContentFn::new(number.clone(), func);
442            let return_value = wrapped_func
443                .call()
444                .read_strongly_consistent()
445                .await
446                .unwrap();
447            assert_eq!(number.load(Ordering::SeqCst), 42);
448            // ResolveSourceRequestResult doesn't impl Debug
449            assert!(*return_value == ResolveSourceRequestResult::NotFound);
450            Ok(())
451        })
452        .await
453        .unwrap();
454    }
455}