// turbo_tasks/manager.rs

1use std::{
2    cmp::Reverse,
3    fmt::{Debug, Display},
4    future::Future,
5    hash::{BuildHasher, BuildHasherDefault},
6    mem::take,
7    pin::Pin,
8    sync::{
9        Arc, Mutex, RwLock, Weak,
10        atomic::{AtomicBool, AtomicUsize, Ordering},
11    },
12    time::{Duration, Instant},
13};
14
15use anyhow::{Result, anyhow};
16use auto_hash_map::AutoMap;
17use bincode::{Decode, Encode};
18use either::Either;
19use futures::stream::FuturesUnordered;
20use rustc_hash::{FxBuildHasher, FxHasher};
21use serde::{Deserialize, Serialize};
22use smallvec::SmallVec;
23use tokio::{select, sync::mpsc::Receiver, task_local};
24use tracing::{Instrument, Span, instrument};
25
26use crate::{
27    Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
28    ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
29    VcValueTrait, VcValueType,
30    backend::{
31        Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
32        TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
33    },
34    capture_future::CaptureFuture,
35    event::{Event, EventListener},
36    id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
37    id_factory::IdFactoryWithReuse,
38    keyed::KeyedEq,
39    macro_helpers::NativeFunction,
40    magic_any::MagicAny,
41    message_queue::{CompilationEvent, CompilationEventQueue},
42    priority_runner::{Executor, JoinHandle, PriorityRunner},
43    raw_vc::{CellId, RawVc},
44    registry,
45    serialization_invalidation::SerializationInvalidator,
46    task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
47    task_statistics::TaskStatisticsApi,
48    trace::TraceRawVcs,
49    util::{IdFactory, StaticOrArc},
50};
51
/// Common base trait for [`TurboTasksApi`] and [`TurboTasksBackendApi`]. Provides APIs for creating
/// tasks from function calls.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls a native function with arguments. Resolves arguments when needed
    /// with a wrapper task.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Call a native function with arguments.
    /// All inputs must be resolved.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls a trait method with arguments. First input is the `self` object.
    /// Uses a wrapper task to resolve
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs `future` inside a turbo-tasks context, surfacing panics and task failures as a
    /// [`TurboTasksExecutionError`].
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Runs `future` once as a transient task; the returned future resolves with the result.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Like [`Self::run_once`], additionally associating the given invalidation `reason` with
    /// this run.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Spawns `future` as a detached (fire-and-forget) once process; does not wait for it.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Sends a compilation event to subscribers.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Returns a human-readable name for the given task.
    fn get_task_name(&self, task: TaskId) -> String;
}
104
/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
/// context. Returned by the [`turbo_tasks`] helper function.
///
/// This trait is needed because thread locals cannot contain an unresolved [`Backend`] type
/// parameter.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Invalidates the task so that it will be re-executed.
    fn invalidate(&self, task: TaskId);
    /// Like [`Self::invalidate`], additionally recording `reason` for update reporting.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Invalidates the serialized (persisted) state of the task; see
    /// [`SerializationInvalidator`].
    fn invalidate_serialization(&self, task: TaskId);

    /// Tries to read the output of a task. Returns `Ok(Err(listener))` when the output is not
    /// yet available; the [`EventListener`] resolves once the read can be retried.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Tries to read a cell of a task. Returns `Ok(Err(listener))` when the content is not yet
    /// available; the [`EventListener`] resolves once the read can be retried.
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Reads a [`RawVc::LocalOutput`]. If the task has completed, returns the [`RawVc`] the local
    /// task points to.
    ///
    /// The returned [`RawVc`] may also be a [`RawVc::LocalOutput`], so this may need to be called
    /// recursively or in a loop.
    ///
    /// This does not accept a consistency argument, as you cannot control consistency of a read of
    /// an operation owned by your own task. Strongly consistent reads are only allowed on
    /// [`OperationVc`]s, which should never be local tasks.
    ///
    /// No dependency tracking will happen as a result of this function call, as it's a no-op for a
    /// task to depend on itself.
    ///
    /// [`OperationVc`]: crate::OperationVc
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Returns the collectibles of the given trait type for the task.
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits a collectible of the given trait type from the current task.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Un-emits `count` occurrences of a collectible previously emitted by the current task.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Un-emits all the given collectibles from the current task.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// Reads a cell of the current task. Unlike [`Self::try_read_task_cell`] there is no
    /// [`EventListener`] retry path: the content is returned directly.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;

    /// Reads a cell of the current task.
    fn read_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;
    /// Writes `content` into a cell of the current task.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        verification_mode: VerificationMode,
    );
    /// Marks the current task as finished.
    fn mark_own_task_as_finished(&self, task: TaskId);
    /// Marks the current task as dependent on the current session.
    /// NOTE(review): semantics inferred from the name — confirm against backend implementations.
    fn mark_own_task_as_session_dependent(&self, task: TaskId);

    /// Connects the given task to the current task.
    /// NOTE(review): semantics inferred from the name — confirm against backend implementations.
    fn connect_task(&self, task: TaskId);

    /// Wraps the given future in the current task.
    ///
    /// Beware: this method is not safe to use in production code. It is only intended for use in
    /// tests and for debugging purposes.
    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Returns the [`TaskStatisticsApi`] for this instance.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Stops the system; the returned future resolves once stopping has completed.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events, optionally filtered to the given event types.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;

    /// Returns true if TurboTasks is configured to track dependencies.
    fn is_tracking_dependencies(&self) -> bool;
}
202
/// A wrapper marking a value as unused.
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    /// Wraps `inner`, marking it as unused.
    ///
    /// # Safety
    ///
    /// The wrapped value must not be used.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Unused { inner }
    }

    /// Borrows the wrapped value without consuming the wrapper.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that the value remains unused.
    pub unsafe fn get_unchecked(&self) -> &T {
        let Unused { inner } = self;
        inner
    }

    /// Consumes the wrapper and returns the wrapped value.
    pub fn into(self) -> T {
        let Unused { inner } = self;
        inner
    }
}
232
/// A subset of the [`TurboTasks`] API that's exposed to [`Backend`] implementations.
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns an owned (`Arc`) handle to this API object.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Reserves an unused task id from the persistent id space.
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Reserves an unused task id from the transient id space.
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// Returns a persistent task id to the id factory for reuse.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the task id is not used anymore.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// Returns a transient task id to the id factory for reuse.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the task id is not used anymore.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedule a task for execution.
    fn schedule(&self, task: TaskId, priority: TaskPriority);

    /// Returns the priority of the current task.
    fn get_current_task_priority(&self) -> TaskPriority;

    /// Schedule a foreground backend job for execution.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Schedule a background backend job for execution.
    ///
    /// Background jobs are not counted towards activeness of the system. The system is considered
    /// idle even with active background jobs.
    fn schedule_backend_background_job(&self, job: B::BackendJob);

    /// Returns the duration from the start of the program to the given instant.
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Returns true if the system is idle.
    fn is_idle(&self) -> bool;

    /// Returns a reference to the backend.
    fn backend(&self) -> &B;
}
272
/// Aggregated information about a completed batch of work: how long it took, how many tasks ran,
/// and which invalidation reasons triggered it.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// Wall-clock duration of the update.
    pub duration: Duration,
    /// Number of tasks executed during the update.
    pub tasks: usize,
    /// The set of invalidation reasons collected for this update.
    pub reasons: InvalidationReasonSet,
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
281
/// Whether a task may be cached across sessions or only within the current session.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// Tasks that may be persisted across sessions using serialization.
    Persistent,

    /// Tasks that will be persisted in memory for the life of this session, but won't persist
    /// between sessions.
    ///
    /// This is used for [root tasks][TurboTasks::spawn_root_task] and tasks with an argument of
    /// type [`TransientValue`][crate::value::TransientValue] or
    /// [`TransientInstance`][crate::value::TransientInstance].
    Transient,
}
295
296impl Display for TaskPersistence {
297    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298        match self {
299            TaskPersistence::Persistent => write!(f, "persistent"),
300            TaskPersistence::Transient => write!(f, "transient"),
301        }
302    }
303}
304
/// Consistency guarantee requested when reading task outputs or cells.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// The default behavior for most APIs. Reads are faster, but may return stale values, which
    /// may later trigger re-computation.
    #[default]
    Eventual,
    /// Ensures all dependencies are fully resolved before returning the cell or output data, at
    /// the cost of slower reads.
    ///
    /// Top-level code that returns data to the user should use strongly consistent reads.
    Strong,
}
317
/// Dependency-tracking behavior for a cell read, optionally with a dependency key.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadCellTracking {
    /// Reads are tracked as dependencies of the current task.
    Tracked {
        /// The key used for the dependency
        key: Option<u64>,
    },
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}

