Skip to main content

turbo_tasks/
raw_vc.rs

1use std::{
2    fmt::{Debug, Display},
3    future::Future,
4    pin::Pin,
5    sync::Arc,
6    task::Poll,
7};
8
9use anyhow::Result;
10use auto_hash_map::AutoSet;
11use bincode::{Decode, Encode};
12use serde::{Deserialize, Serialize};
13
14use crate::{
15    CollectiblesSource, ReadCellOptions, ReadConsistency, ReadOutputOptions, ResolvedVc, TaskId,
16    TaskPersistence, TraitTypeId, ValueTypeId, VcValueTrait,
17    backend::TypedCellContent,
18    event::EventListener,
19    id::{ExecutionId, LocalTaskId},
20    manager::{
21        ReadCellTracking, ReadTracking, SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK,
22        TurboTasksApi, read_local_output, read_task_output, with_turbo_tasks,
23    },
24    registry::{self, get_value_type},
25    turbo_tasks,
26};
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
29pub struct CellId {
30    pub type_id: ValueTypeId,
31    pub index: u32,
32}
33
34impl Display for CellId {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        write!(
37            f,
38            "{}#{}",
39            registry::get_value_type(self.type_id).ty.name,
40            self.index
41        )
42    }
43}
44
45/// A type-erased representation of [`Vc`].
46///
47/// Type erasure reduces the [monomorphization] (and therefore binary size and compilation time)
48/// required to support [`Vc`].
49///
50/// This type is heavily used within the [`Backend`][crate::backend::Backend] trait, but should
51/// otherwise be treated as an internal implementation detail of `turbo-tasks`.
52///
53/// [`Vc`]: crate::Vc
54/// [monomorphization]: https://doc.rust-lang.org/book/ch10-01-syntax.html#performance-of-code-using-generics
55#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
56pub enum RawVc {
57    /// The synchronous return value of a task (after argument resolution). This is the
58    /// representation used by [`OperationVc`][crate::OperationVc].
59    TaskOutput(TaskId),
60    /// A pointer to a specific [`Vc::cell`][crate::Vc::cell] or `.cell()` call within a task. This
61    /// is the representation used by [`ResolvedVc`].
62    ///
63    /// [`CellId`] contains the [`ValueTypeId`], which can be useful for efficient downcasting.
64    TaskCell(TaskId, CellId),
65    /// The synchronous return value of a local task. This is created when a function is called
66    /// with unresolved arguments or more explicitly with
67    /// [`#[turbo_tasks::function(local)]`][crate::function].
68    ///
69    /// Local outputs are only valid within the context of their parent "non-local" task. Turbo
70    /// Task's APIs are designed to prevent escapes of local [`Vc`]s, but [`ExecutionId`] is used
71    /// for a fallback runtime assertion.
72    ///
73    /// [`Vc`]: crate::Vc
74    LocalOutput(ExecutionId, LocalTaskId, TaskPersistence),
75}
76
77impl Debug for RawVc {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        match self {
80            RawVc::TaskOutput(task_id) => f
81                .debug_tuple("RawVc::TaskOutput")
82                .field(&**task_id)
83                .finish(),
84            RawVc::TaskCell(task_id, cell_id) => f
85                .debug_tuple("RawVc::TaskCell")
86                .field(&**task_id)
87                .field(&cell_id.to_string())
88                .finish(),
89            RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => f
90                .debug_tuple("RawVc::LocalOutput")
91                .field(&**execution_id)
92                .field(&**local_task_id)
93                .field(task_persistence)
94                .finish(),
95        }
96    }
97}
98
99impl Display for RawVc {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        match self {
102            RawVc::TaskOutput(task_id) => write!(f, "output of task {}", **task_id),
103            RawVc::TaskCell(task_id, cell_id) => {
104                write!(f, "{} of task {}", cell_id, **task_id)
105            }
106            RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => write!(
107                f,
108                "output of local task {} ({}, {})",
109                **local_task_id, **execution_id, task_persistence
110            ),
111        }
112    }
113}
114
115impl RawVc {
116    pub fn is_resolved(&self) -> bool {
117        match self {
118            RawVc::TaskOutput(..) => false,
119            RawVc::TaskCell(..) => true,
120            RawVc::LocalOutput(..) => false,
121        }
122    }
123
124    pub fn is_local(&self) -> bool {
125        match self {
126            RawVc::TaskOutput(..) => false,
127            RawVc::TaskCell(..) => false,
128            RawVc::LocalOutput(..) => true,
129        }
130    }
131
132    /// Returns `true` if the task this `RawVc` reads from cannot be serialized and will not be
133    /// stored in the filesystem cache.
134    ///
135    /// See [`TaskPersistence`] for more details.
136    pub fn is_transient(&self) -> bool {
137        match self {
138            RawVc::TaskOutput(task) | RawVc::TaskCell(task, ..) => task.is_transient(),
139            RawVc::LocalOutput(_, _, persistence) => *persistence == TaskPersistence::Transient,
140        }
141    }
142
143    pub(crate) fn into_read(self, is_serializable_cell_content: bool) -> ReadRawVcFuture {
144        // returns a custom future to have something concrete and sized
145        // this avoids boxing in IntoFuture
146        ReadRawVcFuture::new(self, Some(is_serializable_cell_content))
147    }
148
149    pub(crate) fn into_read_with_unknown_is_serializable_cell_content(self) -> ReadRawVcFuture {
150        // returns a custom future to have something concrete and sized
151        // this avoids boxing in IntoFuture
152        ReadRawVcFuture::new(self, None)
153    }
154
155    /// See [`crate::Vc::resolve`].
156    pub(crate) async fn resolve(self) -> Result<RawVc> {
157        self.resolve_inner(ReadOutputOptions {
158            consistency: ReadConsistency::Eventual,
159            ..Default::default()
160        })
161        .await
162    }
163
164    /// See [`crate::Vc::resolve_strongly_consistent`].
165    pub(crate) async fn resolve_strongly_consistent(self) -> Result<RawVc> {
166        SuppressTopLevelTaskCheckFuture {
167            inner: self.resolve_inner(ReadOutputOptions {
168                consistency: ReadConsistency::Strong,
169                ..Default::default()
170            }),
171        }
172        .await
173    }
174
175    async fn resolve_inner(self, mut options: ReadOutputOptions) -> Result<RawVc> {
176        let tt = turbo_tasks();
177        let mut current = self;
178        loop {
179            match current {
180                RawVc::TaskOutput(task) => {
181                    current = read_task_output(&*tt, task, options).await?;
182                    // We no longer need to read strongly consistent, as any Vc returned
183                    // from the first task will be inside of the scope of the first
184                    // task. So it's already strongly consistent.
185                    options.consistency = ReadConsistency::Eventual;
186                }
187                RawVc::TaskCell(_, _) => return Ok(current),
188                RawVc::LocalOutput(execution_id, local_task_id, ..) => {
189                    debug_assert_eq!(options.consistency, ReadConsistency::Eventual);
190                    current = read_local_output(&*tt, execution_id, local_task_id).await?;
191                }
192            }
193        }
194    }
195
196    /// Convert a potentially local `RawVc` into a non-local `RawVc`. This is a subset of resolution
197    /// resolution, because the returned `RawVc` can be a `TaskOutput`.
198    pub(crate) async fn to_non_local(self) -> Result<RawVc> {
199        Ok(match self {
200            RawVc::LocalOutput(execution_id, local_task_id, ..) => {
201                let tt = turbo_tasks();
202                let local_output = read_local_output(&*tt, execution_id, local_task_id).await?;
203                debug_assert!(
204                    !matches!(local_output, RawVc::LocalOutput(_, _, _)),
205                    "a LocalOutput cannot point at other LocalOutputs"
206                );
207                local_output
208            }
209            non_local => non_local,
210        })
211    }
212
213    pub(crate) fn connect(&self) {
214        let RawVc::TaskOutput(task_id) = self else {
215            panic!("RawVc::connect() must only be called on a RawVc::TaskOutput");
216        };
217        let tt = turbo_tasks();
218        tt.connect_task(*task_id);
219    }
220
221    pub fn try_get_task_id(&self) -> Option<TaskId> {
222        match self {
223            RawVc::TaskOutput(t) | RawVc::TaskCell(t, ..) => Some(*t),
224            RawVc::LocalOutput(..) => None,
225        }
226    }
227
228    pub fn try_get_type_id(&self) -> Option<ValueTypeId> {
229        match self {
230            RawVc::TaskCell(_, CellId { type_id, .. }) => Some(*type_id),
231            RawVc::TaskOutput(..) | RawVc::LocalOutput(..) => None,
232        }
233    }
234
235    /// For a cell that's already resolved, synchronously check if it implements a trait using the
236    /// type information in `RawVc::TaskCell` (we don't actually need to read the cell!).
237    pub(crate) fn resolved_has_trait(&self, trait_id: TraitTypeId) -> bool {
238        match self {
239            RawVc::TaskCell(_task_id, cell_id) => {
240                get_value_type(cell_id.type_id).has_trait(&trait_id)
241            }
242            _ => unreachable!("resolved_has_trait must be called with a RawVc::TaskCell"),
243        }
244    }
245
246    /// For a cell that's already resolved, synchronously check if it is a given type using the type
247    /// information in `RawVc::TaskCell` (we don't actually need to read the cell!).
248    pub(crate) fn resolved_is_type(&self, type_id: ValueTypeId) -> bool {
249        match self {
250            RawVc::TaskCell(_task_id, cell_id) => cell_id.type_id == type_id,
251            _ => unreachable!("resolved_is_type must be called with a RawVc::TaskCell"),
252        }
253    }
254}
255
256/// This implementation of `CollectiblesSource` assumes that `self` is a `RawVc::TaskOutput`.
257impl CollectiblesSource for RawVc {
258    fn peek_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
259        let RawVc::TaskOutput(task_id) = self else {
260            panic!(
261                "<RawVc as CollectiblesSource>::peek_collectibles() must only be called on a \
262                 RawVc::TaskOutput"
263            );
264        };
265        let tt = turbo_tasks();
266        let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
267        map.into_iter()
268            .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
269            .collect()
270    }
271
272    fn take_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
273        let RawVc::TaskOutput(task_id) = self else {
274            panic!(
275                "<RawVc as CollectiblesSource>::take_collectibles() must only be called on a \
276                 RawVc::TaskOutput"
277            );
278        };
279        let tt = turbo_tasks();
280        let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
281        tt.unemit_collectibles(T::get_trait_type_id(), &map);
282        map.into_iter()
283            .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
284            .collect()
285    }
286
287    fn drop_collectibles<T: VcValueTrait + ?Sized>(self) {
288        let RawVc::TaskOutput(task_id) = self else {
289            panic!(
290                "<RawVc as CollectiblesSource>::drop_collectibles() must only be called on a \
291                 RawVc::TaskOutput"
292            );
293        };
294        let tt = turbo_tasks();
295        let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
296        tt.unemit_collectibles(T::get_trait_type_id(), &map);
297    }
298}
299
300/// A future wrapper that suppresses the top-level task eventual consistency check
301/// during each [`poll`][Future::poll] call. The suppression is applied via
302/// [`sync_scope`][tokio::task_local!] so it is only active during the synchronous
303/// execution of the inner future's `poll`, and is never held across await points.
304struct SuppressTopLevelTaskCheckFuture<F> {
305    inner: F,
306}
307
308impl<F: Future> Future for SuppressTopLevelTaskCheckFuture<F> {
309    type Output = F::Output;
310
311    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
312        // SAFETY: we are only projecting the pin to the inner field, not moving it
313        let inner = unsafe { self.map_unchecked_mut(|this| &mut this.inner) };
314        if cfg!(debug_assertions) {
315            SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK.sync_scope(true, || inner.poll(cx))
316        } else {
317            inner.poll(cx)
318        }
319    }
320}
321
322#[must_use]
323pub struct ReadRawVcFuture {
324    current: RawVc,
325    read_output_options: ReadOutputOptions,
326    read_cell_options: ReadCellOptions,
327    is_serializable_cell_content_unknown: bool,
328    /// This flag redundant with `read_output_options`, but `read_output_options` is mutated during
329    /// the read. This flag indicates that the initial read was strongly consistent.
330    strongly_consistent: bool,
331    listener: Option<EventListener>,
332}
333
334impl ReadRawVcFuture {
335    pub(crate) fn new(vc: RawVc, is_serializable_cell_content: Option<bool>) -> Self {
336        ReadRawVcFuture {
337            current: vc,
338            read_output_options: ReadOutputOptions::default(),
339            read_cell_options: ReadCellOptions {
340                is_serializable_cell_content: is_serializable_cell_content.unwrap_or(false),
341                ..Default::default()
342            },
343            is_serializable_cell_content_unknown: is_serializable_cell_content.is_none(),
344            strongly_consistent: false,
345            listener: None,
346        }
347    }
348
349    /// Make reads strongly consistent.
350    pub fn strongly_consistent(mut self) -> Self {
351        self.read_output_options.consistency = ReadConsistency::Strong;
352        self.strongly_consistent = true;
353        self
354    }
355
356    /// Track the value as a dependency with an key.
357    pub fn track_with_key(mut self, key: u64) -> Self {
358        self.read_output_options.tracking = ReadTracking::Tracked;
359        self.read_cell_options.tracking = ReadCellTracking::Tracked { key: Some(key) };
360        self
361    }
362
363    /// This will not track the value as dependency, but will still track the error as dependency,
364    /// if there is an error.
365    ///
366    /// INVALIDATION: Be careful with this, it will not track dependencies, so
367    /// using it could break cache invalidation.
368    pub fn untracked(mut self) -> Self {
369        self.read_output_options.tracking = ReadTracking::TrackOnlyError;
370        self.read_cell_options.tracking = ReadCellTracking::TrackOnlyError;
371        self
372    }
373
374    /// Hint that this is the final read of the cell content.
375    pub fn final_read_hint(mut self) -> Self {
376        self.read_cell_options.final_read_hint = true;
377        self
378    }
379}
380
381impl Future for ReadRawVcFuture {
382    type Output = Result<TypedCellContent>;
383
384    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
385        // SAFETY: we are not moving self
386        let this = unsafe { self.get_unchecked_mut() };
387
388        // Extract the closure to avoid deep nesting
389        let poll_fn = |tt: &Arc<dyn TurboTasksApi>| -> Poll<Self::Output> {
390            'outer: loop {
391                if let Some(listener) = &mut this.listener {
392                    // SAFETY: listener is from previous pinned this
393                    let listener = unsafe { Pin::new_unchecked(listener) };
394                    if listener.poll(cx).is_pending() {
395                        return Poll::Pending;
396                    }
397                    this.listener = None;
398                }
399                let mut listener = match this.current {
400                    RawVc::TaskOutput(task) => {
401                        let read_result = tt.try_read_task_output(task, this.read_output_options);
402                        match read_result {
403                            Ok(Ok(vc)) => {
404                                // turbo-tasks-backend doesn't currently have any sort of
405                                // "transaction" or global lock mechanism to group together chains
406                                // of `TaskOutput`/`TaskCell` reads.
407                                //
408                                // If we ignore the theoretical TOCTOU issues, we no longer need to
409                                // read strongly consistent, as any Vc returned from the first task
410                                // will be inside of the scope of the first task. So it's already
411                                // strongly consistent.
412                                this.read_output_options.consistency = ReadConsistency::Eventual;
413                                this.current = vc;
414                                continue 'outer;
415                            }
416                            Ok(Err(listener)) => listener,
417                            Err(err) => return Poll::Ready(Err(err)),
418                        }
419                    }
420                    RawVc::TaskCell(task, index) => {
421                        if this.is_serializable_cell_content_unknown {
422                            let value_type = registry::get_value_type(index.type_id);
423                            this.read_cell_options.is_serializable_cell_content =
424                                value_type.bincode.is_some();
425                        }
426                        let read_result =
427                            tt.try_read_task_cell(task, index, this.read_cell_options);
428                        match read_result {
429                            Ok(Ok(content)) => {
430                                // SAFETY: Constructor ensures that T and U are binary identical
431                                return Poll::Ready(Ok(content));
432                            }
433                            Ok(Err(listener)) => listener,
434                            Err(err) => return Poll::Ready(Err(err)),
435                        }
436                    }
437                    RawVc::LocalOutput(execution_id, local_output_id, ..) => {
438                        debug_assert_eq!(
439                            this.read_output_options.consistency,
440                            ReadConsistency::Eventual
441                        );
442                        let read_result = tt.try_read_local_output(execution_id, local_output_id);
443                        match read_result {
444                            Ok(Ok(vc)) => {
445                                this.current = vc;
446                                continue 'outer;
447                            }
448                            Ok(Err(listener)) => listener,
449                            Err(err) => return Poll::Ready(Err(err)),
450                        }
451                    }
452                };
453                // SAFETY: listener is from previous pinned this
454                match unsafe { Pin::new_unchecked(&mut listener) }.poll(cx) {
455                    Poll::Ready(_) => continue,
456                    Poll::Pending => {
457                        this.listener = Some(listener);
458                        return Poll::Pending;
459                    }
460                };
461            }
462        };
463
464        fn suppress_top_level_task_check<R>(strongly_consistent: bool, f: impl FnOnce() -> R) -> R {
465            if cfg!(debug_assertions) && strongly_consistent {
466                // Temporarily suppress the top-level task check
467                SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK.sync_scope(true, f)
468            } else {
469                f()
470            }
471        }
472
473        // HACK: Temporarily suppress top-level task check if doing strongly consistent read.
474        //
475        // This masks a bug: There's an unlikely TOCTOU race condition in `poll_fn`. Because the
476        // strongly consistent read isn't a single atomic operation, any inner `TaskOutput` or
477        // `TaskCell` could get mutated after the strongly consistent read of the outer
478        // `TaskOutput`.
479        suppress_top_level_task_check(this.strongly_consistent, || with_turbo_tasks(poll_fn))
480    }
481}
482
483impl Unpin for ReadRawVcFuture {}