Skip to main content

turbo_tasks/
raw_vc.rs

1use std::{
2    fmt::{Debug, Display},
3    future::Future,
4    pin::Pin,
5    sync::Arc,
6    task::Poll,
7};
8
9use anyhow::Result;
10use auto_hash_map::AutoSet;
11use bincode::{Decode, Encode};
12use serde::{Deserialize, Serialize};
13
14use crate::{
15    CollectiblesSource, ReadCellOptions, ReadConsistency, ReadOutputOptions, ResolvedVc, TaskId,
16    TaskPersistence, TraitTypeId, ValueTypeId, VcValueTrait,
17    backend::TypedCellContent,
18    event::EventListener,
19    id::{ExecutionId, LocalTaskId},
20    manager::{
21        ReadCellTracking, ReadTracking, SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK,
22        TurboTasksApi, read_local_output, read_task_output, with_turbo_tasks,
23    },
24    registry::{self, get_value_type},
25    turbo_tasks,
26};
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
29pub struct CellId {
30    pub type_id: ValueTypeId,
31    pub index: u32,
32}
33
34impl Display for CellId {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        write!(
37            f,
38            "{}#{}",
39            registry::get_value_type(self.type_id).name,
40            self.index
41        )
42    }
43}
44
45/// A type-erased representation of [`Vc`][crate::Vc].
46///
47/// Type erasure reduces the [monomorphization] (and therefore binary size and compilation time)
48/// required to support [`Vc`][crate::Vc].
49///
50/// This type is heavily used within the [`Backend`][crate::backend::Backend] trait, but should
51/// otherwise be treated as an internal implementation detail of `turbo-tasks`.
52///
53/// [monomorphization]: https://doc.rust-lang.org/book/ch10-01-syntax.html#performance-of-code-using-generics
54#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
55pub enum RawVc {
56    /// The synchronous return value of a task (after argument resolution). This is the
57    /// representation used by [`OperationVc`][crate::OperationVc].
58    TaskOutput(TaskId),
59    /// A pointer to a specific [`Vc::cell`][crate::Vc::cell] or `.cell()` call within a task. This
60    /// is the representation used by [`ResolvedVc`].
61    ///
62    /// [`CellId`] contains the [`ValueTypeId`], which can be useful for efficient downcasting.
63    TaskCell(TaskId, CellId),
64    /// The synchronous return value of a local task. This is created when a function is called
65    /// with unresolved arguments or more explicitly with
66    /// [`#[turbo_tasks::function(local)]`][crate::function].
67    ///
68    /// Local outputs are only valid within the context of their parent "non-local" task. Turbo
69    /// Task's APIs are designed to prevent escapes of local [`Vc`]s, but [`ExecutionId`] is used
70    /// for a fallback runtime assertion.
71    LocalOutput(ExecutionId, LocalTaskId, TaskPersistence),
72}
73
74impl Debug for RawVc {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        match self {
77            RawVc::TaskOutput(task_id) => f
78                .debug_tuple("RawVc::TaskOutput")
79                .field(&**task_id)
80                .finish(),
81            RawVc::TaskCell(task_id, cell_id) => f
82                .debug_tuple("RawVc::TaskCell")
83                .field(&**task_id)
84                .field(&cell_id.to_string())
85                .finish(),
86            RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => f
87                .debug_tuple("RawVc::LocalOutput")
88                .field(&**execution_id)
89                .field(&**local_task_id)
90                .field(task_persistence)
91                .finish(),
92        }
93    }
94}
95
96impl Display for RawVc {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        match self {
99            RawVc::TaskOutput(task_id) => write!(f, "output of task {}", **task_id),
100            RawVc::TaskCell(task_id, cell_id) => {
101                write!(f, "{} of task {}", cell_id, **task_id)
102            }
103            RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => write!(
104                f,
105                "output of local task {} ({}, {})",
106                **local_task_id, **execution_id, task_persistence
107            ),
108        }
109    }
110}
111
112impl RawVc {
113    pub fn is_resolved(&self) -> bool {
114        match self {
115            RawVc::TaskOutput(..) => false,
116            RawVc::TaskCell(..) => true,
117            RawVc::LocalOutput(..) => false,
118        }
119    }
120
121    pub fn is_local(&self) -> bool {
122        match self {
123            RawVc::TaskOutput(..) => false,
124            RawVc::TaskCell(..) => false,
125            RawVc::LocalOutput(..) => true,
126        }
127    }
128
129    /// Returns `true` if the task this `RawVc` reads from cannot be serialized and will not be
130    /// stored in the filesystem cache.
131    ///
132    /// See [`TaskPersistence`] for more details.
133    pub fn is_transient(&self) -> bool {
134        match self {
135            RawVc::TaskOutput(task) | RawVc::TaskCell(task, ..) => task.is_transient(),
136            RawVc::LocalOutput(_, _, persistence) => *persistence == TaskPersistence::Transient,
137        }
138    }
139
140    pub(crate) fn into_read(self, is_serializable_cell_content: bool) -> ReadRawVcFuture {
141        // returns a custom future to have something concrete and sized
142        // this avoids boxing in IntoFuture
143        ReadRawVcFuture::new(self, Some(is_serializable_cell_content))
144    }
145
146    pub(crate) fn into_read_with_unknown_is_serializable_cell_content(self) -> ReadRawVcFuture {
147        // returns a custom future to have something concrete and sized
148        // this avoids boxing in IntoFuture
149        ReadRawVcFuture::new(self, None)
150    }
151
152    /// See [`crate::Vc::resolve`].
153    pub(crate) async fn resolve(self) -> Result<RawVc> {
154        self.resolve_inner(ReadOutputOptions {
155            consistency: ReadConsistency::Eventual,
156            ..Default::default()
157        })
158        .await
159    }
160
161    /// See [`crate::Vc::resolve_strongly_consistent`].
162    pub(crate) async fn resolve_strongly_consistent(self) -> Result<RawVc> {
163        SuppressTopLevelTaskCheckFuture {
164            inner: self.resolve_inner(ReadOutputOptions {
165                consistency: ReadConsistency::Strong,
166                ..Default::default()
167            }),
168        }
169        .await
170    }
171
172    async fn resolve_inner(self, mut options: ReadOutputOptions) -> Result<RawVc> {
173        let tt = turbo_tasks();
174        let mut current = self;
175        loop {
176            match current {
177                RawVc::TaskOutput(task) => {
178                    current = read_task_output(&*tt, task, options).await?;
179                    // We no longer need to read strongly consistent, as any Vc returned
180                    // from the first task will be inside of the scope of the first
181                    // task. So it's already strongly consistent.
182                    options.consistency = ReadConsistency::Eventual;
183                }
184                RawVc::TaskCell(_, _) => return Ok(current),
185                RawVc::LocalOutput(execution_id, local_task_id, ..) => {
186                    debug_assert_eq!(options.consistency, ReadConsistency::Eventual);
187                    current = read_local_output(&*tt, execution_id, local_task_id).await?;
188                }
189            }
190        }
191    }
192
193    /// Convert a potentially local `RawVc` into a non-local `RawVc`. This is a subset of resolution
194    /// resolution, because the returned `RawVc` can be a `TaskOutput`.
195    pub(crate) async fn to_non_local(self) -> Result<RawVc> {
196        Ok(match self {
197            RawVc::LocalOutput(execution_id, local_task_id, ..) => {
198                let tt = turbo_tasks();
199                let local_output = read_local_output(&*tt, execution_id, local_task_id).await?;
200                debug_assert!(
201                    !matches!(local_output, RawVc::LocalOutput(_, _, _)),
202                    "a LocalOutput cannot point at other LocalOutputs"
203                );
204                local_output
205            }
206            non_local => non_local,
207        })
208    }
209
210    pub(crate) fn connect(&self) {
211        let RawVc::TaskOutput(task_id) = self else {
212            panic!("RawVc::connect() must only be called on a RawVc::TaskOutput");
213        };
214        let tt = turbo_tasks();
215        tt.connect_task(*task_id);
216    }
217
218    pub fn try_get_task_id(&self) -> Option<TaskId> {
219        match self {
220            RawVc::TaskOutput(t) | RawVc::TaskCell(t, ..) => Some(*t),
221            RawVc::LocalOutput(..) => None,
222        }
223    }
224
225    pub fn try_get_type_id(&self) -> Option<ValueTypeId> {
226        match self {
227            RawVc::TaskCell(_, CellId { type_id, .. }) => Some(*type_id),
228            RawVc::TaskOutput(..) | RawVc::LocalOutput(..) => None,
229        }
230    }
231
232    /// For a cell that's already resolved, synchronously check if it implements a trait using the
233    /// type information in `RawVc::TaskCell` (we don't actually need to read the cell!).
234    pub(crate) fn resolved_has_trait(&self, trait_id: TraitTypeId) -> bool {
235        match self {
236            RawVc::TaskCell(_task_id, cell_id) => {
237                get_value_type(cell_id.type_id).has_trait(&trait_id)
238            }
239            _ => unreachable!("resolved_has_trait must be called with a RawVc::TaskCell"),
240        }
241    }
242
243    /// For a cell that's already resolved, synchronously check if it is a given type using the type
244    /// information in `RawVc::TaskCell` (we don't actually need to read the cell!).
245    pub(crate) fn resolved_is_type(&self, type_id: ValueTypeId) -> bool {
246        match self {
247            RawVc::TaskCell(_task_id, cell_id) => cell_id.type_id == type_id,
248            _ => unreachable!("resolved_is_type must be called with a RawVc::TaskCell"),
249        }
250    }
251}
252
253/// This implementation of `CollectiblesSource` assumes that `self` is a `RawVc::TaskOutput`.
254impl CollectiblesSource for RawVc {
255    fn peek_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
256        let RawVc::TaskOutput(task_id) = self else {
257            panic!(
258                "<RawVc as CollectiblesSource>::peek_collectibles() must only be called on a \
259                 RawVc::TaskOutput"
260            );
261        };
262        let tt = turbo_tasks();
263        let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
264        map.into_iter()
265            .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
266            .collect()
267    }
268
269    fn take_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
270        let RawVc::TaskOutput(task_id) = self else {
271            panic!(
272                "<RawVc as CollectiblesSource>::take_collectibles() must only be called on a \
273                 RawVc::TaskOutput"
274            );
275        };
276        let tt = turbo_tasks();
277        let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
278        tt.unemit_collectibles(T::get_trait_type_id(), &map);
279        map.into_iter()
280            .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
281            .collect()
282    }
283
284    fn drop_collectibles<T: VcValueTrait + ?Sized>(self) {
285        let RawVc::TaskOutput(task_id) = self else {
286            panic!(
287                "<RawVc as CollectiblesSource>::drop_collectibles() must only be called on a \
288                 RawVc::TaskOutput"
289            );
290        };
291        let tt = turbo_tasks();
292        let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
293        tt.unemit_collectibles(T::get_trait_type_id(), &map);
294    }
295}
296
297/// A future wrapper that suppresses the top-level task eventual consistency check
298/// during each [`poll`][Future::poll] call. The suppression is applied via
299/// [`sync_scope`][tokio::task_local!] so it is only active during the synchronous
300/// execution of the inner future's `poll`, and is never held across await points.
301struct SuppressTopLevelTaskCheckFuture<F> {
302    inner: F,
303}
304
305impl<F: Future> Future for SuppressTopLevelTaskCheckFuture<F> {
306    type Output = F::Output;
307
308    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
309        // SAFETY: we are only projecting the pin to the inner field, not moving it
310        let inner = unsafe { self.map_unchecked_mut(|this| &mut this.inner) };
311        if cfg!(debug_assertions) {
312            SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK.sync_scope(true, || inner.poll(cx))
313        } else {
314            inner.poll(cx)
315        }
316    }
317}
318
319pub struct ReadRawVcFuture {
320    current: RawVc,
321    read_output_options: ReadOutputOptions,
322    read_cell_options: ReadCellOptions,
323    is_serializable_cell_content_unknown: bool,
324    /// This flag redundant with `read_output_options`, but `read_output_options` is mutated during
325    /// the read. This flag indicates that the initial read was strongly consistent.
326    strongly_consistent: bool,
327    listener: Option<EventListener>,
328}
329
330impl ReadRawVcFuture {
331    pub(crate) fn new(vc: RawVc, is_serializable_cell_content: Option<bool>) -> Self {
332        ReadRawVcFuture {
333            current: vc,
334            read_output_options: ReadOutputOptions::default(),
335            read_cell_options: ReadCellOptions {
336                is_serializable_cell_content: is_serializable_cell_content.unwrap_or(false),
337                ..Default::default()
338            },
339            is_serializable_cell_content_unknown: is_serializable_cell_content.is_none(),
340            strongly_consistent: false,
341            listener: None,
342        }
343    }
344
345    /// Make reads strongly consistent.
346    pub fn strongly_consistent(mut self) -> Self {
347        self.read_output_options.consistency = ReadConsistency::Strong;
348        self.strongly_consistent = true;
349        self
350    }
351
352    /// Track the value as a dependency with an key.
353    pub fn track_with_key(mut self, key: u64) -> Self {
354        self.read_output_options.tracking = ReadTracking::Tracked;
355        self.read_cell_options.tracking = ReadCellTracking::Tracked { key: Some(key) };
356        self
357    }
358
359    /// This will not track the value as dependency, but will still track the error as dependency,
360    /// if there is an error.
361    ///
362    /// INVALIDATION: Be careful with this, it will not track dependencies, so
363    /// using it could break cache invalidation.
364    pub fn untracked(mut self) -> Self {
365        self.read_output_options.tracking = ReadTracking::TrackOnlyError;
366        self.read_cell_options.tracking = ReadCellTracking::TrackOnlyError;
367        self
368    }
369
370    /// Hint that this is the final read of the cell content.
371    pub fn final_read_hint(mut self) -> Self {
372        self.read_cell_options.final_read_hint = true;
373        self
374    }
375}
376
377impl Future for ReadRawVcFuture {
378    type Output = Result<TypedCellContent>;
379
380    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
381        // SAFETY: we are not moving self
382        let this = unsafe { self.get_unchecked_mut() };
383
384        // Extract the closure to avoid deep nesting
385        let poll_fn = |tt: &Arc<dyn TurboTasksApi>| -> Poll<Self::Output> {
386            'outer: loop {
387                if let Some(listener) = &mut this.listener {
388                    // SAFETY: listener is from previous pinned this
389                    let listener = unsafe { Pin::new_unchecked(listener) };
390                    if listener.poll(cx).is_pending() {
391                        return Poll::Pending;
392                    }
393                    this.listener = None;
394                }
395                let mut listener = match this.current {
396                    RawVc::TaskOutput(task) => {
397                        let read_result = tt.try_read_task_output(task, this.read_output_options);
398                        match read_result {
399                            Ok(Ok(vc)) => {
400                                // turbo-tasks-backend doesn't currently have any sort of
401                                // "transaction" or global lock mechanism to group together chains
402                                // of `TaskOutput`/`TaskCell` reads.
403                                //
404                                // If we ignore the theoretical TOCTOU issues, we no longer need to
405                                // read strongly consistent, as any Vc returned from the first task
406                                // will be inside of the scope of the first task. So it's already
407                                // strongly consistent.
408                                this.read_output_options.consistency = ReadConsistency::Eventual;
409                                this.current = vc;
410                                continue 'outer;
411                            }
412                            Ok(Err(listener)) => listener,
413                            Err(err) => return Poll::Ready(Err(err)),
414                        }
415                    }
416                    RawVc::TaskCell(task, index) => {
417                        if this.is_serializable_cell_content_unknown {
418                            let value_type = registry::get_value_type(index.type_id);
419                            this.read_cell_options.is_serializable_cell_content =
420                                value_type.bincode.is_some();
421                        }
422                        let read_result =
423                            tt.try_read_task_cell(task, index, this.read_cell_options);
424                        match read_result {
425                            Ok(Ok(content)) => {
426                                // SAFETY: Constructor ensures that T and U are binary identical
427                                return Poll::Ready(Ok(content));
428                            }
429                            Ok(Err(listener)) => listener,
430                            Err(err) => return Poll::Ready(Err(err)),
431                        }
432                    }
433                    RawVc::LocalOutput(execution_id, local_output_id, ..) => {
434                        debug_assert_eq!(
435                            this.read_output_options.consistency,
436                            ReadConsistency::Eventual
437                        );
438                        let read_result = tt.try_read_local_output(execution_id, local_output_id);
439                        match read_result {
440                            Ok(Ok(vc)) => {
441                                this.current = vc;
442                                continue 'outer;
443                            }
444                            Ok(Err(listener)) => listener,
445                            Err(err) => return Poll::Ready(Err(err)),
446                        }
447                    }
448                };
449                // SAFETY: listener is from previous pinned this
450                match unsafe { Pin::new_unchecked(&mut listener) }.poll(cx) {
451                    Poll::Ready(_) => continue,
452                    Poll::Pending => {
453                        this.listener = Some(listener);
454                        return Poll::Pending;
455                    }
456                };
457            }
458        };
459
460        fn suppress_top_level_task_check<R>(strongly_consistent: bool, f: impl FnOnce() -> R) -> R {
461            if cfg!(debug_assertions) && strongly_consistent {
462                // Temporarily suppress the top-level task check
463                SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK.sync_scope(true, f)
464            } else {
465                f()
466            }
467        }
468
469        // HACK: Temporarily suppress top-level task check if doing strongly consistent read.
470        //
471        // This masks a bug: There's an unlikely TOCTOU race condition in `poll_fn`. Because the
472        // strongly consistent read isn't a single atomic operation, any inner `TaskOutput` or
473        // `TaskCell` could get mutated after the strongly consistent read of the outer
474        // `TaskOutput`.
475        suppress_top_level_task_check(this.strongly_consistent, || with_turbo_tasks(poll_fn))
476    }
477}
478
479impl Unpin for ReadRawVcFuture {}