impl ReadCellTracking {
    /// Returns whether this read should be recorded as a dependency, given whether the read
    /// produced an error.
    pub fn should_track(&self, is_err: bool) -> bool {
        match self {
            ReadCellTracking::Untracked => false,
            ReadCellTracking::TrackOnlyError => is_err,
            ReadCellTracking::Tracked { .. } => true,
        }
    }

    /// The dependency key, if any; only `Tracked` reads can carry one.
    pub fn key(&self) -> Option<u64> {
        if let ReadCellTracking::Tracked { key } = self {
            *key
        } else {
            None
        }
    }
}

impl Default for ReadCellTracking {
    /// Defaults to a tracked read without a dependency key.
    fn default() -> Self {
        ReadCellTracking::Tracked { key: None }
    }
}

impl Display for ReadCellTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ReadCellTracking::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
            ReadCellTracking::Tracked { key: None } => f.write_str("tracked"),
            ReadCellTracking::TrackOnlyError => f.write_str("track only error"),
            ReadCellTracking::Untracked => f.write_str("untracked"),
        }
    }
}
371
/// Dependency-tracking behavior for a read (without a per-read key).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// Reads are tracked as dependencies of the current task.
    #[default]
    Tracked,
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}

impl ReadTracking {
    /// Returns whether this read should be recorded as a dependency, given whether the read
    /// produced an error.
    pub fn should_track(&self, is_err: bool) -> bool {
        match self {
            ReadTracking::Untracked => false,
            ReadTracking::TrackOnlyError => is_err,
            ReadTracking::Tracked => true,
        }
    }
}

impl Display for ReadTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            ReadTracking::Tracked => "tracked",
            ReadTracking::TrackOnlyError => "track only error",
            ReadTracking::Untracked => "untracked",
        })
    }
}
408
/// Scheduling priority of a task.
#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum TaskPriority {
    /// Priority of a task scheduled for the first time. The derived `Ord` uses declaration
    /// order, so `Initial` always compares less than any `Invalidation` priority.
    #[default]
    Initial,
    /// Priority of a task scheduled due to an invalidation. `Reverse` flips the comparison so
    /// that a numerically smaller raw value compares as greater.
    Invalidation {
        priority: Reverse<u32>,
    },
}
417
418impl TaskPriority {
419    pub fn invalidation(priority: u32) -> Self {
420        Self::Invalidation {
421            priority: Reverse(priority),
422        }
423    }
424
425    pub fn initial() -> Self {
426        Self::Initial
427    }
428
429    pub fn leaf() -> Self {
430        Self::Invalidation {
431            priority: Reverse(0),
432        }
433    }
434
435    pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
436        match self {
437            TaskPriority::Initial => parent_priority,
438            TaskPriority::Invalidation { priority } => {
439                if let TaskPriority::Invalidation {
440                    priority: parent_priority,
441                } = parent_priority
442                    && priority.0 < parent_priority.0
443                {
444                    Self::Invalidation {
445                        priority: Reverse(parent_priority.0.saturating_add(1)),
446                    }
447                } else {
448                    *self
449                }
450            }
451        }
452    }
453}
454
455impl Display for TaskPriority {
456    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
457        match self {
458            TaskPriority::Initial => write!(f, "initial"),
459            TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
460        }
461    }
462}
463
/// A unit of work queued on the priority runner.
enum ScheduledTask {
    /// A (non-local) backend task identified by its [`TaskId`].
    Task {
        task_id: TaskId,
        /// Tracing span captured when the task was scheduled.
        span: Span,
    },
    /// A local task, executed with the state of its parent (non-local) task.
    LocalTask {
        ty: LocalTaskSpec,
        persistence: TaskPersistence,
        local_task_id: LocalTaskId,
        /// Shared state of the parent (non-local) task that created this local task.
        global_task_state: Arc<RwLock<CurrentTaskState>>,
        /// Tracing span captured when the task was scheduled.
        span: Span,
    },
}
477
/// The central turbo-tasks instance, tying a [`Backend`] implementation to task scheduling,
/// id allocation, and idle/activity tracking.
pub struct TurboTasks<B: Backend + 'static> {
    /// Weak self-reference; upgraded by [`Self::pin`] to hand out `Arc<Self>`.
    this: Weak<Self>,
    backend: B,
    /// Id factory for persistent tasks (ids below `TRANSIENT_TASK_BIT`, see `Self::new`).
    task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id factory for transient tasks (ids starting at `TRANSIENT_TASK_BIT`).
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Wrapping factory for per-execution ids (used only for assertions).
    execution_id_factory: IdFactory<ExecutionId>,
    /// True once the instance has been stopped.
    stopped: AtomicBool,
    currently_scheduled_foreground_jobs: AtomicUsize,
    currently_scheduled_background_jobs: AtomicUsize,
    /// Total number of tasks scheduled so far.
    scheduled_tasks: AtomicUsize,
    priority_runner:
        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
    // presumably the start time of the current update batch — confirm against the update
    // aggregation code later in this file
    start: Mutex<Option<Instant>>,
    /// Aggregated (duration, task count) plus invalidation reasons since the last update report.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Event that is triggered when currently_scheduled_foreground_jobs becomes non-zero
    event_foreground_start: Event,
    /// Event that is triggered when all foreground jobs are done
    /// (currently_scheduled_foreground_jobs becomes zero)
    event_foreground_done: Event,
    /// Event that is triggered when all background jobs are done
    event_background_done: Event,
    /// Timestamp of instance creation; basis for `program_duration_until`.
    program_start: Instant,
    compilation_events: CompilationEventQueue,
}
502
/// Tracks in-flight local tasks (and detached-for-testing futures) spawned during a task
/// execution. Starts out as `None` (see `CurrentTaskState::new`).
type LocalTaskTracker = Option<
    FuturesUnordered<Either<JoinHandle, Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>>,
>;
506
/// Information about a non-local task. A non-local task can contain multiple "local" tasks, which
/// all share the same non-local task state.
///
/// A non-local task is one that:
///
/// - Has a unique task id.
/// - Is potentially cached.
/// - The backend is aware of.
struct CurrentTaskState {
    /// Backend task id of the current task. `None` for temporary states that aren't backed by a
    /// backend task (see `new_temporary`).
    task_id: Option<TaskId>,
    /// Identifies this particular execution; used to assert that local tasks don't escape the
    /// execution that created them (see `assert_execution_id`).
    execution_id: ExecutionId,
    /// Scheduling priority of the current task.
    priority: TaskPriority,

    /// True if the current task has state in cells (interior mutability).
    /// Only tracked when verify_determinism feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the current task uses an external invalidator
    has_invalidator: bool,

    /// True if we're in a top-level task (e.g. `.run_once(...)` or `.run(...)`).
    /// Eventually consistent reads are not allowed in top-level tasks.
    in_top_level_task: bool,

    /// Tracks how many cells of each type has been allocated so far during this task execution.
    /// When a task is re-executed, the cell count may not match the existing cell vec length.
    ///
    /// This is taken (and becomes `None`) during teardown of a task.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Local tasks created while this global task has been running. Indexed by `LocalTaskId`.
    local_tasks: Vec<LocalTask>,

    /// Tracks currently running local tasks, and defers cleanup of the global task until those
    /// complete. Also used by `spawn_detached_for_testing`.
    local_task_tracker: LocalTaskTracker,
}
545
impl CurrentTaskState {
    /// Creates the state for executing the backend task `task_id`.
    fn new(
        task_id: TaskId,
        execution_id: ExecutionId,
        priority: TaskPriority,
        in_top_level_task: bool,
    ) -> Self {
        Self {
            task_id: Some(task_id),
            execution_id,
            priority,
            #[cfg(feature = "verify_determinism")]
            stateful: false,
            has_invalidator: false,
            in_top_level_task,
            cell_counters: Some(AutoMap::default()),
            local_tasks: Vec::new(),
            local_task_tracker: None,
        }
    }

    /// Creates a state that is not backed by a backend task (`task_id` is `None`), e.g. for
    /// top-level `run` invocations. Cell counters are disabled (`None`).
    fn new_temporary(
        execution_id: ExecutionId,
        priority: TaskPriority,
        in_top_level_task: bool,
    ) -> Self {
        Self {
            task_id: None,
            execution_id,
            priority,
            #[cfg(feature = "verify_determinism")]
            stateful: false,
            has_invalidator: false,
            in_top_level_task,
            cell_counters: None,
            local_tasks: Vec::new(),
            local_task_tracker: None,
        }
    }

