Skip to main content

turbo_tasks/
manager.rs

1use std::{
2    cmp::Reverse,
3    fmt::{Debug, Display},
4    future::Future,
5    hash::{BuildHasher, BuildHasherDefault},
6    mem::take,
7    pin::Pin,
8    sync::{
9        Arc, Mutex, RwLock, Weak,
10        atomic::{AtomicBool, AtomicUsize, Ordering},
11    },
12    time::{Duration, Instant},
13};
14
15use anyhow::{Result, anyhow};
16use auto_hash_map::AutoMap;
17use bincode::{Decode, Encode};
18use either::Either;
19use futures::stream::FuturesUnordered;
20use rustc_hash::{FxBuildHasher, FxHasher};
21use serde::{Deserialize, Serialize};
22use smallvec::SmallVec;
23use tokio::{select, sync::mpsc::Receiver, task_local};
24use tracing::{Instrument, Span, instrument};
25use turbo_tasks_hash::{DeterministicHash, hash_xxh3_hash128};
26
27use crate::{
28    Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
29    ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
30    VcValueTrait, VcValueType,
31    backend::{
32        Backend, CachedTaskType, CellContent, CellHash, TaskCollectiblesMap, TaskExecutionSpec,
33        TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
34    },
35    capture_future::CaptureFuture,
36    event::{Event, EventListener},
37    id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
38    id_factory::IdFactoryWithReuse,
39    keyed::KeyedEq,
40    macro_helpers::NativeFunction,
41    magic_any::MagicAny,
42    message_queue::{CompilationEvent, CompilationEventQueue},
43    priority_runner::{Executor, JoinHandle, PriorityRunner},
44    raw_vc::{CellId, RawVc},
45    registry,
46    serialization_invalidation::SerializationInvalidator,
47    task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
48    task_statistics::TaskStatisticsApi,
49    trace::TraceRawVcs,
50    util::{IdFactory, StaticOrArc},
51};
52
/// Common base trait for [`TurboTasksApi`] and [`TurboTasksBackendApi`]. Provides APIs for creating
/// tasks from function calls.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls a native function with arguments. Resolves arguments when needed
    /// with a wrapper task.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Call a native function with arguments.
    /// All inputs must be resolved.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls a trait method with arguments. First input is the `self` object.
    /// Uses a wrapper task to resolve
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs `future` to completion as a top-level unit of work, surfacing
    /// panics/errors as [`TurboTasksExecutionError`].
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
    /// Runs `future` as a once-task (executed a single time, not invalidated
    /// by dependencies) and resolves when it completes.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Like [`Self::run_once`], but records `reason` in the invalidation
    /// reason set for reporting.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Spawns `future` as a detached once-process; callers do not await the result.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Sends a compilation event to subscribers.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Returns a human-readable name for the given task.
    fn get_task_name(&self, task: TaskId) -> String;
}
105
/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
/// context. Returned by the [`turbo_tasks`] helper function.
///
/// This trait is needed because thread locals cannot contain an unresolved [`Backend`] type
/// parameter.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Marks the given task as invalidated.
    fn invalidate(&self, task: TaskId);
    /// Like [`Self::invalidate`], but records `reason` for diagnostics/reporting.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Invalidates the serialized state of the given task.
    fn invalidate_serialization(&self, task: TaskId);

    /// Tries to read the output of a task. When the value is not yet
    /// available, the inner `Err` carries an [`EventListener`] to await before
    /// retrying.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Tries to read a specific cell of a task; same retry protocol as
    /// [`Self::try_read_task_output`].
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Reads a [`RawVc::LocalOutput`]. If the task has completed, returns the [`RawVc`] the local
    /// task points to.
    ///
    /// The returned [`RawVc`] may also be a [`RawVc::LocalOutput`], so this may need to be called
    /// recursively or in a loop.
    ///
    /// This does not accept a consistency argument, as you cannot control consistency of a read of
    /// an operation owned by your own task. Strongly consistent reads are only allowed on
    /// [`OperationVc`]s, which should never be local tasks.
    ///
    /// No dependency tracking will happen as a result of this function call, as it's a no-op for a
    /// task to depend on itself.
    ///
    /// [`OperationVc`]: crate::OperationVc
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Reads the collectibles of the given trait type emitted by `task` (and its children).
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits a collectible from the current task.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Retracts `count` emissions of a collectible from the current task.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Retracts all of the given collectibles from the current task.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;

    /// Reads a cell of the task's own state; no dependency is tracked since a
    /// task depending on itself is a no-op.
    fn read_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;
    /// Writes `content` into one of the current task's own cells.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
    );
    /// Marks the current task's execution as finished.
    fn mark_own_task_as_finished(&self, task: TaskId);
    /// Marks the current task as dependent on the session, so it is re-evaluated per session.
    fn mark_own_task_as_session_dependent(&self, task: TaskId);

    /// Connects the given task as a child of the current task.
    fn connect_task(&self, task: TaskId);

    /// Wraps the given future in the current task.
    ///
    /// Beware: this method is not safe to use in production code. It is only intended for use in
    /// tests and for debugging purposes.
    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Access to per-function task statistics collection.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Stops the system and resolves once all work has finished.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events, optionally filtered by event type names.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;

    // Returns true if TurboTasks is configured to track dependencies.
    fn is_tracking_dependencies(&self) -> bool;
}
204
/// A wrapper around a value that is unused.
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    /// Wraps `inner`, marking it as unused.
    ///
    /// # Safety
    ///
    /// The wrapped value must not be used.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Unused { inner }
    }

    /// Borrows the inner value without consuming the `Unused` wrapper.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that the value stays unused.
    pub unsafe fn get_unchecked(&self) -> &T {
        let Unused { inner } = self;
        inner
    }

    /// Consumes the `Unused` wrapper and returns the inner value.
    pub fn into(self) -> T {
        let Unused { inner } = self;
        inner
    }
}
234
/// A subset of the [`TurboTasks`] API that's exposed to [`Backend`] implementations.
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns a new strong reference to this instance.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Allocates an unused task id from the persistent id space.
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Allocates an unused task id from the transient id space.
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// Returns a persistent task id to the id factory for reuse.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the task id is not used anymore.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// Returns a transient task id to the id factory for reuse.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the task id is not used anymore.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedule a task for execution.
    fn schedule(&self, task: TaskId, priority: TaskPriority);

    /// Returns the priority of the current task.
    fn get_current_task_priority(&self) -> TaskPriority;

    /// Schedule a foreground backend job for execution.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Schedule a background backend job for execution.
    ///
    /// Background jobs are not counted towards activeness of the system. The system is considered
    /// idle even with active background jobs.
    fn schedule_backend_background_job(&self, job: B::BackendJob);

    /// Returns the duration from the start of the program to the given instant.
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Returns true if the system is idle.
    fn is_idle(&self) -> bool;

    /// Returns a reference to the backend.
    fn backend(&self) -> &B;
}
274
/// Aggregated statistics about a completed update: how long it took, how many
/// tasks were involved, and which invalidation reasons triggered it.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// Wall-clock duration of the update.
    pub duration: Duration,
    /// Task count for the update — presumably the number of tasks executed; confirm against
    /// the producer of `aggregated_update`.
    pub tasks: usize,
    /// The set of invalidation reasons collected for this update.
    pub reasons: InvalidationReasonSet,
    // Private zero-sized field prevents construction via struct literal outside
    // this module, so new fields can be added without breaking callers.
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
283
/// Whether a task may be persisted across sessions or only lives in memory for
/// the current session.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// Tasks that may be persisted across sessions using serialization.
    Persistent,

    /// Tasks that will be persisted in memory for the life of this session, but won't persist
    /// between sessions.
    ///
    /// This is used for [root tasks][TurboTasks::spawn_root_task] and tasks with an argument of
    /// type [`TransientValue`][crate::value::TransientValue] or
    /// [`TransientInstance`][crate::value::TransientInstance].
    Transient,
}
297
298impl Display for TaskPersistence {
299    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300        match self {
301            TaskPersistence::Persistent => write!(f, "persistent"),
302            TaskPersistence::Transient => write!(f, "transient"),
303        }
304    }
305}
306
/// Consistency level used when reading task outputs or cells.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// The default behavior for most APIs. Reads are faster, but may return stale values, which
    /// may later trigger re-computation.
    #[default]
    Eventual,
    /// Ensures all dependencies are fully resolved before returning the cell or output data, at
    /// the cost of slower reads.
    ///
    /// Top-level code that returns data to the user should use strongly consistent reads.
    Strong,
}
319
/// Controls whether a cell read is recorded as a dependency of the current task.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadCellTracking {
    /// Reads are tracked as dependencies of the current task.
    Tracked {
        /// The key used for the dependency
        key: Option<u64>,
    },
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}

impl ReadCellTracking {
    /// Returns whether this read should be recorded as a dependency, given
    /// whether the read produced an error.
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            ReadCellTracking::Untracked => false,
            ReadCellTracking::TrackOnlyError => is_err,
            ReadCellTracking::Tracked { .. } => true,
        }
    }

    /// Returns the dependency key, if any. Only `Tracked` reads can carry one.
    pub fn key(&self) -> Option<u64> {
        if let ReadCellTracking::Tracked { key } = self {
            *key
        } else {
            None
        }
    }
}

