Skip to main content

turbopack_dev_server/update/
stream.rs

1use std::pin::Pin;
2
3use anyhow::Result;
4use async_trait::async_trait;
5use futures::prelude::*;
6use tokio::sync::mpsc::Sender;
7use tokio_stream::wrappers::ReceiverStream;
8use tracing::Instrument;
9use turbo_rcstr::{RcStr, rcstr};
10use turbo_tasks::{
11    NonLocalValue, OperationVc, PrettyPrintError, ReadRef, ResolvedVc, TransientInstance, Vc,
12    trace::{TraceRawVcs, TraceRawVcsContext},
13};
14use turbo_tasks_fs::{FileSystem, FileSystemPath};
15use turbopack_core::{
16    issue::{
17        CollectibleIssuesExt, Issue, IssueFilter, IssueSeverity, IssueStage, PlainIssue,
18        StyledString,
19    },
20    server_fs::ServerFileSystem,
21    version::{
22        NotFoundVersion, PartialUpdate, TotalUpdate, Update, Version, VersionState,
23        VersionedContent,
24    },
25};
26
27use crate::source::{ProxyResult, resolve::ResolveSourceRequestResult};
28
29struct TypedGetContentFn<C> {
30    capture: C,
31    func: for<'a> fn(&'a C) -> OperationVc<ResolveSourceRequestResult>,
32}
33
34// Manual (non-derive) impl required due to: https://github.com/rust-lang/rust/issues/70263
35// Safety: `capture` is `NonLocalValue`, `func` stores no data (is a static pointer to code)
36unsafe impl<C: NonLocalValue> NonLocalValue for TypedGetContentFn<C> {}
37
38// Manual (non-derive) impl required due to: https://github.com/rust-lang/rust/issues/70263
39impl<C: TraceRawVcs> TraceRawVcs for TypedGetContentFn<C> {
40    fn trace_raw_vcs(&self, trace_context: &mut TraceRawVcsContext) {
41        self.capture.trace_raw_vcs(trace_context);
42    }
43}
44
45trait TypedGetContentFnTrait: NonLocalValue + TraceRawVcs {
46    fn call(&self) -> OperationVc<ResolveSourceRequestResult>;
47}
48
49impl<C> TypedGetContentFnTrait for TypedGetContentFn<C>
50where
51    C: NonLocalValue + TraceRawVcs,
52{
53    fn call(&self) -> OperationVc<ResolveSourceRequestResult> {
54        (self.func)(&self.capture)
55    }
56}
57
58/// A wrapper type returning [`OperationVc<ResolveSourceRequestResult>`][ResolveSourceRequestResult]
59/// that implements [`NonLocalValue`] and [`TraceRawVcs`].
60///
61/// The capture (e.g. moved values in a closure) and function pointer are stored separately to allow
62/// safe implementation of these desired traits.
63#[derive(NonLocalValue, TraceRawVcs)]
64pub struct GetContentFn {
65    inner: Box<dyn TypedGetContentFnTrait + Send + Sync>,
66}
67
68impl GetContentFn {
69    /// Wrap a function and an optional capture variable (used to simulate a closure) in
70    /// `GetContentFn`.
71    pub fn new<C>(
72        capture: C,
73        func: for<'a> fn(&'a C) -> OperationVc<ResolveSourceRequestResult>,
74    ) -> Self
75    where
76        C: NonLocalValue + TraceRawVcs + Send + Sync + 'static,
77    {
78        Self {
79            inner: Box::new(TypedGetContentFn { capture, func }),
80        }
81    }
82}
83
84impl GetContentFn {
85    fn call(&self) -> OperationVc<ResolveSourceRequestResult> {
86        self.inner.call()
87    }
88}
89
90async fn peek_issues<T: Send>(source: OperationVc<T>) -> Result<Vec<ReadRef<PlainIssue>>> {
91    let captured = source.peek_issues();
92
93    captured.get_plain_issues(IssueFilter::everything()).await
94}
95
96fn extend_issues(issues: &mut Vec<ReadRef<PlainIssue>>, new_issues: Vec<ReadRef<PlainIssue>>) {
97    for issue in new_issues {
98        if issues.contains(&issue) {
99            continue;
100        }
101
102        issues.push(issue);
103    }
104}
105
106#[turbo_tasks::function(operation, root)]
107fn versioned_content_update_operation(
108    content: ResolvedVc<Box<dyn VersionedContent>>,
109    from: ResolvedVc<Box<dyn Version>>,
110) -> Vc<Update> {
111    content.update(*from)
112}
113
114#[turbo_tasks::function(operation, root)]
115async fn get_update_stream_item_operation(
116    resource: RcStr,
117    from: ResolvedVc<VersionState>,
118    get_content: TransientInstance<GetContentFn>,
119) -> Result<Vc<UpdateStreamItem>> {
120    let content_op = get_content.call();
121    let content_result = content_op.read_strongly_consistent().await;
122    let mut plain_issues = peek_issues(content_op).await?;
123
124    let content_value = match content_result {
125        Ok(content) => content,
126        Err(e) => {
127            plain_issues.push(
128                PlainIssue::from_issue(
129                    Vc::upcast(
130                        FatalStreamIssue {
131                            resource,
132                            description: StyledString::Text(
133                                format!("{}", PrettyPrintError(&e)).into(),
134                            )
135                            .resolved_cell(),
136                        }
137                        .cell(),
138                    ),
139                    None,
140                )
141                .await?,
142            );
143
144            let update = Update::Total(TotalUpdate {
145                to: Vc::upcast::<Box<dyn Version>>(NotFoundVersion::new())
146                    .into_trait_ref()
147                    .await?,
148            })
149            .cell();
150            return Ok(UpdateStreamItem::Found {
151                update: update.await?,
152                issues: plain_issues,
153            }
154            .cell());
155        }
156    };
157
158    match *content_value {
159        ResolveSourceRequestResult::Static(static_content_vc, _) => {
160            let static_content = static_content_vc.await?;
161
162            // This can happen when a chunk is removed from the asset graph.
163            if static_content.status_code == 404 {
164                return Ok(UpdateStreamItem::NotFound.cell());
165            }
166
167            let resolved_content = static_content.content;
168            let from = from.get().to_resolved().await?;
169            let update_op = versioned_content_update_operation(resolved_content, from);
170
171            extend_issues(&mut plain_issues, peek_issues(update_op).await?);
172
173            Ok(UpdateStreamItem::Found {
174                update: update_op.connect().await?,
175                issues: plain_issues,
176            }
177            .cell())
178        }
179        ResolveSourceRequestResult::HttpProxy(proxy_result_op) => {
180            let proxy_result_vc = proxy_result_op.connect();
181            let proxy_result_value = proxy_result_vc.await?;
182
183            if proxy_result_value.status == 404 {
184                return Ok(UpdateStreamItem::NotFound.cell());
185            }
186
187            extend_issues(&mut plain_issues, peek_issues(proxy_result_op).await?);
188
189            let from = from.get();
190            if let Some(from) =
191                ResolvedVc::try_downcast_type::<ProxyResult>(from.to_resolved().await?)
192                && from.await? == proxy_result_value
193            {
194                return Ok(UpdateStreamItem::Found {
195                    update: Update::None.cell().await?,
196                    issues: plain_issues,
197                }
198                .cell());
199            }
200
201            Ok(UpdateStreamItem::Found {
202                update: Update::Total(TotalUpdate {
203                    to: Vc::upcast::<Box<dyn Version>>(proxy_result_vc)
204                        .into_trait_ref()
205                        .await?,
206                })
207                .cell()
208                .await?,
209                issues: plain_issues,
210            }
211            .cell())
212        }
213        _ => {
214            let update = if plain_issues.is_empty() {
215                // Client requested a non-existing asset
216                // It might be removed in meantime, reload client
217                // TODO add special instructions for removed assets to handled it in a better
218                // way
219                Update::Total(TotalUpdate {
220                    to: Vc::upcast::<Box<dyn Version>>(NotFoundVersion::new())
221                        .into_trait_ref()
222                        .await?,
223                })
224                .cell()
225            } else {
226                Update::None.cell()
227            };
228
229            Ok(UpdateStreamItem::Found {
230                update: update.await?,
231                issues: plain_issues,
232            }
233            .cell())
234        }
235    }
236}
237
238#[derive(TraceRawVcs)]
239struct ComputeUpdateStreamSender(
240    // HACK: `trace_ignore`: It's not correct or safe to send `Vc`s across this mpsc channel, but
241    // (without nightly auto traits) there's no easy way for us to statically assert that
242    // `UpdateStreamItem` does not contain a `RawVc`.
243    //
244    // It could be safe (at least for the GC use-case) if we had some way of wrapping arbitrary
245    // objects in a GC root container.
246    #[turbo_tasks(trace_ignore)] Sender<Result<ReadRef<UpdateStreamItem>>>,
247);
248
249/// This function sends an [`UpdateStreamItem`] to `sender` every time it gets recomputed by
250/// turbo-tasks due to invalidation.
251#[turbo_tasks::function]
252async fn compute_update_stream(
253    resource: RcStr,
254    from: ResolvedVc<VersionState>,
255    get_content: TransientInstance<GetContentFn>,
256    sender: TransientInstance<ComputeUpdateStreamSender>,
257) -> () {
258    let item = get_update_stream_item_operation(resource, from, get_content)
259        .read_strongly_consistent()
260        .await;
261
262    // Send update. Ignore channel closed error.
263    let _ = sender.0.send(item).await;
264}
265
266pub(super) struct UpdateStream(
267    Pin<Box<dyn Stream<Item = Result<ReadRef<UpdateStreamItem>>> + Send + Sync>>,
268);
269
270impl UpdateStream {
271    #[tracing::instrument(skip(get_content), name = "UpdateStream::new")]
272    pub async fn new(
273        resource: RcStr,
274        get_content: TransientInstance<GetContentFn>,
275    ) -> Result<UpdateStream> {
276        let (sx, rx) = tokio::sync::mpsc::channel(32);
277
278        let content = get_content.call();
279        // We can ignore issues reported in content here since [compute_update_stream]
280        // will handle them
281        let version = match *content.connect().await? {
282            ResolveSourceRequestResult::Static(static_content, _) => {
283                static_content.await?.content.version()
284            }
285            ResolveSourceRequestResult::HttpProxy(proxy_result) => {
286                Vc::upcast(proxy_result.connect())
287            }
288            _ => Vc::upcast(NotFoundVersion::new()),
289        };
290        let version_state = VersionState::new(version.into_trait_ref().await?).await?;
291
292        let _ = compute_update_stream(
293            resource,
294            version_state,
295            get_content,
296            TransientInstance::new(ComputeUpdateStreamSender(sx)),
297        );
298
299        let mut last_had_issues = false;
300
301        let stream = ReceiverStream::new(rx).filter_map(move |item| {
302            {
303                let (has_issues, issues_changed) =
304                    if let Ok(UpdateStreamItem::Found { issues, .. }) = item.as_deref() {
305                        let has_issues = !issues.is_empty();
306                        let issues_changed = has_issues != last_had_issues;
307                        last_had_issues = has_issues;
308                        (has_issues, issues_changed)
309                    } else {
310                        (false, false)
311                    };
312
313                async move {
314                    match item.as_deref() {
315                        Ok(UpdateStreamItem::Found { update, .. }) => {
316                            match &**update {
317                                Update::Partial(PartialUpdate { to, .. })
318                                | Update::Total(TotalUpdate { to }) => {
319                                    version_state
320                                        .set(to.clone())
321                                        .await
322                                        .expect("failed to update version");
323
324                                    Some(item)
325                                }
326                                // Do not propagate empty updates.
327                                Update::None | Update::Missing => {
328                                    if has_issues || issues_changed {
329                                        Some(item)
330                                    } else {
331                                        None
332                                    }
333                                }
334                            }
335                        }
336                        _ => {
337                            // Propagate other updates
338                            Some(item)
339                        }
340                    }
341                }
342                .in_current_span()
343            }
344            .in_current_span()
345        });
346
347        Ok(UpdateStream(Box::pin(stream)))
348    }
349}
350
351impl Stream for UpdateStream {
352    type Item = Result<ReadRef<UpdateStreamItem>>;
353
354    fn poll_next(
355        self: Pin<&mut Self>,
356        cx: &mut std::task::Context<'_>,
357    ) -> std::task::Poll<Option<Self::Item>> {
358        Pin::new(&mut self.get_mut().0).poll_next(cx)
359    }
360}
361
362#[turbo_tasks::value(serialization = "skip")]
363#[derive(Debug)]
364pub enum UpdateStreamItem {
365    NotFound,
366    Found {
367        update: ReadRef<Update>,
368        issues: Vec<ReadRef<PlainIssue>>,
369    },
370}
371
372#[turbo_tasks::value(serialization = "skip")]
373struct FatalStreamIssue {
374    description: ResolvedVc<StyledString>,
375    resource: RcStr,
376}
377
378#[async_trait]
379#[turbo_tasks::value_impl]
380impl Issue for FatalStreamIssue {
381    fn severity(&self) -> IssueSeverity {
382        IssueSeverity::Fatal
383    }
384
385    fn stage(&self) -> IssueStage {
386        IssueStage::Other(rcstr!("websocket"))
387    }
388
389    async fn file_path(&self) -> Result<FileSystemPath> {
390        ServerFileSystem::new().root().await?.join(&self.resource)
391    }
392
393    async fn title(&self) -> Result<StyledString> {
394        Ok(StyledString::Text(rcstr!(
395            "Fatal error while getting content to stream"
396        )))
397    }
398
399    async fn description(&self) -> Result<Option<StyledString>> {
400        Ok(Some((*self.description.await?).clone()))
401    }
402}
403
404#[cfg(test)]
405pub mod test {
406    use std::sync::{
407        Arc,
408        atomic::{AtomicI32, Ordering},
409    };
410
411    use turbo_tasks::TurboTasks;
412    use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};
413
414    use super::*;
415
416    #[turbo_tasks::function(operation, root)]
417    pub fn noop_operation() -> Vc<ResolveSourceRequestResult> {
418        ResolveSourceRequestResult::NotFound.cell()
419    }
420
421    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
422    async fn test_get_content_fn() {
423        let tt = TurboTasks::new(TurboTasksBackend::new(
424            BackendOptions::default(),
425            noop_backing_storage(),
426        ));
427        tt.run_once(async move {
428            let number = Arc::new(AtomicI32::new(0));
429            fn func(number: &Arc<AtomicI32>) -> OperationVc<ResolveSourceRequestResult> {
430                number.store(42, Ordering::SeqCst);
431                noop_operation()
432            }
433            let wrapped_func = GetContentFn::new(number.clone(), func);
434            let return_value = wrapped_func
435                .call()
436                .read_strongly_consistent()
437                .await
438                .unwrap();
439            assert_eq!(number.load(Ordering::SeqCst), 42);
440            // ResolveSourceRequestResult doesn't impl Debug
441            assert!(*return_value == ResolveSourceRequestResult::NotFound);
442            Ok(())
443        })
444        .await
445        .unwrap();
446    }
447}