turbopack_dev_server/update/
stream.rs1use std::pin::Pin;
2
3use anyhow::Result;
4use futures::prelude::*;
5use tokio::sync::mpsc::Sender;
6use tokio_stream::wrappers::ReceiverStream;
7use tracing::Instrument;
8use turbo_rcstr::{RcStr, rcstr};
9use turbo_tasks::{
10 NonLocalValue, OperationVc, PrettyPrintError, ReadRef, ResolvedVc, TransientInstance, Vc,
11 trace::{TraceRawVcs, TraceRawVcsContext},
12};
13use turbo_tasks_fs::{FileSystem, FileSystemPath};
14use turbopack_core::{
15 issue::{
16 CollectibleIssuesExt, Issue, IssueFilter, IssueSeverity, IssueStage, OptionStyledString,
17 PlainIssue, StyledString,
18 },
19 server_fs::ServerFileSystem,
20 version::{
21 NotFoundVersion, PartialUpdate, TotalUpdate, Update, Version, VersionState,
22 VersionedContent,
23 },
24};
25
26use crate::source::{ProxyResult, resolve::ResolveSourceRequestResult};
27
28struct TypedGetContentFn<C> {
29 capture: C,
30 func: for<'a> fn(&'a C) -> OperationVc<ResolveSourceRequestResult>,
31}
32
33unsafe impl<C: NonLocalValue> NonLocalValue for TypedGetContentFn<C> {}
36
37impl<C: TraceRawVcs> TraceRawVcs for TypedGetContentFn<C> {
39 fn trace_raw_vcs(&self, trace_context: &mut TraceRawVcsContext) {
40 self.capture.trace_raw_vcs(trace_context);
41 }
42}
43
44trait TypedGetContentFnTrait: NonLocalValue + TraceRawVcs {
45 fn call(&self) -> OperationVc<ResolveSourceRequestResult>;
46}
47
48impl<C> TypedGetContentFnTrait for TypedGetContentFn<C>
49where
50 C: NonLocalValue + TraceRawVcs,
51{
52 fn call(&self) -> OperationVc<ResolveSourceRequestResult> {
53 (self.func)(&self.capture)
54 }
55}
56
57#[derive(NonLocalValue, TraceRawVcs)]
63pub struct GetContentFn {
64 inner: Box<dyn TypedGetContentFnTrait + Send + Sync>,
65}
66
67impl GetContentFn {
68 pub fn new<C>(
71 capture: C,
72 func: for<'a> fn(&'a C) -> OperationVc<ResolveSourceRequestResult>,
73 ) -> Self
74 where
75 C: NonLocalValue + TraceRawVcs + Send + Sync + 'static,
76 {
77 Self {
78 inner: Box::new(TypedGetContentFn { capture, func }),
79 }
80 }
81}
82
83impl GetContentFn {
84 fn call(&self) -> OperationVc<ResolveSourceRequestResult> {
85 self.inner.call()
86 }
87}
88
89async fn peek_issues<T: Send>(source: OperationVc<T>) -> Result<Vec<ReadRef<PlainIssue>>> {
90 let captured = source.peek_issues();
91
92 captured.get_plain_issues(IssueFilter::everything()).await
93}
94
95fn extend_issues(issues: &mut Vec<ReadRef<PlainIssue>>, new_issues: Vec<ReadRef<PlainIssue>>) {
96 for issue in new_issues {
97 if issues.contains(&issue) {
98 continue;
99 }
100
101 issues.push(issue);
102 }
103}
104
105#[turbo_tasks::function(operation)]
106fn versioned_content_update_operation(
107 content: ResolvedVc<Box<dyn VersionedContent>>,
108 from: ResolvedVc<Box<dyn Version>>,
109) -> Vc<Update> {
110 content.update(*from)
111}
112
113#[turbo_tasks::function(operation)]
114async fn get_update_stream_item_operation(
115 resource: RcStr,
116 from: ResolvedVc<VersionState>,
117 get_content: TransientInstance<GetContentFn>,
118) -> Result<Vc<UpdateStreamItem>> {
119 let content_op = get_content.call();
120 let content_result = content_op.read_strongly_consistent().await;
121 let mut plain_issues = peek_issues(content_op).await?;
122
123 let content_value = match content_result {
124 Ok(content) => content,
125 Err(e) => {
126 plain_issues.push(
127 PlainIssue::from_issue(
128 Vc::upcast(
129 FatalStreamIssue {
130 resource,
131 description: StyledString::Text(
132 format!("{}", PrettyPrintError(&e)).into(),
133 )
134 .resolved_cell(),
135 }
136 .cell(),
137 ),
138 None,
139 )
140 .await?,
141 );
142
143 let update = Update::Total(TotalUpdate {
144 to: Vc::upcast::<Box<dyn Version>>(NotFoundVersion::new())
145 .into_trait_ref()
146 .await?,
147 })
148 .cell();
149 return Ok(UpdateStreamItem::Found {
150 update: update.await?,
151 issues: plain_issues,
152 }
153 .cell());
154 }
155 };
156
157 match *content_value {
158 ResolveSourceRequestResult::Static(static_content_vc, _) => {
159 let static_content = static_content_vc.await?;
160
161 if static_content.status_code == 404 {
163 return Ok(UpdateStreamItem::NotFound.cell());
164 }
165
166 let resolved_content = static_content.content;
167 let from = from.get().to_resolved().await?;
168 let update_op = versioned_content_update_operation(resolved_content, from);
169
170 extend_issues(&mut plain_issues, peek_issues(update_op).await?);
171
172 Ok(UpdateStreamItem::Found {
173 update: update_op.connect().await?,
174 issues: plain_issues,
175 }
176 .cell())
177 }
178 ResolveSourceRequestResult::HttpProxy(proxy_result_op) => {
179 let proxy_result_vc = proxy_result_op.connect();
180 let proxy_result_value = proxy_result_vc.await?;
181
182 if proxy_result_value.status == 404 {
183 return Ok(UpdateStreamItem::NotFound.cell());
184 }
185
186 extend_issues(&mut plain_issues, peek_issues(proxy_result_op).await?);
187
188 let from = from.get();
189 if let Some(from) =
190 ResolvedVc::try_downcast_type::<ProxyResult>(from.to_resolved().await?)
191 && from.await? == proxy_result_value
192 {
193 return Ok(UpdateStreamItem::Found {
194 update: Update::None.cell().await?,
195 issues: plain_issues,
196 }
197 .cell());
198 }
199
200 Ok(UpdateStreamItem::Found {
201 update: Update::Total(TotalUpdate {
202 to: Vc::upcast::<Box<dyn Version>>(proxy_result_vc)
203 .into_trait_ref()
204 .await?,
205 })
206 .cell()
207 .await?,
208 issues: plain_issues,
209 }
210 .cell())
211 }
212 _ => {
213 let update = if plain_issues.is_empty() {
214 Update::Total(TotalUpdate {
219 to: Vc::upcast::<Box<dyn Version>>(NotFoundVersion::new())
220 .into_trait_ref()
221 .await?,
222 })
223 .cell()
224 } else {
225 Update::None.cell()
226 };
227
228 Ok(UpdateStreamItem::Found {
229 update: update.await?,
230 issues: plain_issues,
231 }
232 .cell())
233 }
234 }
235}
236
237#[derive(TraceRawVcs)]
238struct ComputeUpdateStreamSender(
239 #[turbo_tasks(trace_ignore)] Sender<Result<ReadRef<UpdateStreamItem>>>,
246);
247
248#[turbo_tasks::function]
251async fn compute_update_stream(
252 resource: RcStr,
253 from: ResolvedVc<VersionState>,
254 get_content: TransientInstance<GetContentFn>,
255 sender: TransientInstance<ComputeUpdateStreamSender>,
256) -> Vc<()> {
257 let item = get_update_stream_item_operation(resource, from, get_content)
258 .read_strongly_consistent()
259 .await;
260
261 let _ = sender.0.send(item).await;
263
264 Default::default()
265}
266
267pub(super) struct UpdateStream(
268 Pin<Box<dyn Stream<Item = Result<ReadRef<UpdateStreamItem>>> + Send + Sync>>,
269);
270
271impl UpdateStream {
272 #[tracing::instrument(skip(get_content), name = "UpdateStream::new")]
273 pub async fn new(
274 resource: RcStr,
275 get_content: TransientInstance<GetContentFn>,
276 ) -> Result<UpdateStream> {
277 let (sx, rx) = tokio::sync::mpsc::channel(32);
278
279 let content = get_content.call();
280 let version = match *content.connect().await? {
283 ResolveSourceRequestResult::Static(static_content, _) => {
284 static_content.await?.content.version()
285 }
286 ResolveSourceRequestResult::HttpProxy(proxy_result) => {
287 Vc::upcast(proxy_result.connect())
288 }
289 _ => Vc::upcast(NotFoundVersion::new()),
290 };
291 let version_state = VersionState::new(version.into_trait_ref().await?).await?;
292
293 let _ = compute_update_stream(
294 resource,
295 version_state,
296 get_content,
297 TransientInstance::new(ComputeUpdateStreamSender(sx)),
298 );
299
300 let mut last_had_issues = false;
301
302 let stream = ReceiverStream::new(rx).filter_map(move |item| {
303 {
304 let (has_issues, issues_changed) =
305 if let Ok(UpdateStreamItem::Found { issues, .. }) = item.as_deref() {
306 let has_issues = !issues.is_empty();
307 let issues_changed = has_issues != last_had_issues;
308 last_had_issues = has_issues;
309 (has_issues, issues_changed)
310 } else {
311 (false, false)
312 };
313
314 async move {
315 match item.as_deref() {
316 Ok(UpdateStreamItem::Found { update, .. }) => {
317 match &**update {
318 Update::Partial(PartialUpdate { to, .. })
319 | Update::Total(TotalUpdate { to }) => {
320 version_state
321 .set(to.clone())
322 .await
323 .expect("failed to update version");
324
325 Some(item)
326 }
327 Update::None | Update::Missing => {
329 if has_issues || issues_changed {
330 Some(item)
331 } else {
332 None
333 }
334 }
335 }
336 }
337 _ => {
338 Some(item)
340 }
341 }
342 }
343 .in_current_span()
344 }
345 .in_current_span()
346 });
347
348 Ok(UpdateStream(Box::pin(stream)))
349 }
350}
351
352impl Stream for UpdateStream {
353 type Item = Result<ReadRef<UpdateStreamItem>>;
354
355 fn poll_next(
356 self: Pin<&mut Self>,
357 cx: &mut std::task::Context<'_>,
358 ) -> std::task::Poll<Option<Self::Item>> {
359 Pin::new(&mut self.get_mut().0).poll_next(cx)
360 }
361}
362
363#[turbo_tasks::value(serialization = "none")]
364#[derive(Debug)]
365pub enum UpdateStreamItem {
366 NotFound,
367 Found {
368 update: ReadRef<Update>,
369 issues: Vec<ReadRef<PlainIssue>>,
370 },
371}
372
373#[turbo_tasks::value(serialization = "none")]
374struct FatalStreamIssue {
375 description: ResolvedVc<StyledString>,
376 resource: RcStr,
377}
378
379#[turbo_tasks::value_impl]
380impl Issue for FatalStreamIssue {
381 fn severity(&self) -> IssueSeverity {
382 IssueSeverity::Fatal
383 }
384
385 #[turbo_tasks::function]
386 fn stage(&self) -> Vc<IssueStage> {
387 IssueStage::Other(rcstr!("websocket")).cell()
388 }
389
390 #[turbo_tasks::function]
391 async fn file_path(&self) -> Result<Vc<FileSystemPath>> {
392 Ok(ServerFileSystem::new()
393 .root()
394 .await?
395 .join(&self.resource)?
396 .cell())
397 }
398
399 #[turbo_tasks::function]
400 fn title(&self) -> Vc<StyledString> {
401 StyledString::Text(rcstr!("Fatal error while getting content to stream")).cell()
402 }
403
404 #[turbo_tasks::function]
405 fn description(&self) -> Vc<OptionStyledString> {
406 Vc::cell(Some(self.description))
407 }
408}
409
#[cfg(test)]
pub mod test {
    use std::sync::{
        Arc,
        atomic::{AtomicI32, Ordering},
    };