    /// Panics unless `expected_execution_id` matches this state's execution id. Guards against
    /// local tasks being scheduled/awaited outside the execution that created them.
    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
        if self.execution_id != expected_execution_id {
            panic!(
                "Local tasks can only be scheduled/awaited within the same execution of the \
                 parent task that created them"
            );
        }
    }

    /// Registers a new local task and returns its one-indexed id.
    fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
        self.local_tasks.push(local_task);
        // generate a one-indexed id from len() -- we just pushed so len() is >= 1
        if cfg!(debug_assertions) {
            LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
        } else {
            // SAFETY: `local_tasks` is non-empty (we just pushed), so `len() >= 1` and the id is
            // non-zero; the debug branch above checks for u32 truncation.
            unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
        }
    }

    /// Looks up a local task by its one-indexed id.
    fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
        // local task ids are one-indexed (they use NonZeroU32)
        &self.local_tasks[(*local_task_id as usize) - 1]
    }

    /// Mutable variant of `get_local_task`.
    fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
        &mut self.local_tasks[(*local_task_id as usize) - 1]
    }
}
614
// TODO implement our own thread pool and make these thread locals instead
task_local! {
    /// The current TurboTasks instance
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    /// State of the currently executing (non-local) task, shared with its local tasks.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;

    /// Temporarily suppresses the eventual consistency check in top-level tasks.
    /// This is used by strongly consistent reads to allow them to succeed in top-level tasks.
    /// This is NOT shared across local tasks (unlike CURRENT_TASK_STATE), so it's safe
    /// to set/unset without race conditions.
    pub(crate) static SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK: bool;
}
628
629impl<B: Backend + 'static> TurboTasks<B> {
    // TODO better lifetime management for turbo tasks
    // consider using unsafe for the task_local turbo tasks
    // that should be safe as long tasks can't outlife turbo task
    // so we probably want to make sure that all tasks are joined
    // when trying to drop turbo tasks
    /// Creates a new instance for the given backend and notifies the backend of startup.
    pub fn new(backend: B) -> Arc<Self> {
        // Persistent and transient task ids live in disjoint ranges, split at
        // `TRANSIENT_TASK_BIT`.
        let task_id_factory = IdFactoryWithReuse::new(
            TaskId::MIN,
            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
        );
        let transient_task_id_factory =
            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
        // `new_cyclic` lets the instance hold a weak reference to itself (see `Self::pin`).
        let this = Arc::new_cyclic(|this| Self {
            this: this.clone(),
            backend,
            task_id_factory,
            transient_task_id_factory,
            execution_id_factory,
            stopped: AtomicBool::new(false),
            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
            currently_scheduled_background_jobs: AtomicUsize::new(0),
            scheduled_tasks: AtomicUsize::new(0),
            priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
            start: Default::default(),
            aggregated_update: Default::default(),
            event_foreground_done: Event::new(|| {
                || "TurboTasks::event_foreground_done".to_string()
            }),
            event_foreground_start: Event::new(|| {
                || "TurboTasks::event_foreground_start".to_string()
            }),
            event_background_done: Event::new(|| {
                || "TurboTasks::event_background_done".to_string()
            }),
            program_start: Instant::now(),
            compilation_events: CompilationEventQueue::default(),
        });
        this.backend.startup(&*this);
        this
    }
671
672    pub fn pin(&self) -> Arc<Self> {
673        self.this.upgrade().unwrap()
674    }
675
    /// Creates a new root task
    ///
    /// The task is created in the backend as a transient root task and scheduled immediately
    /// with [`TaskPriority::initial`]. Unlike [`Self::spawn_once_task`], root tasks can be
    /// re-executed; use [`Self::dispose_root_task`] to remove them.
    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
    where
        T: ?Sized,
        F: Fn() -> Fut + Send + Sync + Clone + 'static,
        Fut: Future<Output = Result<Vc<T>>> + Send,
    {
        let id = self.backend.create_transient_task(
            TransientTaskType::Root(Box::new(move || {
                // clone the functor so every (re-)execution gets its own copy
                let functor = functor.clone();
                Box::pin(async move {
                    let raw_vc = functor().await?.node;
                    // make sure the result doesn't point at a local task, which would not
                    // outlive this execution
                    raw_vc.to_non_local().await
                })
            })),
            self,
        );
        self.schedule(id, TaskPriority::initial());
        id
    }
696
    /// Disposes a root task previously created with [`Self::spawn_root_task`].
    pub fn dispose_root_task(&self, task_id: TaskId) {
        self.backend.dispose_root_task(task_id, self);
    }
700
    // TODO make sure that all dependencies settle before reading them
    /// Creates a new root task, that is only executed once.
    /// Dependencies will not invalidate the task.
    #[track_caller]
    fn spawn_once_task<T, Fut>(&self, future: Fut)
    where
        T: ?Sized,
        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
    {
        let id = self.backend.create_transient_task(
            TransientTaskType::Once(Box::pin(async move {
                let raw_vc = future.await?.node;
                // make sure the result doesn't point at a local task, which would not outlive
                // this execution
                raw_vc.to_non_local().await
            })),
            self,
        );
        self.schedule(id, TaskPriority::initial());
    }
719
    /// Runs `future` once inside a transient "once" task (see [`Self::spawn_once_task`]) and
    /// returns its result.
    ///
    /// The future is marked as a top-level task; its result is passed back over a oneshot
    /// channel.
    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
        &self,
        future: impl Future<Output = Result<T>> + Send + 'static,
    ) -> Result<T> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.spawn_once_task(async move {
            mark_top_level_task();
            let result = future.await;
            // sending only fails when the receiver (the caller below) was dropped
            tx.send(result)
                .map_err(|_| anyhow!("unable to send result"))?;
            Ok(Completion::new())
        });

        rx.await?
    }
735
    /// Runs `future` within this turbo-tasks context as a temporary top-level task, capturing
    /// panics and waiting for spawned local tasks to finish before returning.
    ///
    /// Counts as a foreground job for the duration of the call.
    #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
    pub async fn run<T: TraceRawVcs + Send + 'static>(
        &self,
        future: impl Future<Output = Result<T>> + Send + 'static,
    ) -> Result<T, TurboTasksExecutionError> {
        self.begin_foreground_job();
        // it's okay for execution ids to overflow and wrap, they're just used for an assert
        let execution_id = self.execution_id_factory.wrapping_get();
        // temporary state: this execution is not backed by a backend task
        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
            execution_id,
            TaskPriority::initial(),
            true, // in_top_level_task
        )));

        let result = TURBO_TASKS
            .scope(
                self.pin(),
                CURRENT_TASK_STATE.scope(current_task_state, async {
                    // CaptureFuture captures panics as an error value instead of unwinding
                    let result = CaptureFuture::new(future).await;

                    // wait for all spawned local tasks using `local` to finish
                    wait_for_local_tasks().await;

                    match result {
                        Ok(Ok(raw_vc)) => Ok(raw_vc),
                        Ok(Err(err)) => Err(err.into()),
                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                    }
                }),
            )
            .await;
        self.finish_foreground_job();
        result
    }
770
    /// Spawns `future` as a detached "once" process on the tokio runtime without waiting for it.
    ///
    /// NOTE(review): the inner future flips foreground-job accounting off while `future` runs
    /// (`finish` before, `begin` after) — presumably so a long-running process doesn't keep the
    /// system counted as active; confirm against `begin_foreground_job`/`finish_foreground_job`.
    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
        let this = self.pin();
        tokio::spawn(async move {
            this.pin()
                .run_once(async move {
                    this.finish_foreground_job();
                    future.await;
                    this.begin_foreground_job();
                    Ok(())
                })
                .await
                // the inner future always returns Ok(()), so a failure here indicates an
                // internal channel error in `run_once`
                .unwrap()
        });
    }