impl Default for ReadCellTracking {
    /// Defaults to a tracked read without a dependency key.
    fn default() -> Self {
        Self::Tracked { key: None }
    }
}

impl Display for ReadCellTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ReadCellTracking::Tracked { key: None } => f.write_str("tracked"),
            ReadCellTracking::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
            ReadCellTracking::TrackOnlyError => f.write_str("track only error"),
            ReadCellTracking::Untracked => f.write_str("untracked"),
        }
    }
}
373
/// Controls whether a read is recorded as a dependency of the current task
/// (variant of [`ReadCellTracking`] without a dependency key).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// Reads are tracked as dependencies of the current task.
    #[default]
    Tracked,
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}

impl ReadTracking {
    /// Returns whether this read should be recorded as a dependency, given
    /// whether the read produced an error.
    pub fn should_track(&self, is_err: bool) -> bool {
        match self {
            ReadTracking::Untracked => false,
            ReadTracking::TrackOnlyError => is_err,
            ReadTracking::Tracked => true,
        }
    }
}

impl Display for ReadTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            ReadTracking::Tracked => "tracked",
            ReadTracking::TrackOnlyError => "track only error",
            ReadTracking::Untracked => "untracked",
        })
    }
}
410
/// Scheduling priority of a task.
///
/// `Ord` is derived, so `Initial` sorts before any `Invalidation`, and among
/// invalidations the `Reverse<u32>` wrapper makes larger inner values sort first.
#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum TaskPriority {
    /// Priority for tasks executing for the first time.
    #[default]
    Initial,
    /// Priority for tasks re-executing due to invalidation.
    Invalidation {
        /// Stored reversed so higher `u32` values compare as more urgent.
        priority: Reverse<u32>,
    },
}
419
420impl TaskPriority {
421    pub fn invalidation(priority: u32) -> Self {
422        Self::Invalidation {
423            priority: Reverse(priority),
424        }
425    }
426
427    pub fn initial() -> Self {
428        Self::Initial
429    }
430
431    pub fn leaf() -> Self {
432        Self::Invalidation {
433            priority: Reverse(0),
434        }
435    }
436
437    pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
438        match self {
439            TaskPriority::Initial => parent_priority,
440            TaskPriority::Invalidation { priority } => {
441                if let TaskPriority::Invalidation {
442                    priority: parent_priority,
443                } = parent_priority
444                    && priority.0 < parent_priority.0
445                {
446                    Self::Invalidation {
447                        priority: Reverse(parent_priority.0.saturating_add(1)),
448                    }
449                } else {
450                    *self
451                }
452            }
453        }
454    }
455}
456
457impl Display for TaskPriority {
458    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
459        match self {
460            TaskPriority::Initial => write!(f, "initial"),
461            TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
462        }
463    }
464}
465
/// Work item processed by the [`PriorityRunner`].
enum ScheduledTask {
    /// A regular, backend-known task identified by its id.
    Task {
        task_id: TaskId,
        /// Tracing span captured at scheduling time.
        span: Span,
    },
    /// A local task running inside the state of its parent (non-local) task.
    LocalTask {
        ty: LocalTaskSpec,
        persistence: TaskPersistence,
        local_task_id: LocalTaskId,
        /// Shared state of the parent task that spawned this local task.
        global_task_state: Arc<RwLock<CurrentTaskState>>,
        /// Tracing span captured at scheduling time.
        span: Span,
    },
}
479
/// The central turbo-tasks instance: owns the backend, id factories, scheduling
/// state, and activeness (foreground/background job) accounting.
pub struct TurboTasks<B: Backend + 'static> {
    /// Weak self-reference, set via `Arc::new_cyclic` in `new`; upgraded by `pin()`.
    this: Weak<Self>,
    backend: B,
    /// Id factory for persistent task ids (below `TRANSIENT_TASK_BIT`).
    task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id factory for transient task ids (with `TRANSIENT_TASK_BIT` set).
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Factory for per-execution ids; these may wrap (used only for asserts).
    execution_id_factory: IdFactory<ExecutionId>,
    /// Whether this instance has been stopped.
    stopped: AtomicBool,
    /// Count of in-flight foreground jobs; zero means no foreground activity.
    currently_scheduled_foreground_jobs: AtomicUsize,
    /// Count of in-flight background jobs (excluded from activeness).
    currently_scheduled_background_jobs: AtomicUsize,
    /// Running count of scheduled tasks — presumably feeds the aggregated
    /// update statistics; confirm at the point where it is read.
    scheduled_tasks: AtomicUsize,
    /// Priority-ordered executor for `ScheduledTask` items.
    priority_runner:
        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
    /// Start time of the current batch of work, if any.
    start: Mutex<Option<Instant>>,
    /// Accumulated (duration, task count) plus invalidation reasons used to
    /// build [`UpdateInfo`].
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Event that is triggered when currently_scheduled_foreground_jobs becomes non-zero
    event_foreground_start: Event,
    /// Event that is triggered when all foreground jobs are done
    /// (currently_scheduled_foreground_jobs becomes zero)
    event_foreground_done: Event,
    /// Event that is triggered when all background jobs are done
    event_background_done: Event,
    /// Instant captured in `new`; basis for `program_duration_until`.
    program_start: Instant,
    /// Queue distributing compilation events to subscribers.
    compilation_events: CompilationEventQueue,
}
504
/// In-flight futures spawned during a task's execution (local tasks and
/// detached testing futures). Initialized to `None` by the `CurrentTaskState`
/// constructors; presumably populated lazily on first use — confirm at call sites.
type LocalTaskTracker = Option<
    FuturesUnordered<Either<JoinHandle, Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>>,
>;
508
/// Information about a non-local task. A non-local task can contain multiple "local" tasks, which
/// all share the same non-local task state.
///
/// A non-local task is one that:
///
/// - Has a unique task id.
/// - Is potentially cached.
/// - The backend is aware of.
struct CurrentTaskState {
    /// Id of the current non-local task; `None` for temporary states created
    /// by [`CurrentTaskState::new_temporary`] (e.g. top-level `run` futures).
    task_id: Option<TaskId>,
    /// Unique id of this particular execution; asserted against when local
    /// tasks are scheduled/awaited (see `assert_execution_id`).
    execution_id: ExecutionId,
    /// Scheduling priority of the current task.
    priority: TaskPriority,

    /// True if the current task has state in cells (interior mutability).
    /// Only tracked when verify_determinism feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the current task uses an external invalidator
    has_invalidator: bool,

    /// True if we're in a top-level task (e.g. `.run_once(...)` or `.run(...)`).
    /// Eventually consistent reads are not allowed in top-level tasks.
    in_top_level_task: bool,

    /// Tracks how many cells of each type has been allocated so far during this task execution.
    /// When a task is re-executed, the cell count may not match the existing cell vec length.
    ///
    /// This is taken (and becomes `None`) during teardown of a task.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Local tasks created while this global task has been running. Indexed by `LocalTaskId`.
    local_tasks: Vec<LocalTask>,

    /// Tracks currently running local tasks, and defers cleanup of the global task until those
    /// complete. Also used by `spawn_detached_for_testing`.
    local_task_tracker: LocalTaskTracker,
}
547
impl CurrentTaskState {
    /// Creates the state for executing a backend-known task with the given id.
    fn new(
        task_id: TaskId,
        execution_id: ExecutionId,
        priority: TaskPriority,
        in_top_level_task: bool,
    ) -> Self {
        Self {
            task_id: Some(task_id),
            execution_id,
            priority,
            #[cfg(feature = "verify_determinism")]
            stateful: false,
            has_invalidator: false,
            in_top_level_task,
            cell_counters: Some(AutoMap::default()),
            local_tasks: Vec::new(),
            local_task_tracker: None,
        }
    }

    /// Creates a state without a backing backend task (`task_id: None`, no
    /// cell counters); used by `TurboTasks::run` for top-level futures.
    fn new_temporary(
        execution_id: ExecutionId,
        priority: TaskPriority,
        in_top_level_task: bool,
    ) -> Self {
        Self {
            task_id: None,
            execution_id,
            priority,
            #[cfg(feature = "verify_determinism")]
            stateful: false,
            has_invalidator: false,
            in_top_level_task,
            cell_counters: None,
            local_tasks: Vec::new(),
            local_task_tracker: None,
        }
    }

