Skip to main content

turbopack_dev_server/update/
stream.rs

1use std::pin::Pin;
2
3use anyhow::Result;
4use futures::prelude::*;
5use tokio::sync::mpsc::Sender;
6use tokio_stream::wrappers::ReceiverStream;
7use tracing::Instrument;
8use turbo_rcstr::{RcStr, rcstr};
9use turbo_tasks::{
10    NonLocalValue, OperationVc, PrettyPrintError, ReadRef, ResolvedVc, TransientInstance, Vc,
11    trace::{TraceRawVcs, TraceRawVcsContext},
12};
13use turbo_tasks_fs::{FileSystem, FileSystemPath};
14use turbopack_core::{
15    issue::{
16        CollectibleIssuesExt, Issue, IssueFilter, IssueSeverity, IssueStage, OptionStyledString,
17        PlainIssue, StyledString,
18    },
19    server_fs::ServerFileSystem,
20    version::{
21        NotFoundVersion, PartialUpdate, TotalUpdate, Update, Version, VersionState,
22        VersionedContent,
23    },
24};
25
26use crate::source::{ProxyResult, resolve::ResolveSourceRequestResult};
27
28struct TypedGetContentFn<C> {
29    capture: C,
30    func: for<'a> fn(&'a C) -> OperationVc<ResolveSourceRequestResult>,
31}
32
33// Manual (non-derive) impl required due to: https://github.com/rust-lang/rust/issues/70263
34// Safety: `capture` is `NonLocalValue`, `func` stores no data (is a static pointer to code)
35unsafe impl<C: NonLocalValue> NonLocalValue for TypedGetContentFn<C> {}
36
37// Manual (non-derive) impl required due to: https://github.com/rust-lang/rust/issues/70263
38impl<C: TraceRawVcs> TraceRawVcs for TypedGetContentFn<C> {
39    fn trace_raw_vcs(&self, trace_context: &mut TraceRawVcsContext) {
40        self.capture.trace_raw_vcs(trace_context);
41    }
42}
43
/// Type-erased view of a [`TypedGetContentFn`], so that [`GetContentFn`] can box it as a trait
/// object without naming the capture type.
trait TypedGetContentFnTrait: NonLocalValue + TraceRawVcs {
    /// Invokes the wrapped function and returns the operation it produces.
    fn call(&self) -> OperationVc<ResolveSourceRequestResult>;
}
47
48impl<C> TypedGetContentFnTrait for TypedGetContentFn<C>
49where
50    C: NonLocalValue + TraceRawVcs,
51{
52    fn call(&self) -> OperationVc<ResolveSourceRequestResult> {
53        (self.func)(&self.capture)
54    }
55}
56
/// A wrapper type returning [`OperationVc<ResolveSourceRequestResult>`][ResolveSourceRequestResult]
/// that implements [`NonLocalValue`] and [`TraceRawVcs`].
///
/// The capture (e.g. moved values in a closure) and function pointer are stored separately to allow
/// safe implementation of these desired traits.
#[derive(NonLocalValue, TraceRawVcs)]
pub struct GetContentFn {
    // Boxed trait object hiding the concrete capture type; see `GetContentFn::new`.
    inner: Box<dyn TypedGetContentFnTrait + Send + Sync>,
}
66
67impl GetContentFn {
68    /// Wrap a function and an optional capture variable (used to simulate a closure) in
69    /// `GetContentFn`.
70    pub fn new<C>(
71        capture: C,
72        func: for<'a> fn(&'a C) -> OperationVc<ResolveSourceRequestResult>,
73    ) -> Self
74    where
75        C: NonLocalValue + TraceRawVcs + Send + Sync + 'static,
76    {
77        Self {
78            inner: Box::new(TypedGetContentFn { capture, func }),
79        }
80    }
81}
82
83impl GetContentFn {
84    fn call(&self) -> OperationVc<ResolveSourceRequestResult> {
85        self.inner.call()
86    }
87}
88
89async fn peek_issues<T: Send>(source: OperationVc<T>) -> Result<Vec<ReadRef<PlainIssue>>> {
90    let captured = source.peek_issues();
91
92    captured.get_plain_issues(IssueFilter::everything()).await
93}
94
95fn extend_issues(issues: &mut Vec<ReadRef<PlainIssue>>, new_issues: Vec<ReadRef<PlainIssue>>) {
96    for issue in new_issues {
97        if issues.contains(&issue) {
98            continue;
99        }
100
101        issues.push(issue);
102    }
103}
104
/// Operation wrapper around [`VersionedContent::update`]: computes the update that takes
/// `content` from version `from` to its current state.
///
/// Being an operation lets the caller both peek the issues of the computation and connect to its
/// result.
#[turbo_tasks::function(operation)]
fn versioned_content_update_operation(
    content: ResolvedVc<Box<dyn VersionedContent>>,
    from: ResolvedVc<Box<dyn Version>>,
) -> Vc<Update> {
    content.update(*from)
}
112
113#[turbo_tasks::function(operation)]
114async fn get_update_stream_item_operation(
115    resource: RcStr,
116    from: ResolvedVc<VersionState>,
117    get_content: TransientInstance<GetContentFn>,
118) -> Result<Vc<UpdateStreamItem>> {
119    let content_op = get_content.call();
120    let content_result = content_op.read_strongly_consistent().await;
121    let mut plain_issues = peek_issues(content_op).await?;
122
123    let content_value = match content_result {
124        Ok(content) => content,
125        Err(e) => {
126            plain_issues.push(
127                PlainIssue::from_issue(
128                    Vc::upcast(
129                        FatalStreamIssue {
130                            resource,
131                            description: StyledString::Text(
132                                format!("{}", PrettyPrintError(&e)).into(),
133                            )
134                            .resolved_cell(),
135                        }
136                        .cell(),
137                    ),
138                    None,
139                )
140                .await?,
141            );
142
143            let update = Update::Total(TotalUpdate {
144                to: Vc::upcast::<Box<dyn Version>>(NotFoundVersion::new())
145                    .into_trait_ref()
146                    .await?,
147            })
148            .cell();
149            return Ok(UpdateStreamItem::Found {
150                update: update.await?,
151                issues: plain_issues,
152            }
153            .cell());
154        }
155    };
156
157    match *content_value {
158        ResolveSourceRequestResult::Static(static_content_vc, _) => {
159            let static_content = static_content_vc.await?;
160
161            // This can happen when a chunk is removed from the asset graph.
162            if static_content.status_code == 404 {
163                return Ok(UpdateStreamItem::NotFound.cell());
164            }
165
166            let resolved_content = static_content.content;
167            let from = from.get().to_resolved().await?;
168            let update_op = versioned_content_update_operation(resolved_content, from);
169
170            extend_issues(&mut plain_issues, peek_issues(update_op).await?);
171
172            Ok(UpdateStreamItem::Found {
173                update: update_op.connect().await?,
174                issues: plain_issues,
175            }
176            .cell())
177        }
178        ResolveSourceRequestResult::HttpProxy(proxy_result_op) => {
179            let proxy_result_vc = proxy_result_op.connect();
180            let proxy_result_value = proxy_result_vc.await?;
181
182            if proxy_result_value.status == 404 {
183                return Ok(UpdateStreamItem::NotFound.cell());
184            }
185
186            extend_issues(&mut plain_issues, peek_issues(proxy_result_op).await?);
187
188            let from = from.get();
189            if let Some(from) =
190                ResolvedVc::try_downcast_type::<ProxyResult>(from.to_resolved().await?)
191                && from.await? == proxy_result_value
192            {
193                return Ok(UpdateStreamItem::Found {
194                    update: Update::None.cell().await?,
195                    issues: plain_issues,
196                }
197                .cell());
198            }
199
200            Ok(UpdateStreamItem::Found {
201                update: Update::Total(TotalUpdate {
202                    to: Vc::upcast::<Box<dyn Version>>(proxy_result_vc)
203                        .into_trait_ref()
204                        .await?,
205                })
206                .cell()
207                .await?,
208                issues: plain_issues,
209            }
210            .cell())
211        }
212        _ => {
213            let update = if plain_issues.is_empty() {
214                // Client requested a non-existing asset
215                // It might be removed in meantime, reload client
216                // TODO add special instructions for removed assets to handled it in a better
217                // way
218                Update::Total(TotalUpdate {
219                    to: Vc::upcast::<Box<dyn Version>>(NotFoundVersion::new())
220                        .into_trait_ref()
221                        .await?,
222                })
223                .cell()
224            } else {
225                Update::None.cell()
226            };
227
228            Ok(UpdateStreamItem::Found {
229                update: update.await?,
230                issues: plain_issues,
231            }
232            .cell())
233        }
234    }
235}
236
/// Newtype around the sending half of the channel that `compute_update_stream` pushes items
/// into, so that it can be passed to that function as a `TransientInstance`.
#[derive(TraceRawVcs)]
struct ComputeUpdateStreamSender(
    // HACK: `trace_ignore`: It's not correct or safe to send `Vc`s across this mpsc channel, but
    // (without nightly auto traits) there's no easy way for us to statically assert that
    // `UpdateStreamItem` does not contain a `RawVc`.
    //
    // It could be safe (at least for the GC use-case) if we had some way of wrapping arbitrary
    // objects in a GC root container.
    #[turbo_tasks(trace_ignore)] Sender<Result<ReadRef<UpdateStreamItem>>>,
);
247
248/// This function sends an [`UpdateStreamItem`] to `sender` every time it gets recomputed by
249/// turbo-tasks due to invalidation.
250#[turbo_tasks::function]
251async fn compute_update_stream(
252    resource: RcStr,
253    from: ResolvedVc<VersionState>,
254    get_content: TransientInstance<GetContentFn>,
255    sender: TransientInstance<ComputeUpdateStreamSender>,
256) -> Vc<()> {
257    let item = get_update_stream_item_operation(resource, from, get_content)
258        .read_strongly_consistent()
259        .await;
260
261    // Send update. Ignore channel closed error.
262    let _ = sender.0.send(item).await;
263
264    Default::default()
265}
266
/// Stream of [`UpdateStreamItem`]s (or errors) for a single resource, created by
/// `UpdateStream::new`.
pub(super) struct UpdateStream(
    // Boxed and pinned so the concrete stream type stays hidden.
    Pin<Box<dyn Stream<Item = Result<ReadRef<UpdateStreamItem>>> + Send + Sync>>,
);
270
271impl UpdateStream {
272    #[tracing::instrument(skip(get_content), name = "UpdateStream::new")]
273    pub async fn new(
274        resource: RcStr,
275        get_content: TransientInstance<GetContentFn>,
276    ) -> Result<UpdateStream> {
277        let (sx, rx) = tokio::sync::mpsc::channel(32);
278
279        let content = get_content.call();
280        // We can ignore issues reported in content here since [compute_update_stream]
281        // will handle them
282        let version = match *content.connect().await? {
283            ResolveSourceRequestResult::Static(static_content, _) => {
284                static_content.await?.content.version()
285            }
286            ResolveSourceRequestResult::HttpProxy(proxy_result) => {
287                Vc::upcast(proxy_result.connect())
288            }
289            _ => Vc::upcast(NotFoundVersion::new()),
290        };
291        let version_state = VersionState::new(version.into_trait_ref().await?).await?;
292
293        let _ = compute_update_stream(
294            resource,
295            version_state,
296            get_content,
297            TransientInstance::new(ComputeUpdateStreamSender(sx)),
298        );
299
300        let mut last_had_issues = false;
301
302        let stream = ReceiverStream::new(rx).filter_map(move |item| {
303            {
304                let (has_issues, issues_changed) =
305                    if let Ok(UpdateStreamItem::Found { issues, .. }) = item.as_deref() {
306                        let has_issues = !issues.is_empty();
307                        let issues_changed = has_issues != last_had_issues;
308                        last_had_issues = has_issues;
309                        (has_issues, issues_changed)
310                    } else {
311                        (false, false)
312                    };
313
314                async move {
315                    match item.as_deref() {
316                        Ok(UpdateStreamItem::Found { update, .. }) => {
317                            match &**update {
318                                Update::Partial(PartialUpdate { to, .. })
319                                | Update::Total(TotalUpdate { to }) => {
320                                    version_state
321                                        .set(to.clone())
322                                        .await
323                                        .expect("failed to update version");
324
325                                    Some(item)
326                                }
327                                // Do not propagate empty updates.
328                                Update::None | Update::Missing => {
329                                    if has_issues || issues_changed {
330                                        Some(item)
331                                    } else {
332                                        None
333                                    }
334                                }
335                            }
336                        }
337                        _ => {
338                            // Propagate other updates
339                            Some(item)
340                        }
341                    }
342                }
343                .in_current_span()
344            }
345            .in_current_span()
346        });
347
348        Ok(UpdateStream(Box::pin(stream)))
349    }
350}
351
352impl Stream for UpdateStream {
353    type Item = Result<ReadRef<UpdateStreamItem>>;
354
355    fn poll_next(
356        self: Pin<&mut Self>,
357        cx: &mut std::task::Context<'_>,
358    ) -> std::task::Poll<Option<Self::Item>> {
359        Pin::new(&mut self.get_mut().0).poll_next(cx)
360    }
361}
362
/// A single item emitted on an update stream.
#[turbo_tasks::value(serialization = "none")]
#[derive(Debug)]
pub enum UpdateStreamItem {
    /// The requested resource resolved to a 404 response.
    NotFound,
    /// An update (possibly empty) together with the issues collected while computing it.
    Found {
        update: ReadRef<Update>,
        issues: Vec<ReadRef<PlainIssue>>,
    },
}
372
/// Issue reported when getting the content to stream for a resource fails.
#[turbo_tasks::value(serialization = "none")]
struct FatalStreamIssue {
    // Pretty-printed error text shown as the issue description.
    description: ResolvedVc<StyledString>,
    // Resource the failure belongs to; joined onto the server file system root for the issue's
    // file path.
    resource: RcStr,
}
378
379#[turbo_tasks::value_impl]
380impl Issue for FatalStreamIssue {
381    fn severity(&self) -> IssueSeverity {
382        IssueSeverity::Fatal
383    }
384
385    #[turbo_tasks::function]
386    fn stage(&self) -> Vc<IssueStage> {
387        IssueStage::Other(rcstr!("websocket")).cell()
388    }
389
390    #[turbo_tasks::function]
391    async fn file_path(&self) -> Result<Vc<FileSystemPath>> {
392        Ok(ServerFileSystem::new()
393            .root()
394            .await?
395            .join(&self.resource)?
396            .cell())
397    }
398
399    #[turbo_tasks::function]
400    fn title(&self) -> Vc<StyledString> {
401        StyledString::Text(rcstr!("Fatal error while getting content to stream")).cell()
402    }
403
404    #[turbo_tasks::function]
405    fn description(&self) -> Vc<OptionStyledString> {
406        Vc::cell(Some(self.description))
407    }
408}
409
#[cfg(test)]
pub mod test {
    use std::sync::{
        Arc,
        atomic::{AtomicI32, Ordering},
    };