785
786    pub(crate) fn native_call(
787        &self,
788        native_fn: &'static NativeFunction,
789        this: Option<RawVc>,
790        arg: Box<dyn MagicAny>,
791        persistence: TaskPersistence,
792    ) -> RawVc {
793        let task_type = CachedTaskType {
794            native_fn,
795            this,
796            arg,
797        };
798        RawVc::TaskOutput(match persistence {
799            TaskPersistence::Transient => self.backend.get_or_create_transient_task(
800                task_type,
801                current_task_if_available("turbo_function calls"),
802                self,
803            ),
804            TaskPersistence::Persistent => self.backend.get_or_create_persistent_task(
805                task_type,
806                current_task_if_available("turbo_function calls"),
807                self,
808            ),
809        })
810    }
811
812    pub fn dynamic_call(
813        &self,
814        native_fn: &'static NativeFunction,
815        this: Option<RawVc>,
816        arg: Box<dyn MagicAny>,
817        persistence: TaskPersistence,
818    ) -> RawVc {
819        if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
820            return self.native_call(native_fn, this, arg, persistence);
821        }
822        let task_type = LocalTaskSpec {
823            task_type: LocalTaskType::ResolveNative { native_fn },
824            this,
825            arg,
826        };
827        self.schedule_local_task(task_type, persistence)
828    }
829
830    pub fn trait_call(
831        &self,
832        trait_method: &'static TraitMethod,
833        this: RawVc,
834        arg: Box<dyn MagicAny>,
835        persistence: TaskPersistence,
836    ) -> RawVc {
837        // avoid creating a wrapper task if self is already resolved
838        // for resolved cells we already know the value type so we can lookup the
839        // function
840        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
841            match registry::get_value_type(type_id).get_trait_method(trait_method) {
842                Some(native_fn) => {
843                    let arg = native_fn.arg_meta.filter_owned(arg);
844                    return self.dynamic_call(native_fn, Some(this), arg, persistence);
845                }
846                None => {
847                    // We are destined to fail at this point, but we just retry resolution in the
848                    // local task since we cannot report an error from here.
849                    // TODO: A panic seems appropriate since the immediate caller is to blame
850                }
851            }
852        }
853
854        // create a wrapper task to resolve all inputs
855        let task_type = LocalTaskSpec {
856            task_type: LocalTaskType::ResolveTrait { trait_method },
857            this: Some(this),
858            arg,
859        };
860
861        self.schedule_local_task(task_type, persistence)
862    }
863
    /// Schedules an already-created (global) task for execution on the priority runner.
    #[track_caller]
    pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
        // Count this as an in-flight foreground job *before* handing it to the
        // runner (a 0 -> 1 transition ends the backend's idle period).
        self.begin_foreground_job();
        // Statistics counter, reset when the system becomes idle again (see
        // `finish_foreground_job`).
        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);

        self.priority_runner.schedule(
            &self.pin(),
            ScheduledTask::Task {
                task_id,
                // Preserve the caller's tracing span for the async execution.
                span: Span::current(),
            },
            priority,
        );
    }
878
    /// Creates a local task (a task that only exists within the current execution of its parent
    /// global task) and schedules it on the priority runner. Returns a [`RawVc::LocalOutput`]
    /// pointing at the not-yet-computed result.
    fn schedule_local_task(
        &self,
        ty: LocalTaskSpec,
        // if this is a `LocalTaskType::Resolve*`, we may spawn another task with this persistence,
        persistence: TaskPersistence,
    ) -> RawVc {
        let task_type = ty.task_type;
        // Register the local task in the parent task's state and capture everything the
        // scheduled future needs: the shared state handle, the parent's execution id and
        // priority, and the freshly assigned local task id.
        let (global_task_state, execution_id, priority, local_task_id) =
            CURRENT_TASK_STATE.with(|gts| {
                let mut gts_write = gts.write().unwrap();
                let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
                    done_event: Event::new(move || {
                        move || format!("LocalTask({task_type})::done_event")
                    }),
                });
                (
                    Arc::clone(gts),
                    gts_write.execution_id,
                    gts_write.priority,
                    local_task_id,
                )
            });

        let future = self.priority_runner.schedule_with_join_handle(
            &self.pin(),
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state: global_task_state.clone(),
                span: Span::current(),
            },
            priority,
        );
        // Track the join handle so the parent task waits for all local tasks before
        // completing (drained in `wait_for_local_tasks`).
        global_task_state
            .write()
            .unwrap()
            .local_task_tracker
            .get_or_insert_default()
            .push(Either::Left(future));

        RawVc::LocalOutput(execution_id, local_task_id, persistence)
    }
922
923    fn begin_foreground_job(&self) {
924        if self
925            .currently_scheduled_foreground_jobs
926            .fetch_add(1, Ordering::AcqRel)
927            == 0
928        {
929            *self.start.lock().unwrap() = Some(Instant::now());
930            self.event_foreground_start.notify(usize::MAX);
931            self.backend.idle_end(self);
932        }
933    }
934
935    fn finish_foreground_job(&self) {
936        if self
937            .currently_scheduled_foreground_jobs
938            .fetch_sub(1, Ordering::AcqRel)
939            == 1
940        {
941            self.backend.idle_start(self);
942            // That's not super race-condition-safe, but it's only for
943            // statistical reasons
944            let total = self.scheduled_tasks.load(Ordering::Acquire);
945            self.scheduled_tasks.store(0, Ordering::Release);
946            if let Some(start) = *self.start.lock().unwrap() {
947                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
948                if let Some(update) = update.as_mut() {
949                    update.0 += start.elapsed();
950                    update.1 += total;
951                } else {
952                    *update = Some((start.elapsed(), total));
953                }
954            }
955            self.event_foreground_done.notify(usize::MAX);
956        }
957    }
958
    /// Registers the start of a background job. Background jobs are tracked
    /// separately from foreground jobs and only affect `wait_background_done`
    /// / `stop_and_wait`, not idle detection.
    fn begin_background_job(&self) {
        self.currently_scheduled_background_jobs
            .fetch_add(1, Ordering::Relaxed);
    }
963
964    fn finish_background_job(&self) {
965        if self
966            .currently_scheduled_background_jobs
967            .fetch_sub(1, Ordering::Relaxed)
968            == 1
969        {
970            self.event_background_done.notify(usize::MAX);
971        }
972    }
973
    /// Returns the number of currently scheduled foreground jobs (tasks and
    /// foreground backend jobs).
    pub fn get_in_progress_count(&self) -> usize {
        self.currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
    }