    /// Panics unless `expected_execution_id` matches this state's execution id.
    /// Guards against local task ids leaking across executions.
    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
        if self.execution_id != expected_execution_id {
            panic!(
                "Local tasks can only be scheduled/awaited within the same execution of the \
                 parent task that created them"
            );
        }
    }

    /// Registers a new local task and returns its one-indexed id.
    fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
        self.local_tasks.push(local_task);
        // generate a one-indexed id from len() -- we just pushed so len() is >= 1
        if cfg!(debug_assertions) {
            LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
        } else {
            // SAFETY: len() >= 1 after the push above, so the id is non-zero
            // (assumes the count fits in u32; the debug path checks this).
            unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
        }
    }

    /// Looks up a local task by its one-indexed id. Panics if out of range.
    fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
        // local task ids are one-indexed (they use NonZeroU32)
        &self.local_tasks[(*local_task_id as usize) - 1]
    }

    /// Mutable variant of [`Self::get_local_task`].
    fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
        &mut self.local_tasks[(*local_task_id as usize) - 1]
    }
}
616
// TODO implement our own thread pool and make these thread locals instead
task_local! {
    /// The current TurboTasks instance
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    /// State of the currently executing non-local task, shared (via `Arc`)
    /// with any local tasks it spawns.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;

    /// Temporarily suppresses the eventual consistency check in top-level tasks.
    /// This is used by strongly consistent reads to allow them to succeed in top-level tasks.
    /// This is NOT shared across local tasks (unlike CURRENT_TASK_STATE), so it's safe
    /// to set/unset without race conditions.
    pub(crate) static SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK: bool;
}
630
631impl<B: Backend + 'static> TurboTasks<B> {
    // TODO better lifetime management for turbo tasks
    // consider using unsafe for the task_local turbo tasks
    // that should be safe as long as tasks can't outlive turbo tasks
    // so we probably want to make sure that all tasks are joined
    // when trying to drop turbo tasks
    /// Creates a new instance wrapping `backend` and notifies it via
    /// `Backend::startup`.
    ///
    /// Persistent task ids are allocated below `TRANSIENT_TASK_BIT`, transient
    /// ids at or above it, so the two id spaces never overlap.
    pub fn new(backend: B) -> Arc<Self> {
        let task_id_factory = IdFactoryWithReuse::new(
            TaskId::MIN,
            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
        );
        let transient_task_id_factory =
            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
        // `new_cyclic` lets the instance hold a weak reference to itself
        // (`this`), which `pin()` upgrades on demand.
        let this = Arc::new_cyclic(|this| Self {
            this: this.clone(),
            backend,
            task_id_factory,
            transient_task_id_factory,
            execution_id_factory,
            stopped: AtomicBool::new(false),
            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
            currently_scheduled_background_jobs: AtomicUsize::new(0),
            scheduled_tasks: AtomicUsize::new(0),
            priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
            start: Default::default(),
            aggregated_update: Default::default(),
            event_foreground_done: Event::new(|| {
                || "TurboTasks::event_foreground_done".to_string()
            }),
            event_foreground_start: Event::new(|| {
                || "TurboTasks::event_foreground_start".to_string()
            }),
            event_background_done: Event::new(|| {
                || "TurboTasks::event_background_done".to_string()
            }),
            program_start: Instant::now(),
            compilation_events: CompilationEventQueue::default(),
        });
        this.backend.startup(&*this);
        this
    }
673
    /// Returns a new strong `Arc` reference to this instance.
    ///
    /// `this` is set in `new` via `Arc::new_cyclic`, so the upgrade is
    /// expected to succeed while any `Arc<Self>` is alive.
    pub fn pin(&self) -> Arc<Self> {
        self.this.upgrade().unwrap()
    }
677
    /// Creates a new root task
    ///
    /// The task is transient; `functor` is cloned and re-invoked on each
    /// (re)execution, and the task is immediately scheduled with initial
    /// priority.
    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
    where
        T: ?Sized,
        F: Fn() -> Fut + Send + Sync + Clone + 'static,
        Fut: Future<Output = Result<Vc<T>>> + Send,
    {
        let id = self.backend.create_transient_task(
            TransientTaskType::Root(Box::new(move || {
                let functor = functor.clone();
                Box::pin(async move {
                    // Top-level tasks disallow eventually-consistent reads.
                    mark_top_level_task();
                    let raw_vc = functor().await?.node;
                    // Resolve the result to a non-local RawVc before returning
                    // it as the task output.
                    raw_vc.to_non_local().await
                })
            })),
            self,
        );
        self.schedule(id, TaskPriority::initial());
        id
    }
699
    /// Disposes a root task previously created with
    /// [`Self::spawn_root_task`], delegating to the backend.
    pub fn dispose_root_task(&self, task_id: TaskId) {
        self.backend.dispose_root_task(task_id, self);
    }
703
    // TODO make sure that all dependencies settle before reading them
    /// Creates a new root task, that is only executed once.
    /// Dependencies will not invalidate the task.
    #[track_caller]
    fn spawn_once_task<T, Fut>(&self, future: Fut)
    where
        T: ?Sized,
        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
    {
        let id = self.backend.create_transient_task(
            TransientTaskType::Once(Box::pin(async move {
                // Top-level tasks disallow eventually-consistent reads.
                mark_top_level_task();
                let raw_vc = future.await?.node;
                // Resolve the result to a non-local RawVc before returning it
                // as the task output.
                raw_vc.to_non_local().await
            })),
            self,
        );
        self.schedule(id, TaskPriority::initial());
    }
723
    /// Spawns `future` as a transient once-task and waits for its result,
    /// delivered over a oneshot channel.
    ///
    /// NOTE(review): `mark_top_level_task` is called both here and inside
    /// `spawn_once_task`'s wrapper; the call here looks redundant — confirm.
    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
        &self,
        future: impl Future<Output = Result<T>> + Send + 'static,
    ) -> Result<T> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.spawn_once_task(async move {
            mark_top_level_task();
            let result = future.await;
            // `send` only fails when the receiver was dropped, i.e. the caller
            // of `run_once` no longer awaits the result.
            tx.send(result)
                .map_err(|_| anyhow!("unable to send result"))?;
            Ok(Completion::new())
        });

        rx.await?
    }
739
    /// Runs `future` to completion inside a fresh top-level task scope without
    /// creating a backend task. Counts as a foreground job for the duration,
    /// and waits for all local tasks spawned by the future before returning.
    #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
    pub async fn run<T: TraceRawVcs + Send + 'static>(
        &self,
        future: impl Future<Output = Result<T>> + Send + 'static,
    ) -> Result<T, TurboTasksExecutionError> {
        self.begin_foreground_job();
        // it's okay for execution ids to overflow and wrap, they're just used for an assert
        let execution_id = self.execution_id_factory.wrapping_get();
        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
            execution_id,
            TaskPriority::initial(),
            true, // in_top_level_task
        )));

        let result = TURBO_TASKS
            .scope(
                self.pin(),
                CURRENT_TASK_STATE.scope(current_task_state, async {
                    // `CaptureFuture` surfaces panics as the outer `Err` case,
                    // mapped to `TurboTasksExecutionError::Panic` below.
                    let result = CaptureFuture::new(future).await;

                    // wait for all spawned local tasks using `local` to finish
                    wait_for_local_tasks().await;

                    match result {
                        Ok(Ok(raw_vc)) => Ok(raw_vc),
                        Ok(Err(err)) => Err(err.into()),
                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                    }
                }),
            )
            .await;
        self.finish_foreground_job();
        result
    }
774
    /// Spawns `future` on the tokio runtime as a detached once-process.
    ///
    /// NOTE(review): the foreground-job counter is decremented *before*
    /// awaiting `future` and re-incremented afterwards, which excludes the
    /// awaited future itself from foreground activeness — confirm this
    /// inversion is intentional.
    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
        let this = self.pin();
        tokio::spawn(async move {
            // A second Arc is needed (`this.pin()`) because `this` itself is
            // moved into the inner `async move` block below.
            this.pin()
                .run_once(async move {
                    this.finish_foreground_job();
                    future.await;
                    this.begin_foreground_job();
                    Ok(())
                })
                .await
                .unwrap()
        });
    }
789
790    pub(crate) fn native_call(
791        &self,
792        native_fn: &'static NativeFunction,
793        this: Option<RawVc>,
794        arg: Box<dyn MagicAny>,
795        persistence: TaskPersistence,
796    ) -> RawVc {
797        let task_type = CachedTaskType {
798            native_fn,
799            this,
800            arg,
801        };
802        RawVc::TaskOutput(match persistence {
803            TaskPersistence::Transient => self.backend.get_or_create_transient_task(
804                task_type,
805                current_task_if_available("turbo_function calls"),
806                self,
807            ),
808            TaskPersistence::Persistent => self.backend.get_or_create_persistent_task(
809                task_type,
810                current_task_if_available("turbo_function calls"),
811                self,
812            ),
813        })
814    }
815
816    pub fn dynamic_call(
817        &self,
818        native_fn: &'static NativeFunction,
819        this: Option<RawVc>,
820        arg: Box<dyn MagicAny>,
821        persistence: TaskPersistence,
822    ) -> RawVc {
823        if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
824            return self.native_call(native_fn, this, arg, persistence);
825        }
826        let task_type = LocalTaskSpec {
827            task_type: LocalTaskType::ResolveNative { native_fn },
828            this,
829            arg,
830        };
831        self.schedule_local_task(task_type, persistence)
832    }
833
    /// Calls a trait method on `this`. When `this` is already a resolved cell,
    /// the concrete native function is looked up and called directly;
    /// otherwise a local wrapper task resolves the inputs first.
    pub fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        // avoid creating a wrapper task if self is already resolved
        // for resolved cells we already know the value type so we can lookup the
        // function
        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
            match registry::get_value_type(type_id).get_trait_method(trait_method) {
                Some(native_fn) => {
                    let arg = native_fn.arg_meta.filter_owned(arg);
                    return self.dynamic_call(native_fn, Some(this), arg, persistence);
                }
                None => {
                    // We are destined to fail at this point, but we just retry resolution in the
                    // local task since we cannot report an error from here.
                    // TODO: A panic seems appropriate since the immediate caller is to blame
                }
            }
        }

        // create a wrapper task to resolve all inputs
        let task_type = LocalTaskSpec {
            task_type: LocalTaskType::ResolveTrait { trait_method },
            this: Some(this),
            arg,
        };

        self.schedule_local_task(task_type, persistence)
    }