    use turbo_tasks::TurboTasks;
    use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};

    use super::*;

    /// Operation that always yields [`ResolveSourceRequestResult::NotFound`].
    #[turbo_tasks::function(operation)]
    pub fn noop_operation() -> Vc<ResolveSourceRequestResult> {
        ResolveSourceRequestResult::NotFound.cell()
    }

    /// Checks that [`GetContentFn`] passes its capture to the wrapped function and forwards the
    /// operation that function returns.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_get_content_fn() {
        let backend = TurboTasksBackend::new(BackendOptions::default(), noop_backing_storage());
        let tt = TurboTasks::new(backend);

        let outcome = tt
            .run_once(async move {
                // Records that it ran by writing 42 into the captured counter.
                fn store_and_resolve(
                    counter: &Arc<AtomicI32>,
                ) -> OperationVc<ResolveSourceRequestResult> {
                    counter.store(42, Ordering::SeqCst);
                    noop_operation()
                }

                let counter = Arc::new(AtomicI32::new(0));
                let get_content = GetContentFn::new(Arc::clone(&counter), store_and_resolve);
                let resolved = get_content
                    .call()
                    .read_strongly_consistent()
                    .await
                    .unwrap();

                assert_eq!(counter.load(Ordering::SeqCst), 42);
                // ResolveSourceRequestResult doesn't impl Debug
                assert!(*resolved == ResolveSourceRequestResult::NotFound);
                Ok(())
            })
            .await;
        outcome.unwrap();
    }
}