turbo_tasks/
raw_vc.rs

1use std::{fmt::Display, future::Future, pin::Pin, task::Poll};
2
3use anyhow::Result;
4use auto_hash_map::AutoSet;
5use serde::{Deserialize, Serialize};
6use thiserror::Error;
7
8use crate::{
9    CollectiblesSource, ReadCellOptions, ReadConsistency, ResolvedVc, TaskId, TaskPersistence,
10    TraitTypeId, ValueType, ValueTypeId, VcValueTrait,
11    backend::{CellContent, TypedCellContent},
12    event::EventListener,
13    id::{ExecutionId, LocalTaskId},
14    manager::{read_local_output, read_task_cell, read_task_output, with_turbo_tasks},
15    registry::{self, get_value_type},
16    turbo_tasks,
17};
18
19#[derive(Error, Debug)]
20pub enum ResolveTypeError {
21    #[error("no content in the cell")]
22    NoContent,
23    #[error("the content in the cell has no type")]
24    UntypedContent,
25    #[error("content is not available as task execution failed")]
26    TaskError { source: anyhow::Error },
27    #[error("reading the cell content failed")]
28    ReadError { source: anyhow::Error },
29}
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
32pub struct CellId {
33    pub type_id: ValueTypeId,
34    pub index: u32,
35}
36
37impl Display for CellId {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        write!(
40            f,
41            "{}#{}",
42            registry::get_value_type(self.type_id).name,
43            self.index
44        )
45    }
46}
47
48/// A type-erased representation of [`Vc`][crate::Vc].
49///
50/// Type erasure reduces the [monomorphization] (and therefore binary size and compilation time)
51/// required to support [`Vc`][crate::Vc].
52///
53/// This type is heavily used within the [`Backend`][crate::backend::Backend] trait, but should
54/// otherwise be treated as an internal implementation detail of `turbo-tasks`.
55///
56/// [monomorphization]: https://doc.rust-lang.org/book/ch10-01-syntax.html#performance-of-code-using-generics
57#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
58pub enum RawVc {
59    /// The synchronous return value of a task (after argument resolution). This is the
60    /// representation used by [`OperationVc`][crate::OperationVc].
61    TaskOutput(TaskId),
62    /// A pointer to a specific [`Vc::cell`][crate::Vc::cell] or `.cell()` call within a task. This
63    /// is the representation used by [`ResolvedVc`].
64    ///
65    /// [`CellId`] contains the [`ValueTypeId`], which can be useful for efficient downcasting.
66    TaskCell(TaskId, CellId),
67    /// The synchronous return value of a local task. This is created when a function is called
68    /// with unresolved arguments or more explicitly with
69    /// [`#[turbo_tasks::function(local)]`][crate::function].
70    ///
71    /// Local outputs are only valid within the context of their parent "non-local" task. Turbo
72    /// Task's APIs are designed to prevent escapes of local [`Vc`]s, but [`ExecutionId`] is used
73    /// for a fallback runtime assertion.
74    LocalOutput(ExecutionId, LocalTaskId, TaskPersistence),
75}
76
77impl RawVc {
78    pub fn is_resolved(&self) -> bool {
79        match self {
80            RawVc::TaskOutput(..) => false,
81            RawVc::TaskCell(..) => true,
82            RawVc::LocalOutput(..) => false,
83        }
84    }
85
86    pub fn is_local(&self) -> bool {
87        match self {
88            RawVc::TaskOutput(..) => false,
89            RawVc::TaskCell(..) => false,
90            RawVc::LocalOutput(..) => true,
91        }
92    }
93
94    /// Returns `true` if the task this `RawVc` reads from cannot be serialized and will not be
95    /// stored in the persistent cache.
96    ///
97    /// See [`TaskPersistence`] for more details.
98    pub fn is_transient(&self) -> bool {
99        match self {
100            RawVc::TaskOutput(task) | RawVc::TaskCell(task, ..) => task.is_transient(),
101            RawVc::LocalOutput(_, _, persistence) => *persistence == TaskPersistence::Transient,
102        }
103    }
104
105    pub(crate) fn into_read(self) -> ReadRawVcFuture {
106        // returns a custom future to have something concrete and sized
107        // this avoids boxing in IntoFuture
108        ReadRawVcFuture::new(self)
109    }
110
111    pub(crate) async fn resolve_trait(
112        self,
113        trait_type: TraitTypeId,
114    ) -> Result<Option<RawVc>, ResolveTypeError> {
115        self.resolve_type_inner(|value_type_id| {
116            let value_type = get_value_type(value_type_id);
117            (value_type.has_trait(&trait_type), Some(value_type))
118        })
119        .await
120    }
121
122    pub(crate) async fn resolve_value(
123        self,
124        value_type: ValueTypeId,
125    ) -> Result<Option<RawVc>, ResolveTypeError> {
126        self.resolve_type_inner(|cell_value_type| (cell_value_type == value_type, None))
127            .await
128    }
129
130    /// Helper for `resolve_trait` and `resolve_value`.
131    ///
132    /// After finding a cell, returns `Ok(Some(...))` when `conditional` returns
133    /// `true`, and `Ok(None)` when `conditional` returns `false`.
134    ///
135    /// As an optimization, `conditional` may return the `&'static ValueType` to
136    /// avoid a potential extra lookup later.
137    async fn resolve_type_inner(
138        self,
139        conditional: impl FnOnce(ValueTypeId) -> (bool, Option<&'static ValueType>),
140    ) -> Result<Option<RawVc>, ResolveTypeError> {
141        let tt = turbo_tasks();
142        tt.notify_scheduled_tasks();
143        let mut current = self;
144        loop {
145            match current {
146                RawVc::TaskOutput(task) => {
147                    current = read_task_output(&*tt, task, ReadConsistency::Eventual)
148                        .await
149                        .map_err(|source| ResolveTypeError::TaskError { source })?;
150                }
151                RawVc::TaskCell(task, index) => {
152                    let content = read_task_cell(&*tt, task, index, ReadCellOptions::default())
153                        .await
154                        .map_err(|source| ResolveTypeError::ReadError { source })?;
155                    if let TypedCellContent(value_type, CellContent(Some(_))) = content {
156                        return Ok(if conditional(value_type).0 {
157                            Some(RawVc::TaskCell(task, index))
158                        } else {
159                            None
160                        });
161                    } else {
162                        return Err(ResolveTypeError::NoContent);
163                    }
164                }
165                RawVc::LocalOutput(execution_id, local_task_id, ..) => {
166                    current = read_local_output(&*tt, execution_id, local_task_id)
167                        .await
168                        .map_err(|source| ResolveTypeError::TaskError { source })?;
169                }
170            }
171        }
172    }
173
174    /// See [`crate::Vc::resolve`].
175    pub(crate) async fn resolve(self) -> Result<RawVc> {
176        self.resolve_inner(ReadConsistency::Eventual).await
177    }
178
179    /// See [`crate::Vc::resolve_strongly_consistent`].
180    pub(crate) async fn resolve_strongly_consistent(self) -> Result<RawVc> {
181        self.resolve_inner(ReadConsistency::Strong).await
182    }
183
184    async fn resolve_inner(self, mut consistency: ReadConsistency) -> Result<RawVc> {
185        let tt = turbo_tasks();
186        let mut current = self;
187        let mut notified = false;
188        loop {
189            match current {
190                RawVc::TaskOutput(task) => {
191                    if !notified {
192                        tt.notify_scheduled_tasks();
193                        notified = true;
194                    }
195                    current = read_task_output(&*tt, task, consistency).await?;
196                    // We no longer need to read strongly consistent, as any Vc returned
197                    // from the first task will be inside of the scope of the first
198                    // task. So it's already strongly consistent.
199                    consistency = ReadConsistency::Eventual;
200                }
201                RawVc::TaskCell(_, _) => return Ok(current),
202                RawVc::LocalOutput(execution_id, local_task_id, ..) => {
203                    debug_assert_eq!(consistency, ReadConsistency::Eventual);
204                    current = read_local_output(&*tt, execution_id, local_task_id).await?;
205                }
206            }
207        }
208    }
209
210    /// Convert a potentially local `RawVc` into a non-local `RawVc`. This is a subset of resolution
211    /// resolution, because the returned `RawVc` can be a `TaskOutput`.
212    pub(crate) async fn to_non_local(self) -> Result<RawVc> {
213        let tt = turbo_tasks();
214        let mut current = self;
215        loop {
216            match current {
217                RawVc::LocalOutput(execution_id, local_task_id, ..) => {
218                    current = read_local_output(&*tt, execution_id, local_task_id).await?;
219                }
220                non_local => return Ok(non_local),
221            }
222        }
223    }
224
225    pub(crate) fn connect(&self) {
226        let RawVc::TaskOutput(task_id) = self else {
227            panic!("RawVc::connect() must only be called on a RawVc::TaskOutput");
228        };
229        let tt = turbo_tasks();
230        tt.connect_task(*task_id);
231    }
232
233    pub fn try_get_task_id(&self) -> Option<TaskId> {
234        match self {
235            RawVc::TaskOutput(t) | RawVc::TaskCell(t, ..) => Some(*t),
236            RawVc::LocalOutput(..) => None,
237        }
238    }
239
240    pub fn try_get_type_id(&self) -> Option<ValueTypeId> {
241        match self {
242            RawVc::TaskCell(_, CellId { type_id, .. }) => Some(*type_id),
243            RawVc::TaskOutput(..) | RawVc::LocalOutput(..) => None,
244        }
245    }
246
247    /// For a cell that's already resolved, synchronously check if it implements a trait using the
248    /// type information in `RawVc::TaskCell` (we don't actualy need to read the cell!).
249    pub(crate) fn resolved_has_trait(&self, trait_id: TraitTypeId) -> bool {
250        match self {
251            RawVc::TaskCell(_task_id, cell_id) => {
252                get_value_type(cell_id.type_id).has_trait(&trait_id)
253            }
254            _ => unreachable!("resolved_has_trait must be called with a RawVc::TaskCell"),
255        }
256    }
257
258    /// For a cell that's already resolved, synchronously check if it is a given type using the type
259    /// information in `RawVc::TaskCell` (we don't actualy need to read the cell!).
260    pub(crate) fn resolved_is_type(&self, type_id: ValueTypeId) -> bool {
261        match self {
262            RawVc::TaskCell(_task_id, cell_id) => cell_id.type_id == type_id,
263            _ => unreachable!("resolved_is_type must be called with a RawVc::TaskCell"),
264        }
265    }
266}
267
268/// This implementation of `CollectiblesSource` assumes that `self` is a `RawVc::TaskOutput`.
269impl CollectiblesSource for RawVc {
270    fn peek_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
271        let RawVc::TaskOutput(task_id) = self else {
272            panic!(
273                "<RawVc as CollectiblesSource>::peek_collectibles() must only be called on a \
274                 RawVc::TaskOutput"
275            );
276        };
277        let tt = turbo_tasks();
278        tt.notify_scheduled_tasks();
279        let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
280        map.into_iter()
281            .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
282            .collect()
283    }
284
285    fn take_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
286        let RawVc::TaskOutput(task_id) = self else {
287            panic!(
288                "<RawVc as CollectiblesSource>::take_collectibles() must only be called on a \
289                 RawVc::TaskOutput"
290            );
291        };
292        let tt = turbo_tasks();
293        tt.notify_scheduled_tasks();
294        let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
295        tt.unemit_collectibles(T::get_trait_type_id(), &map);
296        map.into_iter()
297            .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
298            .collect()
299    }
300}
301
302pub struct ReadRawVcFuture {
303    consistency: ReadConsistency,
304    current: RawVc,
305    untracked: bool,
306    read_cell_options: ReadCellOptions,
307    listener: Option<EventListener>,
308}
309
310impl ReadRawVcFuture {
311    pub(crate) fn new(vc: RawVc) -> Self {
312        ReadRawVcFuture {
313            consistency: ReadConsistency::Eventual,
314            current: vc,
315            untracked: false,
316            read_cell_options: ReadCellOptions::default(),
317            listener: None,
318        }
319    }
320
321    pub fn strongly_consistent(mut self) -> Self {
322        self.consistency = ReadConsistency::Strong;
323        self
324    }
325
326    /// INVALIDATION: Be careful with this, it will not track dependencies, so
327    /// using it could break cache invalidation.
328    pub fn untracked(mut self) -> Self {
329        self.untracked = true;
330        self
331    }
332
333    pub fn final_read_hint(mut self) -> Self {
334        self.read_cell_options.final_read_hint = true;
335        self
336    }
337}
338
339impl Future for ReadRawVcFuture {
340    type Output = Result<TypedCellContent>;
341
342    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
343        with_turbo_tasks(|tt| {
344            tt.notify_scheduled_tasks();
345            // SAFETY: we are not moving this
346            let this = unsafe { self.get_unchecked_mut() };
347            'outer: loop {
348                if let Some(listener) = &mut this.listener {
349                    // SAFETY: listener is from previous pinned this
350                    let listener = unsafe { Pin::new_unchecked(listener) };
351                    if listener.poll(cx).is_pending() {
352                        return Poll::Pending;
353                    }
354                    this.listener = None;
355                }
356                let mut listener = match this.current {
357                    RawVc::TaskOutput(task) => {
358                        let read_result = if this.untracked {
359                            tt.try_read_task_output_untracked(task, this.consistency)
360                        } else {
361                            tt.try_read_task_output(task, this.consistency)
362                        };
363                        match read_result {
364                            Ok(Ok(vc)) => {
365                                // We no longer need to read strongly consistent, as any Vc returned
366                                // from the first task will be inside of the scope of the first
367                                // task. So it's already strongly consistent.
368                                this.consistency = ReadConsistency::Eventual;
369                                this.current = vc;
370                                continue 'outer;
371                            }
372                            Ok(Err(listener)) => listener,
373                            Err(err) => return Poll::Ready(Err(err)),
374                        }
375                    }
376                    RawVc::TaskCell(task, index) => {
377                        let read_result = if this.untracked {
378                            tt.try_read_task_cell_untracked(task, index, this.read_cell_options)
379                        } else {
380                            tt.try_read_task_cell(task, index, this.read_cell_options)
381                        };
382                        match read_result {
383                            Ok(Ok(content)) => {
384                                // SAFETY: Constructor ensures that T and U are binary identical
385                                return Poll::Ready(Ok(content));
386                            }
387                            Ok(Err(listener)) => listener,
388                            Err(err) => return Poll::Ready(Err(err)),
389                        }
390                    }
391                    RawVc::LocalOutput(execution_id, local_output_id, ..) => {
392                        debug_assert_eq!(this.consistency, ReadConsistency::Eventual);
393                        let read_result = tt.try_read_local_output(execution_id, local_output_id);
394                        match read_result {
395                            Ok(Ok(vc)) => {
396                                this.current = vc;
397                                continue 'outer;
398                            }
399                            Ok(Err(listener)) => listener,
400                            Err(err) => return Poll::Ready(Err(err)),
401                        }
402                    }
403                };
404                // SAFETY: listener is from previous pinned this
405                match unsafe { Pin::new_unchecked(&mut listener) }.poll(cx) {
406                    Poll::Ready(_) => continue,
407                    Poll::Pending => {
408                        this.listener = Some(listener);
409                        return Poll::Pending;
410                    }
411                };
412            }
413        })
414    }
415}
416
417impl Unpin for ReadRawVcFuture {}