867
868    #[track_caller]
869    pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
870        self.begin_foreground_job();
871        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
872
873        self.priority_runner.schedule(
874            &self.pin(),
875            ScheduledTask::Task {
876                task_id,
877                span: Span::current(),
878            },
879            priority,
880        );
881    }
882
    /// Registers a local task in the current (global) task's state, schedules it on the
    /// priority runner, and returns a `RawVc::LocalOutput` pointing at its future result.
    fn schedule_local_task(
        &self,
        ty: LocalTaskSpec,
        // if this is a `LocalTaskType::Resolve*`, we may spawn another task with this persistence,
        persistence: TaskPersistence,
    ) -> RawVc {
        let task_type = ty.task_type;
        // Create the local task slot (with a done event for readers to wait on) and
        // capture everything needed to schedule it outside the task-local borrow.
        let (global_task_state, execution_id, priority, local_task_id) =
            CURRENT_TASK_STATE.with(|gts| {
                let mut gts_write = gts.write().unwrap();
                let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
                    done_event: Event::new(move || {
                        move || format!("LocalTask({task_type})::done_event")
                    }),
                });
                (
                    Arc::clone(gts),
                    gts_write.execution_id,
                    gts_write.priority,
                    local_task_id,
                )
            });

        let future = self.priority_runner.schedule_with_join_handle(
            &self.pin(),
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state: global_task_state.clone(),
                span: Span::current(),
            },
            priority,
        );
        // Track the join handle so the parent task waits for this local task before
        // completing its own execution (see `wait_for_local_tasks`).
        global_task_state
            .write()
            .unwrap()
            .local_task_tracker
            .get_or_insert_default()
            .push(Either::Left(future));

        // Local outputs are only valid within the current execution of the parent
        // task, hence the `execution_id` embedded in the returned Vc.
        RawVc::LocalOutput(execution_id, local_task_id, persistence)
    }
926
927    fn begin_foreground_job(&self) {
928        if self
929            .currently_scheduled_foreground_jobs
930            .fetch_add(1, Ordering::AcqRel)
931            == 0
932        {
933            *self.start.lock().unwrap() = Some(Instant::now());
934            self.event_foreground_start.notify(usize::MAX);
935            self.backend.idle_end(self);
936        }
937    }
938
939    fn finish_foreground_job(&self) {
940        if self
941            .currently_scheduled_foreground_jobs
942            .fetch_sub(1, Ordering::AcqRel)
943            == 1
944        {
945            self.backend.idle_start(self);
946            // That's not super race-condition-safe, but it's only for
947            // statistical reasons
948            let total = self.scheduled_tasks.load(Ordering::Acquire);
949            self.scheduled_tasks.store(0, Ordering::Release);
950            if let Some(start) = *self.start.lock().unwrap() {
951                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
952                if let Some(update) = update.as_mut() {
953                    update.0 += start.elapsed();
954                    update.1 += total;
955                } else {
956                    *update = Some((start.elapsed(), total));
957                }
958            }
959            self.event_foreground_done.notify(usize::MAX);
960        }
961    }
962
    /// Increments the background job counter; paired with `finish_background_job`.
    fn begin_background_job(&self) {
        self.currently_scheduled_background_jobs
            .fetch_add(1, Ordering::Relaxed);
    }
967
968    fn finish_background_job(&self) {
969        if self
970            .currently_scheduled_background_jobs
971            .fetch_sub(1, Ordering::Relaxed)
972            == 1
973        {
974            self.event_background_done.notify(usize::MAX);
975        }
976    }
977
    /// Returns the number of foreground jobs currently scheduled or executing.
    pub fn get_in_progress_count(&self) -> usize {
        self.currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
    }