    use turbo_tasks::TurboTasks;
    use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};

    use super::*;

    /// Operation that always resolves to `NotFound`; used as a stand-in content source.
    #[turbo_tasks::function(operation)]
    pub fn noop_operation() -> Vc<ResolveSourceRequestResult> {
        ResolveSourceRequestResult::NotFound.cell()
    }

    /// `GetContentFn::call` must invoke the wrapped function with the capture and return the
    /// operation that function produced.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_get_content_fn() {
        // Records that it ran by writing 42 into the captured counter.
        fn store_answer(counter: &Arc<AtomicI32>) -> OperationVc<ResolveSourceRequestResult> {
            counter.store(42, Ordering::SeqCst);
            noop_operation()
        }

        let backend = TurboTasksBackend::new(BackendOptions::default(), noop_backing_storage());
        let tt = TurboTasks::new(backend);

        tt.run_once(async move {
            let counter = Arc::new(AtomicI32::new(0));
            let get_content = GetContentFn::new(Arc::clone(&counter), store_answer);

            let resolved = get_content
                .call()
                .read_strongly_consistent()
                .await
                .unwrap();

            assert_eq!(counter.load(Ordering::SeqCst), 42);
            assert!(*resolved == ResolveSourceRequestResult::NotFound);
            Ok(())
        })
        .await
        .unwrap();
    }
}