978
979    /// Waits for the given task to finish executing. This works by performing an untracked read,
980    /// and discarding the value of the task output.
981    ///
982    /// [`ReadConsistency::Eventual`] means that this will return after the task executes, but
983    /// before all dependencies have completely settled.
984    ///
985    /// [`ReadConsistency::Strong`] means that this will also wait for the task and all dependencies
986    /// to fully settle before returning.
987    ///
988    /// As this function is typically called in top-level code that waits for results to be ready
989    /// for the user to access, most callers should use [`ReadConsistency::Strong`].
990    pub async fn wait_task_completion(
991        &self,
992        id: TaskId,
993        consistency: ReadConsistency,
994    ) -> Result<()> {
995        read_task_output(
996            self,
997            id,
998            ReadOutputOptions {
999                // INVALIDATION: This doesn't return a value, only waits for it to be ready.
1000                tracking: ReadTracking::Untracked,
1001                consistency,
1002            },
1003        )
1004        .await?;
1005        Ok(())
1006    }
1007
1008    /// Returns [UpdateInfo] with all updates aggregated over a given duration
1009    /// (`aggregation`). Will wait until an update happens.
1010    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
1011        self.aggregated_update_info(aggregation, Duration::MAX)
1012            .await
1013            .unwrap()
1014    }
1015
1016    /// Returns [UpdateInfo] with all updates aggregated over a given duration
1017    /// (`aggregation`). Will only return None when the timeout is reached while
1018    /// waiting for the first update.
1019    pub async fn aggregated_update_info(
1020        &self,
1021        aggregation: Duration,
1022        timeout: Duration,
1023    ) -> Option<UpdateInfo> {
1024        let listener = self
1025            .event_foreground_done
1026            .listen_with_note(|| || "wait for update info".to_string());
1027        let wait_for_finish = {
1028            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1029            if aggregation.is_zero() {
1030                if let Some((duration, tasks)) = update.take() {
1031                    return Some(UpdateInfo {
1032                        duration,
1033                        tasks,
1034                        reasons: take(reason_set),
1035                        placeholder_for_future_fields: (),
1036                    });
1037                } else {
1038                    true
1039                }
1040            } else {
1041                update.is_none()
1042            }
1043        };
1044        if wait_for_finish {
1045            if timeout == Duration::MAX {
1046                // wait for finish
1047                listener.await;
1048            } else {
1049                // wait for start, then wait for finish or timeout
1050                let start_listener = self
1051                    .event_foreground_start
1052                    .listen_with_note(|| || "wait for update info".to_string());
1053                if self
1054                    .currently_scheduled_foreground_jobs
1055                    .load(Ordering::Acquire)
1056                    == 0
1057                {
1058                    start_listener.await;
1059                } else {
1060                    drop(start_listener);
1061                }
1062                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1063                    // Timeout
1064                    return None;
1065                }
1066            }
1067        }
1068        if !aggregation.is_zero() {
1069            loop {
1070                select! {
1071                    () = tokio::time::sleep(aggregation) => {
1072                        break;
1073                    }
1074                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1075                        // Resets the sleep
1076                    }
1077                }
1078            }
1079        }
1080        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1081        if let Some((duration, tasks)) = update.take() {
1082            Some(UpdateInfo {
1083                duration,
1084                tasks,
1085                reasons: take(reason_set),
1086                placeholder_for_future_fields: (),
1087            })
1088        } else {
1089            panic!("aggregated_update_info must not called concurrently")
1090        }
1091    }
1092
1093    pub async fn wait_background_done(&self) {
1094        let listener = self.event_background_done.listen();
1095        if self
1096            .currently_scheduled_background_jobs
1097            .load(Ordering::Acquire)
1098            != 0
1099        {
1100            listener.await;
1101        }
1102    }
1103
    /// Stops the system: notifies the backend that stopping has begun, sets the `stopped` flag
    /// (which prevents new job bodies from running), waits for all foreground jobs and then all
    /// background jobs to finish, and finally tells the backend to stop.
    pub async fn stop_and_wait(&self) {
        turbo_tasks_future_scope(self.pin(), async move {
            self.backend.stopping(self);
            self.stopped.store(true, Ordering::Release);
            {
                // Listener is registered before checking the counter, so a finish
                // notification between check and await can't be missed.
                let listener = self
                    .event_foreground_done
                    .listen_with_note(|| || "wait for stop".to_string());
                if self
                    .currently_scheduled_foreground_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            {
                // Same pattern for background jobs, after foreground work drained.
                let listener = self.event_background_done.listen();
                if self
                    .currently_scheduled_background_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            self.backend.stop(self);
        })
        .await;
    }
1134
1135    #[track_caller]
1136    pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1137    where
1138        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1139        T::CallOnceFuture: Send,
1140    {
1141        let mut this = self.pin();
1142        this.begin_foreground_job();
1143        tokio::spawn(
1144            TURBO_TASKS
1145                .scope(this.clone(), async move {
1146                    if !this.stopped.load(Ordering::Acquire) {
1147                        this = func(this.clone()).await;
1148                    }
1149                    this.finish_foreground_job();
1150                })
1151                .in_current_span(),
1152        );
1153    }
1154
    /// Spawns `func` as a background job on the tokio runtime. Background jobs are tracked
    /// separately from foreground jobs and do not affect idle detection (`is_idle` only checks
    /// foreground jobs). When the system has already been stopped, the job body is skipped but
    /// the job is still accounted for so the counters stay balanced.
    #[track_caller]
    pub(crate) fn schedule_background_job<T>(&self, func: T)
    where
        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
        T::CallOnceFuture: Send,
    {
        let mut this = self.pin();
        // Count the job before spawning so `wait_background_done` can't miss it.
        self.begin_background_job();
        tokio::spawn(
            TURBO_TASKS
                .scope(this.clone(), async move {
                    if !this.stopped.load(Ordering::Acquire) {
                        this = func(this).await;
                    }
                    this.finish_background_job();
                })
                .in_current_span(),
        );
    }
1174
1175    fn finish_current_task_state(&self) -> FinishedTaskState {
1176        CURRENT_TASK_STATE.with(|cell| {
1177            let current_task_state = &*cell.write().unwrap();
1178            FinishedTaskState {
1179                #[cfg(feature = "verify_determinism")]
1180                stateful: current_task_state.stateful,
1181                has_invalidator: current_task_state.has_invalidator,
1182            }
1183        })
1184    }
1185
    /// Direct access to the underlying backend implementation.
    pub fn backend(&self) -> &B {
        &self.backend
    }
1189}
1190
/// Unit type implementing [`Executor`], used by the priority runner to execute scheduled
/// global and local tasks.
struct TurboTasksExecutor;
1192
1193impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
1194    type Future = impl Future<Output = ()> + Send + 'static;
1195
1196    fn execute(
1197        &self,
1198        this: &Arc<TurboTasks<B>>,
1199        scheduled_task: ScheduledTask,
1200        priority: TaskPriority,
1201    ) -> Self::Future {
1202        match scheduled_task {
1203            ScheduledTask::Task { task_id, span } => {
1204                let this2 = this.clone();
1205                let this = this.clone();
1206                let future = async move {
1207                    let mut schedule_again = true;
1208                    while schedule_again {
1209                        // it's okay for execution ids to overflow and wrap, they're just used for
1210                        // an assert
1211                        let execution_id = this.execution_id_factory.wrapping_get();
1212                        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
1213                            task_id,
1214                            execution_id,
1215                            priority,
1216                            false, // in_top_level_task
1217                        )));
1218                        let single_execution_future = async {
1219                            if this.stopped.load(Ordering::Acquire) {
1220                                this.backend.task_execution_canceled(task_id, &*this);
1221                                return false;
1222                            }
1223
1224                            let Some(TaskExecutionSpec { future, span }) = this
1225                                .backend
1226                                .try_start_task_execution(task_id, priority, &*this)
1227                            else {
1228                                return false;
1229                            };
1230
1231                            async {
1232                                let result = CaptureFuture::new(future).await;
1233
1234                                // wait for all spawned local tasks using `local` to finish
1235                                wait_for_local_tasks().await;
1236
1237                                let result = match result {
1238                                    Ok(Ok(raw_vc)) => Ok(raw_vc),
1239                                    Ok(Err(err)) => Err(err.into()),
1240                                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
1241                                };
1242
1243                                let finihed_state = this.finish_current_task_state();
1244                                let cell_counters = CURRENT_TASK_STATE
1245                                    .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
1246                                this.backend.task_execution_completed(
1247                                    task_id,
1248                                    result,
1249                                    &cell_counters,
1250                                    #[cfg(feature = "verify_determinism")]
1251                                    finihed_state.stateful,
1252                                    finihed_state.has_invalidator,
1253                                    &*this,
1254                                )
1255                            }
1256                            .instrument(span)
1257                            .await
1258                        };
1259                        schedule_again = CURRENT_TASK_STATE
1260                            .scope(current_task_state, single_execution_future)
1261                            .await;
1262                    }
1263                    this.finish_foreground_job();
1264                };
1265
1266                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
1267            }
1268            ScheduledTask::LocalTask {
1269                ty,
1270                persistence,
1271                local_task_id,
1272                global_task_state,
1273                span,
1274            } => {
1275                let this2 = this.clone();
1276                let this = this.clone();
1277                let task_type = ty.task_type;
1278                let future = async move {
1279                    let span = match &ty.task_type {
1280                        LocalTaskType::ResolveNative { native_fn } => {
1281                            native_fn.resolve_span(priority)
1282                        }
1283                        LocalTaskType::ResolveTrait { trait_method } => {
1284                            trait_method.resolve_span(priority)
1285                        }
1286                    };
1287                    async move {
1288                        let result = match ty.task_type {
1289                            LocalTaskType::ResolveNative { native_fn } => {
1290                                LocalTaskType::run_resolve_native(
1291                                    native_fn,
1292                                    ty.this,
1293                                    &*ty.arg,
1294                                    persistence,
1295                                    this,
1296                                )
1297                                .await
1298                            }
1299                            LocalTaskType::ResolveTrait { trait_method } => {
1300                                LocalTaskType::run_resolve_trait(
1301                                    trait_method,
1302                                    ty.this.unwrap(),
1303                                    &*ty.arg,
1304                                    persistence,
1305                                    this,
1306                                )
1307                                .await
1308                            }
1309                        };
1310
1311                        let output = match result {
1312                            Ok(raw_vc) => OutputContent::Link(raw_vc),
1313                            Err(err) => OutputContent::Error(
1314                                TurboTasksExecutionError::from(err)
1315                                    .with_local_task_context(task_type.to_string()),
1316                            ),
1317                        };
1318
1319                        let local_task = LocalTask::Done { output };
1320
1321                        let done_event = CURRENT_TASK_STATE.with(move |gts| {
1322                            let mut gts_write = gts.write().unwrap();
1323                            let scheduled_task = std::mem::replace(
1324                                gts_write.get_mut_local_task(local_task_id),
1325                                local_task,
1326                            );
1327                            let LocalTask::Scheduled { done_event } = scheduled_task else {
1328                                panic!("local task finished, but was not in the scheduled state?");
1329                            };
1330                            done_event
1331                        });
1332                        done_event.notify(usize::MAX)
1333                    }
1334                    .instrument(span)
1335                    .await
1336                };
1337                let future = CURRENT_TASK_STATE.scope(global_task_state, future);
1338
1339                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
1340            }
1341        }
1342    }
1343}
1344
/// Snapshot of per-task flags captured when a task execution finishes and
/// forwarded to the backend via `task_execution_completed`.
struct FinishedTaskState {
    /// True if the task has state in cells (interior mutability).
    /// Only tracked when verify_determinism feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the task uses an external invalidator
    has_invalidator: bool,
}
1354
// Object-safe facade: these methods forward to the inherent `TurboTasks<B>` implementations so
// callers can work with a `dyn TurboTasksCallApi`.
impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.dynamic_call(native_fn, this, arg, persistence)
    }
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.native_call(native_fn, this, arg, persistence)
    }
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.trait_call(trait_method, this, arg, persistence)
    }

    #[track_caller]
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run(future).await })
    }

    #[track_caller]
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    #[track_caller]
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        {
            // Record the reason so it shows up in the next aggregated `UpdateInfo`.
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    #[track_caller]
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        self.start_once_process(future)
    }

    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
        // Best-effort delivery: a failed send is only logged, never propagated.
        if let Err(e) = self.compilation_events.send(event) {
            tracing::warn!("Failed to send compilation event: {e}");
        }
    }

    fn get_task_name(&self, task: TaskId) -> String {
        self.backend.get_task_name(task, self)
    }
}
1431
// Task/cell read-write API, mostly delegating to the backend while supplying the current task
// (from task-local state) as reader/emitter where needed.
impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
    #[instrument(level = "info", skip_all, name = "invalidate")]
    fn invalidate(&self, task: TaskId) {
        self.backend.invalidate_task(task, self);
    }

    #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
        {
            // Record the reason so it shows up in the next aggregated `UpdateInfo`.
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        self.backend.invalidate_task(task, self);
    }

    fn invalidate_serialization(&self, task: TaskId) {
        self.backend.invalidate_serialization(task, self);
    }

    /// Reads a task's output, registering the current task (if any) as a dependent reader.
    #[track_caller]
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>> {
        if options.consistency == ReadConsistency::Eventual {
            debug_assert_not_in_top_level_task("read_task_output");
        }
        self.backend.try_read_task_output(
            task,
            current_task_if_available("reading Vcs"),
            options,
            self,
        )
    }

    /// Reads a cell of a task, registering the current task (if any) as a dependent reader.
    #[track_caller]
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        let reader = current_task_if_available("reading Vcs");
        self.backend
            .try_read_task_cell(task, index, reader, options, self)
    }

    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent> {
        self.backend
            .try_read_own_task_cell(current_task, index, options, self)
    }

    /// Reads the output of a local task of the current execution, either returning the result
    /// or a listener to wait on if the local task is still running.
    #[track_caller]
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>> {
        debug_assert_not_in_top_level_task("read_local_output");
        CURRENT_TASK_STATE.with(|gts| {
            let gts_read = gts.read().unwrap();

            // Local Vcs are local to their parent task's current execution, and do not exist
            // outside of it. This is weakly enforced at compile time using the `NonLocalValue`
            // marker trait. This assertion exists to handle any potential escapes that the
            // compile-time checks cannot capture.
            gts_read.assert_execution_id(execution_id);

            match gts_read.get_local_task(local_task_id) {
                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
            }
        })
    }

    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
        // TODO: Add assert_not_in_top_level_task("read_task_collectibles") check here.
        // Collectible reads are eventually consistent.
        self.backend.read_task_collectibles(
            task,
            trait_id,
            current_task_if_available("reading collectibles"),
            self,
        )
    }

    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
        self.backend.emit_collectible(
            trait_type,
            collectible,
            current_task("emitting collectible"),
            self,
        );
    }

    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
        self.backend.unemit_collectible(
            trait_type,
            collectible,
            count,
            // NOTE(review): the description string says "emitting" on the unemit path;
            // presumably deliberate shared wording — confirm intent.
            current_task("emitting collectible"),
            self,
        );
    }

    /// Un-emits all collectibles in the map that have a positive count.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
        for (&collectible, &count) in collectibles {
            if count > 0 {
                self.backend.unemit_collectible(
                    trait_type,
                    collectible,
                    count as u32,
                    current_task("emitting collectible"),
                    self,
                );
            }
        }
    }

    fn read_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent> {
        self.try_read_own_task_cell(task, index, options)
    }

    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        verification_mode: VerificationMode,
    ) {
        self.backend.update_task_cell(
            task,
            index,
            is_serializable_cell_content,
            content,
            updated_key_hashes,
            verification_mode,
            self,
        );
    }

    fn connect_task(&self, task: TaskId) {
        self.backend
            .connect_task(task, current_task_if_available("connecting task"), self);
    }

    fn mark_own_task_as_finished(&self, task: TaskId) {
        self.backend.mark_own_task_as_finished(task, self);
    }

    fn mark_own_task_as_session_dependent(&self, task: TaskId) {
        self.backend.mark_own_task_as_session_dependent(task, self);
    }

    /// Creates a future that inherits the current task id and task state. The current global task
    /// will wait for this future to be dropped before exiting.
    fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        // this is similar to what happens for a local task, except that we keep the local task's
        // state as well.
        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
        let fut = tokio::spawn(TURBO_TASKS.scope(
            turbo_tasks(),
            CURRENT_TASK_STATE.scope(global_task_state.clone(), fut),
        ));
        let fut = Box::pin(async move {
            fut.await.unwrap();
        });
        // Track the spawned future alongside local tasks so the parent waits for it.
        let mut ts = global_task_state.write().unwrap();
        ts.local_task_tracker
            .get_or_insert_default()
            .push(Either::Right(fut));
    }

    fn task_statistics(&self) -> &TaskStatisticsApi {
        self.backend.task_statistics()
    }

    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        let this = self.pin();
        Box::pin(async move {
            this.stop_and_wait().await;
        })
    }

    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>> {
        self.compilation_events.subscribe(event_types)
    }

    fn is_tracking_dependencies(&self) -> bool {
        self.backend.is_tracking_dependencies()
    }
}
1640
// API surface exposed to the backend itself: job scheduling, task id management and timing.
impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
        self.pin()
    }
    fn backend(&self) -> &B {
        &self.backend
    }

    #[track_caller]
    fn schedule_backend_background_job(&self, job: B::BackendJob) {
        self.schedule_background_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            this
        })
    }

    #[track_caller]
    fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
        self.schedule_foreground_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            this
        })
    }

    #[track_caller]
    fn schedule(&self, task: TaskId, priority: TaskPriority) {
        self.schedule(task, priority)
    }

    /// Priority of the currently executing task, or the initial priority when
    /// called from outside a task context.
    fn get_current_task_priority(&self) -> TaskPriority {
        CURRENT_TASK_STATE
            .try_with(|task_state| task_state.read().unwrap().priority)
            .unwrap_or(TaskPriority::initial())
    }

    fn program_duration_until(&self, instant: Instant) -> Duration {
        // NOTE(review): assumes `instant` is not before `program_start`; per std docs
        // `Instant` subtraction saturates to zero otherwise — confirm that's acceptable.
        instant - self.program_start
    }

    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
        // SAFETY: This is a fresh id from the factory
        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
    }

    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
        // SAFETY: This is a fresh id from the factory
        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
    }

    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
        unsafe { self.task_id_factory.reuse(id.into()) }
    }

    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
        unsafe { self.transient_task_id_factory.reuse(id.into()) }
    }

    /// Idle means no foreground jobs are scheduled; background jobs are ignored here.
    fn is_idle(&self) -> bool {
        self.currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
            == 0
    }
}
1704
1705async fn wait_for_local_tasks() {
1706    while let Some(mut ltt) =
1707        CURRENT_TASK_STATE.with(|ts| ts.write().unwrap().local_task_tracker.take())
1708    {
1709        use futures::StreamExt;
1710        while ltt.next().await.is_some() {}
1711    }
1712}
1713
1714pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1715    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1716        Ok(id) => id,
1717        Err(_) => panic!(
1718            "{from} can only be used in the context of a turbo_tasks task execution or \
1719             turbo_tasks run"
1720        ),
1721    }
1722}
1723
1724pub(crate) fn current_task(from: &str) -> TaskId {
1725    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1726        Ok(Some(id)) => id,
1727        Ok(None) | Err(_) => {
1728            panic!("{from} can only be used in the context of a turbo_tasks task execution")
1729        }
1730    }
1731}
1732
1733#[track_caller]
1734fn debug_assert_not_in_top_level_task(operation: &str) {
1735    if !cfg!(debug_assertions) {
1736        return;
1737    }
1738
1739    // HACK: We set this inside of `ReadRawVcFuture` to suppress warnings about an internal
1740    // consistency bug
1741    let suppressed = SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK
1742        .try_with(|&suppressed| suppressed)
1743        .unwrap_or(false);
1744    if suppressed {
1745        return;
1746    }
1747
1748    let in_top_level = CURRENT_TASK_STATE
1749        .try_with(|ts| ts.read().unwrap().in_top_level_task)
1750        .unwrap_or(false);
1751    if in_top_level {
1752        panic!(
1753            "Eventually consistent read ({operation}) cannot be performed from a top-level task. \
1754             Top-level tasks (e.g. code inside `.run_once(...)`) must use strongly consistent \
1755             reads to avoid leaking inconsistent return values."
1756        );
1757    }
1758}
1759
1760pub async fn run<T: Send + 'static>(
1761    tt: Arc<dyn TurboTasksApi>,
1762    future: impl Future<Output = Result<T>> + Send + 'static,
1763) -> Result<T> {
1764    let (tx, rx) = tokio::sync::oneshot::channel();
1765
1766    tt.run(Box::pin(async move {
1767        let result = future.await?;
1768        tx.send(result)
1769            .map_err(|_| anyhow!("unable to send result"))?;
1770        Ok(())
1771    }))
1772    .await?;
1773
1774    Ok(rx.await?)
1775}
1776
1777pub async fn run_once<T: Send + 'static>(
1778    tt: Arc<dyn TurboTasksApi>,
1779    future: impl Future<Output = Result<T>> + Send + 'static,
1780) -> Result<T> {
1781    let (tx, rx) = tokio::sync::oneshot::channel();
1782
1783    tt.run_once(Box::pin(async move {
1784        let result = future.await?;
1785        tx.send(result)
1786            .map_err(|_| anyhow!("unable to send result"))?;
1787        Ok(())
1788    }))
1789    .await?;
1790
1791    Ok(rx.await?)
1792}
1793
1794pub async fn run_once_with_reason<T: Send + 'static>(
1795    tt: Arc<dyn TurboTasksApi>,
1796    reason: impl InvalidationReason,
1797    future: impl Future<Output = Result<T>> + Send + 'static,
1798) -> Result<T> {
1799    let (tx, rx) = tokio::sync::oneshot::channel();
1800
1801    tt.run_once_with_reason(
1802        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1803        Box::pin(async move {
1804            let result = future.await?;
1805            tx.send(result)
1806                .map_err(|_| anyhow!("unable to send result"))?;
1807            Ok(())
1808        }),
1809    )
1810    .await?;
1811
1812    Ok(rx.await?)
1813}
1814
1815/// Calls [`TurboTasks::dynamic_call`] for the current turbo tasks instance.
1816pub fn dynamic_call(
1817    func: &'static NativeFunction,
1818    this: Option<RawVc>,
1819    arg: Box<dyn MagicAny>,
1820    persistence: TaskPersistence,
1821) -> RawVc {
1822    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1823}
1824
1825/// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance.
1826pub fn trait_call(
1827    trait_method: &'static TraitMethod,
1828    this: RawVc,
1829    arg: Box<dyn MagicAny>,
1830    persistence: TaskPersistence,
1831) -> RawVc {
1832    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1833}
1834
1835pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1836    TURBO_TASKS.with(|arc| arc.clone())
1837}
1838
1839pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
1840    TURBO_TASKS.with(Arc::downgrade)
1841}
1842
1843pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1844    TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1845}
1846
1847pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1848    TURBO_TASKS.with(|arc| func(arc))
1849}
1850
/// Runs `f` with `tt` set as the current turbo tasks instance (synchronous counterpart of
/// [`turbo_tasks_future_scope`]).
pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
    TURBO_TASKS.sync_scope(tt, f)
}
1854
/// Wraps the future `f` so that `tt` is set as the current turbo tasks instance while it runs.
pub fn turbo_tasks_future_scope<T>(
    tt: Arc<dyn TurboTasksApi>,
    f: impl Future<Output = T>,
) -> impl Future<Output = T> {
    TURBO_TASKS.scope(tt, f)
}
1861
1862pub fn with_turbo_tasks_for_testing<T>(
1863    tt: Arc<dyn TurboTasksApi>,
1864    current_task: TaskId,
1865    execution_id: ExecutionId,
1866    f: impl Future<Output = T>,
1867) -> impl Future<Output = T> {
1868    TURBO_TASKS.scope(
1869        tt,
1870        CURRENT_TASK_STATE.scope(
1871            Arc::new(RwLock::new(CurrentTaskState::new(
1872                current_task,
1873                execution_id,
1874                TaskPriority::initial(),
1875                false, // in_top_level_task
1876            ))),
1877            f,
1878        ),
1879    )
1880}
1881
1882/// Spawns the given future within the context of the current task.
1883///
1884/// Beware: this method is not safe to use in production code. It is only
1885/// intended for use in tests and for debugging purposes.
1886pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
1887    turbo_tasks().spawn_detached_for_testing(Box::pin(f));
1888}
1889
1890pub fn current_task_for_testing() -> Option<TaskId> {
1891    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1892}
1893
1894/// Marks the current task as dirty when restored from filesystem cache.
1895pub fn mark_session_dependent() {
1896    with_turbo_tasks(|tt| {
1897        tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1898    });
1899}
1900
1901/// Marks the current task as finished. This excludes it from waiting for
1902/// strongly consistency.
1903pub fn mark_finished() {
1904    with_turbo_tasks(|tt| {
1905        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1906    });
1907}
1908
1909/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
1910/// serialization of the current task cells.
1911///
1912/// Also marks the current task as stateful when the `verify_determinism` feature is enabled,
1913/// since State allocation implies interior mutability.
1914pub fn get_serialization_invalidator() -> SerializationInvalidator {
1915    CURRENT_TASK_STATE.with(|cell| {
1916        let CurrentTaskState {
1917            task_id,
1918            #[cfg(feature = "verify_determinism")]
1919            stateful,
1920            ..
1921        } = &mut *cell.write().unwrap();
1922        #[cfg(feature = "verify_determinism")]
1923        {
1924            *stateful = true;
1925        }
1926        let Some(task_id) = *task_id else {
1927            panic!(
1928                "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
1929                 task execution"
1930            );
1931        };
1932        SerializationInvalidator::new(task_id)
1933    })
1934}
1935
1936pub fn mark_invalidator() {
1937    CURRENT_TASK_STATE.with(|cell| {
1938        let CurrentTaskState {
1939            has_invalidator, ..
1940        } = &mut *cell.write().unwrap();
1941        *has_invalidator = true;
1942    })
1943}
1944
/// Marks the current task as stateful. This is used to indicate that the task
/// has interior mutability (e.g., via State or TransientState), which means
/// the task may produce different outputs even with the same inputs.
///
/// Only has an effect when the `verify_determinism` feature is enabled.
pub fn mark_stateful() {
    #[cfg(feature = "verify_determinism")]
    CURRENT_TASK_STATE.with(|cell| {
        cell.write().unwrap().stateful = true;
    });
    // Without `verify_determinism` there is nothing to record.
}
1960
1961/// Marks the current task context as being in a top-level task. When in a top-level task,
1962/// eventually consistent reads will panic. It is almost always a mistake to perform an eventually
1963/// consistent read at the top-level of the application.
1964pub fn mark_top_level_task() {
1965    if cfg!(debug_assertions) {
1966        CURRENT_TASK_STATE.with(|cell| {
1967            cell.write().unwrap().in_top_level_task = true;
1968        })
1969    }
1970}
1971
1972/// Unmarks the current task context as being in a top-level task. The opposite of
1973/// [`mark_top_level_task`].
1974///
1975/// This utility can be okay in unit tests, where we're observing the internal behavior of
1976/// turbo-tasks, but otherwise, it is probably a mistake to call this function.
1977///
1978/// Calling this will allow eventually-consistent reads at the top-level, potentially exposing
1979/// incomplete computations and internal errors caused by eventual consistency that would've been
1980/// caught when the function was re-run. A strongly-consistent read re-runs parts of a task until
1981/// all of the dependencies have settled.
1982pub fn unmark_top_level_task_may_leak_eventually_consistent_state() {
1983    if cfg!(debug_assertions) {
1984        CURRENT_TASK_STATE.with(|cell| {
1985            cell.write().unwrap().in_top_level_task = false;
1986        })
1987    }
1988}
1989
/// Currently a no-op; kept as a call-site marker for when garbage collection is implemented.
pub fn prevent_gc() {
    // TODO implement garbage collection
}
1993
1994pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1995    with_turbo_tasks(|tt| {
1996        let raw_vc = collectible.node.node;
1997        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1998    })
1999}
2000
2001pub(crate) async fn read_task_output(
2002    this: &dyn TurboTasksApi,
2003    id: TaskId,
2004    options: ReadOutputOptions,
2005) -> Result<RawVc> {
2006    loop {
2007        match this.try_read_task_output(id, options)? {
2008            Ok(result) => return Ok(result),
2009            Err(listener) => listener.await,
2010        }
2011    }
2012}
2013
/// A reference to a task's cell with methods that allow updating the contents
/// of the cell.
///
/// Mutations should not happen outside of the task that owns this cell. Doing
/// so is a logic error, and may lead to incorrect caching behavior.
#[derive(Clone, Copy)]
pub struct CurrentCellRef {
    // The task that owns this cell; all reads/writes go through `*_own_task_cell`.
    current_task: TaskId,
    // Type id + per-type index identifying the cell within the task.
    index: CellId,
    // Forwarded to `ReadCellOptions` / `update_own_task_cell` on every access.
    is_serializable_cell_content: bool,
}
2025
/// Shorthand for the target type that `T`'s [`VcRead`] implementation exposes to readers.
type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
2027
impl CurrentCellRef {
    /// Updates the cell if the given `functor` returns a value.
    fn conditional_update<T>(
        &self,
        functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>)>,
    ) where
        T: VcValueType,
    {
        self.conditional_update_with_shared_reference(|old_shared_reference| {
            // A failed downcast is presented to `functor` the same as an empty cell (`None`).
            let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
            let (new_value, updated_key_hashes) = functor(old_ref)?;
            Some((
                SharedReference::new(triomphe::Arc::new(new_value)),
                updated_key_hashes,
            ))
        })
    }

    /// Updates the cell if the given `functor` returns a `SharedReference`.
    fn conditional_update_with_shared_reference(
        &self,
        functor: impl FnOnce(
            Option<&SharedReference>,
        ) -> Option<(SharedReference, Option<SmallVec<[u64; 2]>>)>,
    ) {
        let tt = turbo_tasks();
        // A read error is treated like an empty cell: `functor` receives `None`.
        let cell_content = tt
            .read_own_task_cell(
                self.current_task,
                self.index,
                ReadCellOptions {
                    // INVALIDATION: Reading our own cell must be untracked
                    tracking: ReadCellTracking::Untracked,
                    is_serializable_cell_content: self.is_serializable_cell_content,
                    final_read_hint: false,
                },
            )
            .ok();
        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
        // `None` from `functor` means "keep the existing content" — no write is issued.
        if let Some((update, updated_key_hashes)) = update {
            tt.update_own_task_cell(
                self.current_task,
                self.index,
                self.is_serializable_cell_content,
                CellContent(Some(update)),
                updated_key_hashes,
                VerificationMode::EqualityCheck,
            )
        }
    }

    /// Replace the current cell's content with `new_value` if the current content is not equal by
    /// value with the existing content.
    ///
    /// The comparison happens using the value itself, not the [`VcRead::Target`] of that value.
    ///
    /// Take this example of a custom equality implementation on a transparent wrapper type:
    ///
    /// ```
    /// #[turbo_tasks::value(transparent, eq = "manual")]
    /// struct Wrapper(Vec<u32>);
    ///
    /// impl PartialEq for Wrapper {
    ///     fn eq(&self, other: &Self) -> bool {
    ///         // Example: order doesn't matter for equality
    ///         let (mut this, mut other) = (self.0.clone(), other.0.clone());
    ///         this.sort_unstable();
    ///         other.sort_unstable();
    ///         this == other
    ///     }
    /// }
    ///
    /// impl Eq for Wrapper {}
    /// ```
    ///
    /// Comparisons of [`Vc<Wrapper>`] used when updating the cell will use `Wrapper`'s custom
    /// equality implementation, rather than the one provided by the target ([`Vec<u32>`]) type.
    ///
    /// However, in most cases, the default derived implementation of [`PartialEq`] is used which
    /// just forwards to the inner value's [`PartialEq`].
    ///
    /// If you already have a `SharedReference`, consider calling
    /// [`Self::compare_and_update_with_shared_reference`] which can re-use the [`SharedReference`]
    /// object.
    pub fn compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + VcValueType,
    {
        self.conditional_update(|old_value| {
            if let Some(old_value) = old_value
                && old_value == &new_value
            {
                return None;
            }
            Some((new_value, None))
        });
    }

    /// Replace the current cell's content with `new_shared_reference` if the current content is not
    /// equal by value with the existing content.
    ///
    /// If you already have a `SharedReference`, this is a faster version of
    /// [`CurrentCellRef::compare_and_update`].
    ///
    /// The value should be stored in [`SharedReference`] using the type `T`.
    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
    where
        T: VcValueType + PartialEq,
    {
        self.conditional_update_with_shared_reference(|old_sr| {
            if let Some(old_sr) = old_sr {
                let old_value = extract_sr_value::<T>(old_sr);
                let new_value = extract_sr_value::<T>(&new_shared_reference);
                if old_value == new_value {
                    return None;
                }
            }
            Some((new_shared_reference, None))
        });
    }

    /// See [`Self::compare_and_update`], but selectively update individual keys.
    pub fn keyed_compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + VcValueType,
        VcReadTarget<T>: KeyedEq,
        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
    {
        self.conditional_update(|old_value| {
            let Some(old_value) = old_value else {
                // Empty cell: write unconditionally, no per-key diff to report.
                return Some((new_value, None));
            };
            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
            let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
            let updated_keys = old_value.different_keys(new_value_ref);
            if updated_keys.is_empty() {
                return None;
            }
            // Duplicates are very unlikely, but ok since the backend is deduplicating them
            let updated_key_hashes = updated_keys
                .into_iter()
                .map(|key| FxBuildHasher.hash_one(key))
                .collect();
            Some((new_value, Some(updated_key_hashes)))
        });
    }

    /// See [`Self::compare_and_update_with_shared_reference`], but selectively update individual
    /// keys.
    pub fn keyed_compare_and_update_with_shared_reference<T>(
        &self,
        new_shared_reference: SharedReference,
    ) where
        T: VcValueType + PartialEq,
        VcReadTarget<T>: KeyedEq,
        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
    {
        self.conditional_update_with_shared_reference(|old_sr| {
            let Some(old_sr) = old_sr else {
                // Empty cell: write unconditionally, no per-key diff to report.
                return Some((new_shared_reference, None));
            };
            let old_value = extract_sr_value::<T>(old_sr);
            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
            let new_value = extract_sr_value::<T>(&new_shared_reference);
            let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
            let updated_keys = old_value.different_keys(new_value);
            if updated_keys.is_empty() {
                return None;
            }
            // Duplicates are very unlikely, but ok since the backend is deduplicating them
            let updated_key_hashes = updated_keys
                .into_iter()
                .map(|key| FxBuildHasher.hash_one(key))
                .collect();
            Some((new_shared_reference, Some(updated_key_hashes)))
        });
    }

    /// Unconditionally updates the content of the cell.
    pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
    where
        T: VcValueType,
    {
        let tt = turbo_tasks();
        tt.update_own_task_cell(
            self.current_task,
            self.index,
            self.is_serializable_cell_content,
            CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
            None,
            verification_mode,
        )
    }

    /// A faster version of [`Self::update`] if you already have a
    /// [`SharedReference`].
    ///
    /// If the passed-in [`SharedReference`] is the same as the existing cell's
    /// by identity, no update is performed.
    ///
    /// The value should be stored in [`SharedReference`] using the type `T`.
    pub fn update_with_shared_reference(
        &self,
        shared_ref: SharedReference,
        verification_mode: VerificationMode,
    ) {
        let tt = turbo_tasks();
        let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
            let content = tt
                .read_own_task_cell(
                    self.current_task,
                    self.index,
                    ReadCellOptions {
                        // INVALIDATION: Reading our own cell must be untracked
                        tracking: ReadCellTracking::Untracked,
                        is_serializable_cell_content: self.is_serializable_cell_content,
                        final_read_hint: false,
                    },
                )
                .ok();
            if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
                // pointer equality (not value equality)
                shared_ref_exp != shared_ref
            } else {
                // No existing content (or read failed) — always write.
                true
            }
        } else {
            true
        };
        if update {
            tt.update_own_task_cell(
                self.current_task,
                self.index,
                self.is_serializable_cell_content,
                CellContent(Some(shared_ref)),
                None,
                verification_mode,
            )
        }
    }
}
2269
2270impl From<CurrentCellRef> for RawVc {
2271    fn from(cell: CurrentCellRef) -> Self {
2272        RawVc::TaskCell(cell.current_task, cell.index)
2273    }
2274}
2275
2276fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2277    sr.0.downcast_ref::<T>()
2278        .expect("cannot update SharedReference of different type")
2279}
2280
2281pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
2282    find_cell_by_id(T::get_value_type_id(), T::has_serialization())
2283}
2284
2285pub fn find_cell_by_id(ty: ValueTypeId, is_serializable_cell_content: bool) -> CurrentCellRef {
2286    CURRENT_TASK_STATE.with(|ts| {
2287        let current_task = current_task("celling turbo_tasks values");
2288        let mut ts = ts.write().unwrap();
2289        let map = ts.cell_counters.as_mut().unwrap();
2290        let current_index = map.entry(ty).or_default();
2291        let index = *current_index;
2292        *current_index += 1;
2293        CurrentCellRef {
2294            current_task,
2295            index: CellId { type_id: ty, index },
2296            is_serializable_cell_content,
2297        }
2298    })
2299}
2300
2301pub(crate) async fn read_local_output(
2302    this: &dyn TurboTasksApi,
2303    execution_id: ExecutionId,
2304    local_task_id: LocalTaskId,
2305) -> Result<RawVc> {
2306    loop {
2307        match this.try_read_local_output(execution_id, local_task_id)? {
2308            Ok(raw_vc) => return Ok(raw_vc),
2309            Err(event_listener) => event_listener.await,
2310        }
2311    }
2312}