982
983    /// Waits for the given task to finish executing. This works by performing an untracked read,
984    /// and discarding the value of the task output.
985    ///
986    /// [`ReadConsistency::Eventual`] means that this will return after the task executes, but
987    /// before all dependencies have completely settled.
988    ///
989    /// [`ReadConsistency::Strong`] means that this will also wait for the task and all dependencies
990    /// to fully settle before returning.
991    ///
992    /// As this function is typically called in top-level code that waits for results to be ready
993    /// for the user to access, most callers should use [`ReadConsistency::Strong`].
994    pub async fn wait_task_completion(
995        &self,
996        id: TaskId,
997        consistency: ReadConsistency,
998    ) -> Result<()> {
999        read_task_output(
1000            self,
1001            id,
1002            ReadOutputOptions {
1003                // INVALIDATION: This doesn't return a value, only waits for it to be ready.
1004                tracking: ReadTracking::Untracked,
1005                consistency,
1006            },
1007        )
1008        .await?;
1009        Ok(())
1010    }
1011
1012    /// Returns [UpdateInfo] with all updates aggregated over a given duration
1013    /// (`aggregation`). Will wait until an update happens.
1014    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
1015        self.aggregated_update_info(aggregation, Duration::MAX)
1016            .await
1017            .unwrap()
1018    }
1019
1020    /// Returns [UpdateInfo] with all updates aggregated over a given duration
1021    /// (`aggregation`). Will only return None when the timeout is reached while
1022    /// waiting for the first update.
1023    pub async fn aggregated_update_info(
1024        &self,
1025        aggregation: Duration,
1026        timeout: Duration,
1027    ) -> Option<UpdateInfo> {
1028        let listener = self
1029            .event_foreground_done
1030            .listen_with_note(|| || "wait for update info".to_string());
1031        let wait_for_finish = {
1032            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1033            if aggregation.is_zero() {
1034                if let Some((duration, tasks)) = update.take() {
1035                    return Some(UpdateInfo {
1036                        duration,
1037                        tasks,
1038                        reasons: take(reason_set),
1039                        placeholder_for_future_fields: (),
1040                    });
1041                } else {
1042                    true
1043                }
1044            } else {
1045                update.is_none()
1046            }
1047        };
1048        if wait_for_finish {
1049            if timeout == Duration::MAX {
1050                // wait for finish
1051                listener.await;
1052            } else {
1053                // wait for start, then wait for finish or timeout
1054                let start_listener = self
1055                    .event_foreground_start
1056                    .listen_with_note(|| || "wait for update info".to_string());
1057                if self
1058                    .currently_scheduled_foreground_jobs
1059                    .load(Ordering::Acquire)
1060                    == 0
1061                {
1062                    start_listener.await;
1063                } else {
1064                    drop(start_listener);
1065                }
1066                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1067                    // Timeout
1068                    return None;
1069                }
1070            }
1071        }
1072        if !aggregation.is_zero() {
1073            loop {
1074                select! {
1075                    () = tokio::time::sleep(aggregation) => {
1076                        break;
1077                    }
1078                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1079                        // Resets the sleep
1080                    }
1081                }
1082            }
1083        }
1084        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1085        if let Some((duration, tasks)) = update.take() {
1086            Some(UpdateInfo {
1087                duration,
1088                tasks,
1089                reasons: take(reason_set),
1090                placeholder_for_future_fields: (),
1091            })
1092        } else {
1093            panic!("aggregated_update_info must not called concurrently")
1094        }
1095    }
1096
1097    pub async fn wait_background_done(&self) {
1098        let listener = self.event_background_done.listen();
1099        if self
1100            .currently_scheduled_background_jobs
1101            .load(Ordering::Acquire)
1102            != 0
1103        {
1104            listener.await;
1105        }
1106    }
1107
    /// Signals the backend that we are stopping, waits for all foreground and then
    /// background jobs to finish, and finally tells the backend to stop.
    pub async fn stop_and_wait(&self) {
        turbo_tasks_future_scope(self.pin(), async move {
            self.backend.stopping(self);
            self.stopped.store(true, Ordering::Release);
            {
                // Listener is created before the counter check so a notification
                // fired in between cannot be missed.
                let listener = self
                    .event_foreground_done
                    .listen_with_note(|| || "wait for stop".to_string());
                if self
                    .currently_scheduled_foreground_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            {
                // Same listen-before-check pattern for background jobs.
                let listener = self.event_background_done.listen();
                if self
                    .currently_scheduled_background_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            self.backend.stop(self);
        })
        .await;
    }
1138
1139    #[track_caller]
1140    pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1141    where
1142        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1143        T::CallOnceFuture: Send,
1144    {
1145        let mut this = self.pin();
1146        this.begin_foreground_job();
1147        tokio::spawn(
1148            TURBO_TASKS
1149                .scope(this.clone(), async move {
1150                    if !this.stopped.load(Ordering::Acquire) {
1151                        this = func(this.clone()).await;
1152                    }
1153                    this.finish_foreground_job();
1154                })
1155                .in_current_span(),
1156        );
1157    }
1158
1159    #[track_caller]
1160    pub(crate) fn schedule_background_job<T>(&self, func: T)
1161    where
1162        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1163        T::CallOnceFuture: Send,
1164    {
1165        let mut this = self.pin();
1166        self.begin_background_job();
1167        tokio::spawn(
1168            TURBO_TASKS
1169                .scope(this.clone(), async move {
1170                    if !this.stopped.load(Ordering::Acquire) {
1171                        this = func(this).await;
1172                    }
1173                    this.finish_background_job();
1174                })
1175                .in_current_span(),
1176        );
1177    }
1178
1179    fn finish_current_task_state(&self) -> FinishedTaskState {
1180        CURRENT_TASK_STATE.with(|cell| {
1181            let current_task_state = &*cell.write().unwrap();
1182            FinishedTaskState {
1183                #[cfg(feature = "verify_determinism")]
1184                stateful: current_task_state.stateful,
1185                has_invalidator: current_task_state.has_invalidator,
1186            }
1187        })
1188    }
1189
    /// Returns a reference to the underlying backend implementation.
    pub fn backend(&self) -> &B {
        &self.backend
    }
1193}
1194
1195struct TurboTasksExecutor;
1196
impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
    type Future = impl Future<Output = ()> + Send + 'static;

    /// Turns a [`ScheduledTask`] into the future that actually drives its execution,
    /// scoped to the given `TurboTasks` instance and the caller's span.
    fn execute(
        &self,
        this: &Arc<TurboTasks<B>>,
        scheduled_task: ScheduledTask,
        priority: TaskPriority,
    ) -> Self::Future {
        match scheduled_task {
            ScheduledTask::Task { task_id, span } => {
                let this2 = this.clone();
                let this = this.clone();
                let future = async move {
                    // A task is re-executed in a loop as long as the backend requests
                    // it via the return value of `task_execution_completed`.
                    let mut schedule_again = true;
                    while schedule_again {
                        // it's okay for execution ids to overflow and wrap, they're just used for
                        // an assert
                        let execution_id = this.execution_id_factory.wrapping_get();
                        // Fresh task-local state for this single execution.
                        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
                            task_id,
                            execution_id,
                            priority,
                            false, // in_top_level_task
                        )));
                        let single_execution_future = async {
                            if this.stopped.load(Ordering::Acquire) {
                                this.backend.task_execution_canceled(task_id, &*this);
                                return false;
                            }

                            // The backend may decline to start the execution.
                            let Some(TaskExecutionSpec { future, span }) = this
                                .backend
                                .try_start_task_execution(task_id, priority, &*this)
                            else {
                                return false;
                            };

                            async {
                                let result = CaptureFuture::new(future).await;

                                // wait for all spawned local tasks using `local` to finish
                                wait_for_local_tasks().await;

                                // Normalize the nested result: outer layer captures
                                // panics, inner layer is the task's own result.
                                let result = match result {
                                    Ok(Ok(raw_vc)) => Ok(raw_vc),
                                    Ok(Err(err)) => Err(err.into()),
                                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                                };

                                let finished_state = this.finish_current_task_state();
                                let cell_counters = CURRENT_TASK_STATE
                                    .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
                                // Return value tells us whether to execute again.
                                this.backend.task_execution_completed(
                                    task_id,
                                    result,
                                    &cell_counters,
                                    #[cfg(feature = "verify_determinism")]
                                    finished_state.stateful,
                                    finished_state.has_invalidator,
                                    &*this,
                                )
                            }
                            .instrument(span)
                            .await
                        };
                        schedule_again = CURRENT_TASK_STATE
                            .scope(current_task_state, single_execution_future)
                            .await;
                    }
                    // Balances the `begin_foreground_job` performed when scheduling.
                    this.finish_foreground_job();
                };

                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
            }
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state,
                span,
            } => {
                let this2 = this.clone();
                let this = this.clone();
                let task_type = ty.task_type;
                let future = async move {
                    // Pick the tracing span matching the kind of resolution performed.
                    let span = match &ty.task_type {
                        LocalTaskType::ResolveNative { native_fn } => {
                            native_fn.resolve_span(priority)
                        }
                        LocalTaskType::ResolveTrait { trait_method } => {
                            trait_method.resolve_span(priority)
                        }
                    };
                    async move {
                        let result = match ty.task_type {
                            LocalTaskType::ResolveNative { native_fn } => {
                                LocalTaskType::run_resolve_native(
                                    native_fn,
                                    ty.this,
                                    &*ty.arg,
                                    persistence,
                                    this,
                                )
                                .await
                            }
                            LocalTaskType::ResolveTrait { trait_method } => {
                                LocalTaskType::run_resolve_trait(
                                    trait_method,
                                    ty.this.unwrap(),
                                    &*ty.arg,
                                    persistence,
                                    this,
                                )
                                .await
                            }
                        };

                        let output = match result {
                            Ok(raw_vc) => OutputContent::Link(raw_vc),
                            Err(err) => OutputContent::Error(
                                TurboTasksExecutionError::from(err)
                                    .with_local_task_context(task_type.to_string()),
                            ),
                        };

                        let local_task = LocalTask::Done { output };

                        // Swap the local task's state to `Done` and take the event so
                        // waiting readers can be woken up afterwards.
                        let done_event = CURRENT_TASK_STATE.with(move |gts| {
                            let mut gts_write = gts.write().unwrap();
                            let scheduled_task = std::mem::replace(
                                gts_write.get_mut_local_task(local_task_id),
                                local_task,
                            );
                            let LocalTask::Scheduled { done_event } = scheduled_task else {
                                panic!("local task finished, but was not in the scheduled state?");
                            };
                            done_event
                        });
                        done_event.notify(usize::MAX)
                    }
                    .instrument(span)
                    .await
                };
                // Local tasks run with their parent task's state in scope.
                let future = CURRENT_TASK_STATE.scope(global_task_state, future);

                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
            }
        }
    }
}
1348
/// Flags captured from the current task state when a task execution finishes,
/// handed to the backend via `task_execution_completed`.
struct FinishedTaskState {
    /// True if the task has state in cells (interior mutability).
    /// Only tracked when verify_determinism feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the task uses an external invalidator
    has_invalidator: bool,
}
1358
/// Trait-object adapter: forwards the dyn-dispatchable call API to the inherent
/// (generic) implementations on `TurboTasks<B>`.
impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.dynamic_call(native_fn, this, arg, persistence)
    }
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.native_call(native_fn, this, arg, persistence)
    }
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.trait_call(trait_method, this, arg, persistence)
    }

    #[track_caller]
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run(future).await })
    }

    #[track_caller]
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    #[track_caller]
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        {
            // Record the reason so it is included in the next aggregated update.
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    #[track_caller]
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        self.start_once_process(future)
    }

    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
        // Best-effort delivery: a failed send is logged, not propagated.
        if let Err(e) = self.compilation_events.send(event) {
            tracing::warn!("Failed to send compilation event: {e}");
        }
    }

    fn get_task_name(&self, task: TaskId) -> String {
        self.backend.get_task_name(task, self)
    }
}
1435
1436impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1437    #[instrument(level = "info", skip_all, name = "invalidate")]
1438    fn invalidate(&self, task: TaskId) {
1439        self.backend.invalidate_task(task, self);
1440    }
1441
1442    #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1443    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1444        {
1445            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1446            reason_set.insert(reason);
1447        }
1448        self.backend.invalidate_task(task, self);
1449    }
1450
1451    fn invalidate_serialization(&self, task: TaskId) {
1452        self.backend.invalidate_serialization(task, self);
1453    }
1454
1455    #[track_caller]
1456    fn try_read_task_output(
1457        &self,
1458        task: TaskId,
1459        options: ReadOutputOptions,
1460    ) -> Result<Result<RawVc, EventListener>> {
1461        if options.consistency == ReadConsistency::Eventual {
1462            debug_assert_not_in_top_level_task("read_task_output");
1463        }
1464        self.backend.try_read_task_output(
1465            task,
1466            current_task_if_available("reading Vcs"),
1467            options,
1468            self,
1469        )
1470    }
1471
1472    #[track_caller]
1473    fn try_read_task_cell(
1474        &self,
1475        task: TaskId,
1476        index: CellId,
1477        options: ReadCellOptions,
1478    ) -> Result<Result<TypedCellContent, EventListener>> {
1479        let reader = current_task_if_available("reading Vcs");
1480        self.backend
1481            .try_read_task_cell(task, index, reader, options, self)
1482    }
1483
1484    fn try_read_own_task_cell(
1485        &self,
1486        current_task: TaskId,
1487        index: CellId,
1488        options: ReadCellOptions,
1489    ) -> Result<TypedCellContent> {
1490        self.backend
1491            .try_read_own_task_cell(current_task, index, options, self)
1492    }
1493
1494    #[track_caller]
1495    fn try_read_local_output(
1496        &self,
1497        execution_id: ExecutionId,
1498        local_task_id: LocalTaskId,
1499    ) -> Result<Result<RawVc, EventListener>> {
1500        debug_assert_not_in_top_level_task("read_local_output");
1501        CURRENT_TASK_STATE.with(|gts| {
1502            let gts_read = gts.read().unwrap();
1503
1504            // Local Vcs are local to their parent task's current execution, and do not exist
1505            // outside of it. This is weakly enforced at compile time using the `NonLocalValue`
1506            // marker trait. This assertion exists to handle any potential escapes that the
1507            // compile-time checks cannot capture.
1508            gts_read.assert_execution_id(execution_id);
1509
1510            match gts_read.get_local_task(local_task_id) {
1511                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1512                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1513            }
1514        })
1515    }
1516
1517    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1518        // TODO: Add assert_not_in_top_level_task("read_task_collectibles") check here.
1519        // Collectible reads are eventually consistent.
1520        self.backend.read_task_collectibles(
1521            task,
1522            trait_id,
1523            current_task_if_available("reading collectibles"),
1524            self,
1525        )
1526    }
1527
1528    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1529        self.backend.emit_collectible(
1530            trait_type,
1531            collectible,
1532            current_task("emitting collectible"),
1533            self,
1534        );
1535    }
1536
1537    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1538        self.backend.unemit_collectible(
1539            trait_type,
1540            collectible,
1541            count,
1542            current_task("emitting collectible"),
1543            self,
1544        );
1545    }
1546
1547    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1548        for (&collectible, &count) in collectibles {
1549            if count > 0 {
1550                self.backend.unemit_collectible(
1551                    trait_type,
1552                    collectible,
1553                    count as u32,
1554                    current_task("emitting collectible"),
1555                    self,
1556                );
1557            }
1558        }
1559    }
1560
1561    fn read_own_task_cell(
1562        &self,
1563        task: TaskId,
1564        index: CellId,
1565        options: ReadCellOptions,
1566    ) -> Result<TypedCellContent> {
1567        self.try_read_own_task_cell(task, index, options)
1568    }
1569
1570    fn update_own_task_cell(
1571        &self,
1572        task: TaskId,
1573        index: CellId,
1574        is_serializable_cell_content: bool,
1575        content: CellContent,
1576        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
1577        content_hash: Option<CellHash>,
1578        verification_mode: VerificationMode,
1579    ) {
1580        self.backend.update_task_cell(
1581            task,
1582            index,
1583            is_serializable_cell_content,
1584            content,
1585            updated_key_hashes,
1586            content_hash,
1587            verification_mode,
1588            self,
1589        );
1590    }
1591
1592    fn connect_task(&self, task: TaskId) {
1593        self.backend
1594            .connect_task(task, current_task_if_available("connecting task"), self);
1595    }
1596
1597    fn mark_own_task_as_finished(&self, task: TaskId) {
1598        self.backend.mark_own_task_as_finished(task, self);
1599    }
1600
1601    fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1602        self.backend.mark_own_task_as_session_dependent(task, self);
1603    }
1604
1605    /// Creates a future that inherits the current task id and task state. The current global task
1606    /// will wait for this future to be dropped before exiting.
1607    fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1608        // this is similar to what happens for a local task, except that we keep the local task's
1609        // state as well.
1610        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1611        let fut = tokio::spawn(TURBO_TASKS.scope(
1612            turbo_tasks(),
1613            CURRENT_TASK_STATE.scope(global_task_state.clone(), fut),
1614        ));
1615        let fut = Box::pin(async move {
1616            fut.await.unwrap();
1617        });
1618        let mut ts = global_task_state.write().unwrap();
1619        ts.local_task_tracker
1620            .get_or_insert_default()
1621            .push(Either::Right(fut));
1622    }
1623
1624    fn task_statistics(&self) -> &TaskStatisticsApi {
1625        self.backend.task_statistics()
1626    }
1627
1628    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1629        let this = self.pin();
1630        Box::pin(async move {
1631            this.stop_and_wait().await;
1632        })
1633    }
1634
1635    fn subscribe_to_compilation_events(
1636        &self,
1637        event_types: Option<Vec<String>>,
1638    ) -> Receiver<Arc<dyn CompilationEvent>> {
1639        self.compilation_events.subscribe(event_types)
1640    }
1641
1642    fn is_tracking_dependencies(&self) -> bool {
1643        self.backend.is_tracking_dependencies()
1644    }
1645}
1646
/// API the backend uses to talk back to the turbo-tasks manager: scheduling,
/// task-id allocation/reuse, and idle/priority queries.
impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
        self.pin()
    }
    fn backend(&self) -> &B {
        &self.backend
    }

    #[track_caller]
    fn schedule_backend_background_job(&self, job: B::BackendJob) {
        self.schedule_background_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            this
        })
    }

    #[track_caller]
    fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
        self.schedule_foreground_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            this
        })
    }

    #[track_caller]
    fn schedule(&self, task: TaskId, priority: TaskPriority) {
        self.schedule(task, priority)
    }

    fn get_current_task_priority(&self) -> TaskPriority {
        // Falls back to the initial priority when called outside of a task.
        CURRENT_TASK_STATE
            .try_with(|task_state| task_state.read().unwrap().priority)
            .unwrap_or(TaskPriority::initial())
    }

    fn program_duration_until(&self, instant: Instant) -> Duration {
        // NOTE(review): `Instant` subtraction panics if `instant` predates
        // `program_start` — presumably callers only pass later instants; confirm.
        instant - self.program_start
    }

    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
        // SAFETY: This is a fresh id from the factory
        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
    }

    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
        // SAFETY: This is a fresh id from the factory
        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
    }

    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
        // SAFETY: caller guarantees the id is unused (trait contract).
        unsafe { self.task_id_factory.reuse(id.into()) }
    }

    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
        // SAFETY: caller guarantees the id is unused (trait contract).
        unsafe { self.transient_task_id_factory.reuse(id.into()) }
    }

    fn is_idle(&self) -> bool {
        self.currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
            == 0
    }
}
1710
1711async fn wait_for_local_tasks() {
1712    while let Some(mut ltt) =
1713        CURRENT_TASK_STATE.with(|ts| ts.write().unwrap().local_task_tracker.take())
1714    {
1715        use futures::StreamExt;
1716        while ltt.next().await.is_some() {}
1717    }
1718}
1719
1720pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1721    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1722        Ok(id) => id,
1723        Err(_) => panic!(
1724            "{from} can only be used in the context of a turbo_tasks task execution or \
1725             turbo_tasks run"
1726        ),
1727    }
1728}
1729
1730pub(crate) fn current_task(from: &str) -> TaskId {
1731    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1732        Ok(Some(id)) => id,
1733        Ok(None) | Err(_) => {
1734            panic!("{from} can only be used in the context of a turbo_tasks task execution")
1735        }
1736    }
1737}
1738
1739/// Panics if we're not in a top-level task (e.g. [`run_once`]). Some function calls should only
1740/// happen in a top-level task (e.g. [`Effects::apply`][crate::Effects::apply]).
1741#[track_caller]
1742pub(crate) fn debug_assert_in_top_level_task(message: &str) {
1743    if !cfg!(debug_assertions) {
1744        return;
1745    }
1746
1747    let in_top_level = CURRENT_TASK_STATE
1748        .try_with(|ts| ts.read().unwrap().in_top_level_task)
1749        .unwrap_or(true);
1750    if !in_top_level {
1751        panic!("{message}");
1752    }
1753}
1754
1755#[track_caller]
1756pub(crate) fn debug_assert_not_in_top_level_task(operation: &str) {
1757    if !cfg!(debug_assertions) {
1758        return;
1759    }
1760
1761    // HACK: We set this inside of `ReadRawVcFuture` to suppress warnings about an internal
1762    // consistency bug
1763    let suppressed = SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK
1764        .try_with(|&suppressed| suppressed)
1765        .unwrap_or(false);
1766    if suppressed {
1767        return;
1768    }
1769
1770    let in_top_level = CURRENT_TASK_STATE
1771        .try_with(|ts| ts.read().unwrap().in_top_level_task)
1772        .unwrap_or(false);
1773    if in_top_level {
1774        panic!(
1775            "Eventually consistent read ({operation}) cannot be performed from a top-level task. \
1776             Top-level tasks (e.g. code inside `.run_once(...)`) must use strongly consistent \
1777             reads to avoid leaking inconsistent return values."
1778        );
1779    }
1780}
1781
1782pub async fn run<T: Send + 'static>(
1783    tt: Arc<dyn TurboTasksApi>,
1784    future: impl Future<Output = Result<T>> + Send + 'static,
1785) -> Result<T> {
1786    let (tx, rx) = tokio::sync::oneshot::channel();
1787
1788    tt.run(Box::pin(async move {
1789        let result = future.await?;
1790        tx.send(result)
1791            .map_err(|_| anyhow!("unable to send result"))?;
1792        Ok(())
1793    }))
1794    .await?;
1795
1796    Ok(rx.await?)
1797}
1798
1799pub async fn run_once<T: Send + 'static>(
1800    tt: Arc<dyn TurboTasksApi>,
1801    future: impl Future<Output = Result<T>> + Send + 'static,
1802) -> Result<T> {
1803    let (tx, rx) = tokio::sync::oneshot::channel();
1804
1805    tt.run_once(Box::pin(async move {
1806        let result = future.await?;
1807        tx.send(result)
1808            .map_err(|_| anyhow!("unable to send result"))?;
1809        Ok(())
1810    }))
1811    .await?;
1812
1813    Ok(rx.await?)
1814}
1815
1816pub async fn run_once_with_reason<T: Send + 'static>(
1817    tt: Arc<dyn TurboTasksApi>,
1818    reason: impl InvalidationReason,
1819    future: impl Future<Output = Result<T>> + Send + 'static,
1820) -> Result<T> {
1821    let (tx, rx) = tokio::sync::oneshot::channel();
1822
1823    tt.run_once_with_reason(
1824        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1825        Box::pin(async move {
1826            let result = future.await?;
1827            tx.send(result)
1828                .map_err(|_| anyhow!("unable to send result"))?;
1829            Ok(())
1830        }),
1831    )
1832    .await?;
1833
1834    Ok(rx.await?)
1835}
1836
1837/// Calls [`TurboTasks::dynamic_call`] for the current turbo tasks instance.
1838pub fn dynamic_call(
1839    func: &'static NativeFunction,
1840    this: Option<RawVc>,
1841    arg: Box<dyn MagicAny>,
1842    persistence: TaskPersistence,
1843) -> RawVc {
1844    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1845}
1846
1847/// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance.
1848pub fn trait_call(
1849    trait_method: &'static TraitMethod,
1850    this: RawVc,
1851    arg: Box<dyn MagicAny>,
1852    persistence: TaskPersistence,
1853) -> RawVc {
1854    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1855}
1856
1857pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1858    TURBO_TASKS.with(|arc| arc.clone())
1859}
1860
1861pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
1862    TURBO_TASKS.with(Arc::downgrade)
1863}
1864
1865pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1866    TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1867}
1868
1869pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1870    TURBO_TASKS.with(|arc| func(arc))
1871}
1872
/// Runs `f` synchronously with `tt` installed as the current turbo-tasks instance.
pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
    TURBO_TASKS.sync_scope(tt, f)
}
1876
/// Wraps the future `f` so it runs with `tt` installed as the current
/// turbo-tasks instance.
pub fn turbo_tasks_future_scope<T>(
    tt: Arc<dyn TurboTasksApi>,
    f: impl Future<Output = T>,
) -> impl Future<Output = T> {
    TURBO_TASKS.scope(tt, f)
}
1883
1884pub fn with_turbo_tasks_for_testing<T>(
1885    tt: Arc<dyn TurboTasksApi>,
1886    current_task: TaskId,
1887    execution_id: ExecutionId,
1888    f: impl Future<Output = T>,
1889) -> impl Future<Output = T> {
1890    TURBO_TASKS.scope(
1891        tt,
1892        CURRENT_TASK_STATE.scope(
1893            Arc::new(RwLock::new(CurrentTaskState::new(
1894                current_task,
1895                execution_id,
1896                TaskPriority::initial(),
1897                false, // in_top_level_task
1898            ))),
1899            f,
1900        ),
1901    )
1902}
1903
1904/// Spawns the given future within the context of the current task.
1905///
1906/// Beware: this method is not safe to use in production code. It is only
1907/// intended for use in tests and for debugging purposes.
1908pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
1909    turbo_tasks().spawn_detached_for_testing(Box::pin(f));
1910}
1911
/// Test helper: returns the current task id, or `None` when running in a
/// turbo-tasks context without a task. Panics outside any turbo-tasks context.
pub fn current_task_for_testing() -> Option<TaskId> {
    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
}
1915
1916/// Marks the current task as dirty when restored from filesystem cache.
1917pub fn mark_session_dependent() {
1918    with_turbo_tasks(|tt| {
1919        tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1920    });
1921}
1922
1923/// Marks the current task as finished. This excludes it from waiting for
1924/// strongly consistency.
1925pub fn mark_finished() {
1926    with_turbo_tasks(|tt| {
1927        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1928    });
1929}
1930
/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
/// serialization of the current task cells.
///
/// Also marks the current task as stateful when the `verify_determinism` feature is enabled,
/// since State allocation implies interior mutability.
pub fn get_serialization_invalidator() -> SerializationInvalidator {
    CURRENT_TASK_STATE.with(|cell| {
        let CurrentTaskState {
            task_id,
            #[cfg(feature = "verify_determinism")]
            stateful,
            ..
        } = &mut *cell.write().unwrap();
        #[cfg(feature = "verify_determinism")]
        {
            *stateful = true;
        }
        // `task_id` may be `None` when inside a turbo-tasks run that has no task
        // (see `current_task_if_available`); that is an error here.
        let Some(task_id) = *task_id else {
            panic!(
                "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
                 task execution"
            );
        };
        SerializationInvalidator::new(task_id)
    })
}
1957
1958pub fn mark_invalidator() {
1959    CURRENT_TASK_STATE.with(|cell| {
1960        let CurrentTaskState {
1961            has_invalidator, ..
1962        } = &mut *cell.write().unwrap();
1963        *has_invalidator = true;
1964    })
1965}
1966
/// Marks the current task as stateful. This is used to indicate that the task
/// has interior mutability (e.g., via State or TransientState), which means
/// the task may produce different outputs even with the same inputs.
///
/// Only has an effect when the `verify_determinism` feature is enabled.
pub fn mark_stateful() {
    #[cfg(feature = "verify_determinism")]
    {
        CURRENT_TASK_STATE.with(|cell| {
            cell.write().unwrap().stateful = true;
        })
    }
    // No-op when verify_determinism is not enabled
}
1982
1983/// Marks the current task context as being in a top-level task. When in a top-level task,
1984/// eventually consistent reads will panic. It is almost always a mistake to perform an eventually
1985/// consistent read at the top-level of the application.
1986pub fn mark_top_level_task() {
1987    if cfg!(debug_assertions) {
1988        CURRENT_TASK_STATE.with(|cell| {
1989            cell.write().unwrap().in_top_level_task = true;
1990        })
1991    }
1992}
1993
1994/// Unmarks the current task context as being in a top-level task. The opposite of
1995/// [`mark_top_level_task`].
1996///
1997/// This utility can be okay in unit tests, where we're observing the internal behavior of
1998/// turbo-tasks, but otherwise, it is probably a mistake to call this function.
1999///
2000/// Calling this will allow eventually-consistent reads at the top-level, potentially exposing
2001/// incomplete computations and internal errors caused by eventual consistency that would've been
2002/// caught when the function was re-run. A strongly-consistent read re-runs parts of a task until
2003/// all of the dependencies have settled.
2004pub fn unmark_top_level_task_may_leak_eventually_consistent_state() {
2005    if cfg!(debug_assertions) {
2006        CURRENT_TASK_STATE.with(|cell| {
2007            cell.write().unwrap().in_top_level_task = false;
2008        })
2009    }
2010}
2011
/// Presumably intended to exempt the current task from garbage collection
/// (inferred from the name). Currently a no-op — GC is not implemented.
pub fn prevent_gc() {
    // TODO implement garbage collection
}
2015
2016pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
2017    with_turbo_tasks(|tt| {
2018        let raw_vc = collectible.node.node;
2019        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
2020    })
2021}
2022
2023pub(crate) async fn read_task_output(
2024    this: &dyn TurboTasksApi,
2025    id: TaskId,
2026    options: ReadOutputOptions,
2027) -> Result<RawVc> {
2028    loop {
2029        match this.try_read_task_output(id, options)? {
2030            Ok(result) => return Ok(result),
2031            Err(listener) => listener.await,
2032        }
2033    }
2034}
2035
/// A reference to a task's cell with methods that allow updating the contents
/// of the cell.
///
/// Mutations should not occur outside of the task that owns this cell. Doing so
/// is a logic error, and may lead to incorrect caching behavior.
#[derive(Clone, Copy)]
pub struct CurrentCellRef {
    // The task that owns this cell.
    current_task: TaskId,
    // Value type id plus per-type index identifying the cell within the task.
    index: CellId,
    // Whether the cell's content can be serialized by the backend.
    is_serializable_cell_content: bool,
}
2047
/// Shorthand for the target type produced when reading a `VcValueType` `T`.
type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
2049
impl CurrentCellRef {
    /// Updates the cell if the given `functor` returns a value.
    ///
    /// The functor receives the previous value (if present and downcastable to `T`)
    /// and returns `(new_value, updated_key_hashes, content_hash)` to perform an
    /// update, or `None` to leave the cell untouched.
    fn conditional_update<T>(
        &self,
        functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>, Option<CellHash>)>,
    ) where
        T: VcValueType,
    {
        self.conditional_update_with_shared_reference(|old_shared_reference| {
            // A previous value of a different type is treated as absent.
            let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
            let (new_value, updated_key_hashes, content_hash) = functor(old_ref)?;
            Some((
                SharedReference::new(triomphe::Arc::new(new_value)),
                updated_key_hashes,
                content_hash,
            ))
        })
    }

    /// Updates the cell if the given `functor` returns a `SharedReference`.
    ///
    /// The functor's return tuple carries the new content plus optional key hashes
    /// (for keyed updates) and an optional content hash (for hash-based change
    /// detection by the backend).
    fn conditional_update_with_shared_reference(
        &self,
        functor: impl FnOnce(
            Option<&SharedReference>,
        ) -> Option<(
            SharedReference,
            Option<SmallVec<[u64; 2]>>,
            Option<CellHash>,
        )>,
    ) {
        let tt = turbo_tasks();
        let cell_content = tt
            .read_own_task_cell(
                self.current_task,
                self.index,
                ReadCellOptions {
                    // INVALIDATION: Reading our own cell must be untracked
                    tracking: ReadCellTracking::Untracked,
                    is_serializable_cell_content: self.is_serializable_cell_content,
                    final_read_hint: false,
                },
            )
            .ok();
        // `cc.1.0` digs the optional SharedReference out of the TypedCellContent.
        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
        if let Some((update, updated_key_hashes, content_hash)) = update {
            tt.update_own_task_cell(
                self.current_task,
                self.index,
                self.is_serializable_cell_content,
                CellContent(Some(update)),
                updated_key_hashes,
                content_hash,
                VerificationMode::EqualityCheck,
            )
        }
    }

    /// Replace the current cell's content with `new_value` if the current content is not equal by
    /// value with the existing content.
    ///
    /// The comparison happens using the value itself, not the [`VcRead::Target`] of that value.
    ///
    /// Take this example of a custom equality implementation on a transparent wrapper type:
    ///
    /// ```
    /// #[turbo_tasks::value(transparent, eq = "manual")]
    /// struct Wrapper(Vec<u32>);
    ///
    /// impl PartialEq for Wrapper {
    ///     fn eq(&self, other: Wrapper) {
    ///         // Example: order doesn't matter for equality
    ///         let (mut this, mut other) = (self.clone(), other.clone());
    ///         this.sort_unstable();
    ///         other.sort_unstable();
    ///         this == other
    ///     }
    /// }
    ///
    /// impl Eq for Wrapper {}
    /// ```
    ///
    /// Comparisons of [`Vc<Wrapper>`] used when updating the cell will use `Wrapper`'s custom
    /// equality implementation, rather than the one provided by the target ([`Vec<u32>`]) type.
    ///
    /// However, in most cases, the default derived implementation of [`PartialEq`] is used which
    /// just forwards to the inner value's [`PartialEq`].
    ///
    /// If you already have a `SharedReference`, consider calling
    /// [`Self::compare_and_update_with_shared_reference`] which can re-use the [`SharedReference`]
    /// object.
    pub fn compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + VcValueType,
    {
        self.conditional_update(|old_value| {
            // Skip the update when the value is unchanged.
            if let Some(old_value) = old_value
                && old_value == &new_value
            {
                return None;
            }
            Some((new_value, None, None))
        });
    }

    /// Replace the current cell's content with `new_shared_reference` if the current content is not
    /// equal by value with the existing content.
    ///
    /// If you already have a `SharedReference`, this is a faster version of
    /// [`CurrentCellRef::compare_and_update`].
    ///
    /// The value should be stored in [`SharedReference`] using the type `T`.
    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
    where
        T: VcValueType + PartialEq,
    {
        self.conditional_update_with_shared_reference(|old_sr| {
            if let Some(old_sr) = old_sr {
                let old_value = extract_sr_value::<T>(old_sr);
                let new_value = extract_sr_value::<T>(&new_shared_reference);
                if old_value == new_value {
                    return None;
                }
            }
            Some((new_shared_reference, None, None))
        });
    }

    /// Replace the current cell's content if the new value is different.
    ///
    /// Like [`Self::compare_and_update`], but also computes and stores a hash of the value.
    /// When the cell's transient data is evicted, the stored hash enables the backend to detect
    /// whether the value actually changed without re-comparing values—avoiding unnecessary
    /// downstream invalidation.
    ///
    /// Requires `T: DeterministicHash` in addition to `T: PartialEq`.
    pub fn hashed_compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + DeterministicHash + VcValueType,
    {
        self.conditional_update(|old_value| {
            if let Some(old_value) = old_value
                && old_value == &new_value
            {
                return None;
            }
            // Hash only computed when we actually update.
            let content_hash = hash_xxh3_hash128(&new_value);
            Some((new_value, None, Some(content_hash)))
        });
    }

    /// Replace the current cell's content if the new value (from a pre-existing
    /// [`SharedReference`]) is different.
    ///
    /// Like [`Self::compare_and_update_with_shared_reference`], but also passes a hash
    /// for hash-based change detection when transient data has been evicted.
    pub fn hashed_compare_and_update_with_shared_reference<T>(
        &self,
        new_shared_reference: SharedReference,
    ) where
        T: VcValueType + PartialEq + DeterministicHash,
    {
        self.conditional_update_with_shared_reference(move |old_sr| {
            if let Some(old_sr) = old_sr {
                let old_value = extract_sr_value::<T>(old_sr);
                let new_value = extract_sr_value::<T>(&new_shared_reference);
                if old_value == new_value {
                    return None;
                }
            }
            let content_hash = hash_xxh3_hash128(extract_sr_value::<T>(&new_shared_reference));
            Some((new_shared_reference, None, Some(content_hash)))
        });
    }

    /// See [`Self::compare_and_update`], but selectively update individual keys.
    pub fn keyed_compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + VcValueType,
        VcReadTarget<T>: KeyedEq,
        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
    {
        self.conditional_update(|old_value| {
            // No previous value: write the whole value, no keyed diff possible.
            let Some(old_value) = old_value else {
                return Some((new_value, None, None));
            };
            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
            let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
            let updated_keys = old_value.different_keys(new_value_ref);
            if updated_keys.is_empty() {
                return None;
            }
            // Duplicates are very unlikely, but ok since the backend is deduplicating them
            let updated_key_hashes = updated_keys
                .into_iter()
                .map(|key| FxBuildHasher.hash_one(key))
                .collect();
            Some((new_value, Some(updated_key_hashes), None))
        });
    }

    /// See [`Self::compare_and_update_with_shared_reference`], but selectively update individual
    /// keys.
    pub fn keyed_compare_and_update_with_shared_reference<T>(
        &self,
        new_shared_reference: SharedReference,
    ) where
        T: VcValueType + PartialEq,
        VcReadTarget<T>: KeyedEq,
        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
    {
        self.conditional_update_with_shared_reference(|old_sr| {
            // No previous value: write the whole value, no keyed diff possible.
            let Some(old_sr) = old_sr else {
                return Some((new_shared_reference, None, None));
            };
            let old_value = extract_sr_value::<T>(old_sr);
            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
            let new_value = extract_sr_value::<T>(&new_shared_reference);
            let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
            let updated_keys = old_value.different_keys(new_value);
            if updated_keys.is_empty() {
                return None;
            }
            // Duplicates are very unlikely, but ok since the backend is deduplicating them
            let updated_key_hashes = updated_keys
                .into_iter()
                .map(|key| FxBuildHasher.hash_one(key))
                .collect();
            Some((new_shared_reference, Some(updated_key_hashes), None))
        });
    }

    /// Unconditionally updates the content of the cell.
    pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
    where
        T: VcValueType,
    {
        let tt = turbo_tasks();
        tt.update_own_task_cell(
            self.current_task,
            self.index,
            self.is_serializable_cell_content,
            CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
            None,
            None,
            verification_mode,
        )
    }

    /// A faster version of [`Self::update`] if you already have a
    /// [`SharedReference`].
    ///
    /// If the passed-in [`SharedReference`] is the same as the existing cell's
    /// by identity, no update is performed.
    ///
    /// The value should be stored in [`SharedReference`] using the type `T`.
    pub fn update_with_shared_reference(
        &self,
        shared_ref: SharedReference,
        verification_mode: VerificationMode,
    ) {
        let tt = turbo_tasks();
        let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
            let content = tt
                .read_own_task_cell(
                    self.current_task,
                    self.index,
                    ReadCellOptions {
                        // INVALIDATION: Reading our own cell must be untracked
                        tracking: ReadCellTracking::Untracked,
                        is_serializable_cell_content: self.is_serializable_cell_content,
                        final_read_hint: false,
                    },
                )
                .ok();
            if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
                // pointer equality (not value equality)
                shared_ref_exp != shared_ref
            } else {
                // No existing content: always write.
                true
            }
        } else {
            true
        };
        if update {
            tt.update_own_task_cell(
                self.current_task,
                self.index,
                self.is_serializable_cell_content,
                CellContent(Some(shared_ref)),
                None,
                None,
                verification_mode,
            )
        }
    }
}
2346
2347impl From<CurrentCellRef> for RawVc {
2348    fn from(cell: CurrentCellRef) -> Self {
2349        RawVc::TaskCell(cell.current_task, cell.index)
2350    }
2351}
2352
2353fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2354    sr.0.downcast_ref::<T>()
2355        .expect("cannot update SharedReference of different type")
2356}
2357
/// Returns a reference to the next cell for value type `T` in the current task.
pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
    find_cell_by_id(T::get_value_type_id(), T::has_serialization())
}
2361
2362pub fn find_cell_by_id(ty: ValueTypeId, is_serializable_cell_content: bool) -> CurrentCellRef {
2363    CURRENT_TASK_STATE.with(|ts| {
2364        let current_task = current_task("celling turbo_tasks values");
2365        let mut ts = ts.write().unwrap();
2366        let map = ts.cell_counters.as_mut().unwrap();
2367        let current_index = map.entry(ty).or_default();
2368        let index = *current_index;
2369        *current_index += 1;
2370        CurrentCellRef {
2371            current_task,
2372            index: CellId { type_id: ty, index },
2373            is_serializable_cell_content,
2374        }
2375    })
2376}
2377
2378pub(crate) async fn read_local_output(
2379    this: &dyn TurboTasksApi,
2380    execution_id: ExecutionId,
2381    local_task_id: LocalTaskId,
2382) -> Result<RawVc> {
2383    loop {
2384        match this.try_read_local_output(execution_id, local_task_id)? {
2385            Ok(raw_vc) => return Ok(raw_vc),
2386            Err(event_listener) => event_listener.await,
2387        }
2388    }
2389}