turbo_tasks/
manager.rs

1use std::{
2    cmp::Reverse,
3    fmt::{Debug, Display},
4    future::Future,
5    hash::{BuildHasher, BuildHasherDefault},
6    mem::take,
7    pin::Pin,
8    sync::{
9        Arc, Mutex, RwLock, Weak,
10        atomic::{AtomicBool, AtomicUsize, Ordering},
11    },
12    time::{Duration, Instant},
13};
14
15use anyhow::{Result, anyhow};
16use auto_hash_map::AutoMap;
17use bincode::{Decode, Encode};
18use either::Either;
19use futures::stream::FuturesUnordered;
20use rustc_hash::{FxBuildHasher, FxHasher};
21use serde::{Deserialize, Serialize};
22use smallvec::SmallVec;
23use tokio::{select, sync::mpsc::Receiver, task_local};
24use tracing::{Instrument, Span, instrument};
25
26use crate::{
27    Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
28    ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
29    VcValueTrait, VcValueType,
30    backend::{
31        Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
32        TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
33    },
34    capture_future::CaptureFuture,
35    event::{Event, EventListener},
36    id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
37    id_factory::IdFactoryWithReuse,
38    keyed::Keyed,
39    macro_helpers::NativeFunction,
40    magic_any::MagicAny,
41    message_queue::{CompilationEvent, CompilationEventQueue},
42    priority_runner::{Executor, JoinHandle, PriorityRunner},
43    raw_vc::{CellId, RawVc},
44    registry,
45    serialization_invalidation::SerializationInvalidator,
46    task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
47    task_statistics::TaskStatisticsApi,
48    trace::TraceRawVcs,
49    util::{IdFactory, StaticOrArc},
50};
51
52/// Common base trait for [`TurboTasksApi`] and [`TurboTasksBackendApi`]. Provides APIs for creating
53/// tasks from function calls.
54pub trait TurboTasksCallApi: Sync + Send {
55    /// Calls a native function with arguments. Resolves arguments when needed
56    /// with a wrapper task.
57    fn dynamic_call(
58        &self,
59        native_fn: &'static NativeFunction,
60        this: Option<RawVc>,
61        arg: Box<dyn MagicAny>,
62        persistence: TaskPersistence,
63    ) -> RawVc;
64    /// Call a native function with arguments.
65    /// All inputs must be resolved.
66    fn native_call(
67        &self,
68        native_fn: &'static NativeFunction,
69        this: Option<RawVc>,
70        arg: Box<dyn MagicAny>,
71        persistence: TaskPersistence,
72    ) -> RawVc;
73    /// Calls a trait method with arguments. First input is the `self` object.
74    /// Uses a wrapper task to resolve
75    fn trait_call(
76        &self,
77        trait_method: &'static TraitMethod,
78        this: RawVc,
79        arg: Box<dyn MagicAny>,
80        persistence: TaskPersistence,
81    ) -> RawVc;
82
83    fn run(
84        &self,
85        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
86    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
87    fn run_once(
88        &self,
89        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
90    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
91    fn run_once_with_reason(
92        &self,
93        reason: StaticOrArc<dyn InvalidationReason>,
94        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
95    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
96    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
97}
98
99/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
100/// context. Returned by the [`turbo_tasks`] helper function.
101///
102/// This trait is needed because thread locals cannot contain an unresolved [`Backend`] type
103/// parameter.
104pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
105    fn invalidate(&self, task: TaskId);
106    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);
107
108    fn invalidate_serialization(&self, task: TaskId);
109
110    fn try_read_task_output(
111        &self,
112        task: TaskId,
113        options: ReadOutputOptions,
114    ) -> Result<Result<RawVc, EventListener>>;
115
116    fn try_read_task_cell(
117        &self,
118        task: TaskId,
119        index: CellId,
120        options: ReadCellOptions,
121    ) -> Result<Result<TypedCellContent, EventListener>>;
122
123    /// Reads a [`RawVc::LocalOutput`]. If the task has completed, returns the [`RawVc`] the local
124    /// task points to.
125    ///
126    /// The returned [`RawVc`] may also be a [`RawVc::LocalOutput`], so this may need to be called
127    /// recursively or in a loop.
128    ///
129    /// This does not accept a consistency argument, as you cannot control consistency of a read of
130    /// an operation owned by your own task. Strongly consistent reads are only allowed on
131    /// [`OperationVc`]s, which should never be local tasks.
132    ///
133    /// No dependency tracking will happen as a result of this function call, as it's a no-op for a
134    /// task to depend on itself.
135    ///
136    /// [`OperationVc`]: crate::OperationVc
137    fn try_read_local_output(
138        &self,
139        execution_id: ExecutionId,
140        local_task_id: LocalTaskId,
141    ) -> Result<Result<RawVc, EventListener>>;
142
143    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;
144
145    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
146    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
147    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);
148
149    /// INVALIDATION: Be careful with this, it will not track dependencies, so
150    /// using it could break cache invalidation.
151    fn try_read_own_task_cell(
152        &self,
153        current_task: TaskId,
154        index: CellId,
155        options: ReadCellOptions,
156    ) -> Result<TypedCellContent>;
157
158    fn read_own_task_cell(
159        &self,
160        task: TaskId,
161        index: CellId,
162        options: ReadCellOptions,
163    ) -> Result<TypedCellContent>;
164    fn update_own_task_cell(
165        &self,
166        task: TaskId,
167        index: CellId,
168        is_serializable_cell_content: bool,
169        content: CellContent,
170        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
171        verification_mode: VerificationMode,
172    );
173    fn mark_own_task_as_finished(&self, task: TaskId);
174    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
175    fn mark_own_task_as_session_dependent(&self, task: TaskId);
176
177    fn connect_task(&self, task: TaskId);
178
179    /// Wraps the given future in the current task.
180    ///
181    /// Beware: this method is not safe to use in production code. It is only intended for use in
182    /// tests and for debugging purposes.
183    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
184
185    fn task_statistics(&self) -> &TaskStatisticsApi;
186
187    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
188
189    fn subscribe_to_compilation_events(
190        &self,
191        event_types: Option<Vec<String>>,
192    ) -> Receiver<Arc<dyn CompilationEvent>>;
193    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);
194
195    // Returns true if TurboTasks is configured to track dependencies.
196    fn is_tracking_dependencies(&self) -> bool;
197}
198
199/// A wrapper around a value that is unused.
200pub struct Unused<T> {
201    inner: T,
202}
203
204impl<T> Unused<T> {
205    /// Creates a new unused value.
206    ///
207    /// # Safety
208    ///
209    /// The wrapped value must not be used.
210    pub unsafe fn new_unchecked(inner: T) -> Self {
211        Self { inner }
212    }
213
214    /// Get the inner value, without consuming the `Unused` wrapper.
215    ///
216    /// # Safety
217    ///
218    /// The user need to make sure that the value stays unused.
219    pub unsafe fn get_unchecked(&self) -> &T {
220        &self.inner
221    }
222
223    /// Unwraps the value, consuming the `Unused` wrapper.
224    pub fn into(self) -> T {
225        self.inner
226    }
227}
228
229/// A subset of the [`TurboTasks`] API that's exposed to [`Backend`] implementations.
230pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
231    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;
232
233    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
234    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
235    /// # Safety
236    ///
237    /// The caller must ensure that the task id is not used anymore.
238    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
239    /// # Safety
240    ///
241    /// The caller must ensure that the task id is not used anymore.
242    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);
243
244    /// Schedule a task for execution.
245    fn schedule(&self, task: TaskId, priority: TaskPriority);
246
247    /// Returns the priority of the current task.
248    fn get_current_task_priority(&self) -> TaskPriority;
249
250    /// Schedule a foreground backend job for execution.
251    fn schedule_backend_foreground_job(&self, job: B::BackendJob);
252
253    /// Schedule a background backend job for execution.
254    ///
255    /// Background jobs are not counted towards activeness of the system. The system is considered
256    /// idle even with active background jobs.
257    fn schedule_backend_background_job(&self, job: B::BackendJob);
258
259    /// Returns the duration from the start of the program to the given instant.
260    fn program_duration_until(&self, instant: Instant) -> Duration;
261
262    /// Returns true if the system is idle.
263    fn is_idle(&self) -> bool;
264
265    /// Returns a reference to the backend.
266    fn backend(&self) -> &B;
267}
268
269#[allow(clippy::manual_non_exhaustive)]
270pub struct UpdateInfo {
271    pub duration: Duration,
272    pub tasks: usize,
273    pub reasons: InvalidationReasonSet,
274    #[allow(dead_code)]
275    placeholder_for_future_fields: (),
276}
277
278#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
279pub enum TaskPersistence {
280    /// Tasks that may be persisted across sessions using serialization.
281    Persistent,
282
283    /// Tasks that will be persisted in memory for the life of this session, but won't persist
284    /// between sessions.
285    ///
286    /// This is used for [root tasks][TurboTasks::spawn_root_task] and tasks with an argument of
287    /// type [`TransientValue`][crate::value::TransientValue] or
288    /// [`TransientInstance`][crate::value::TransientInstance].
289    Transient,
290}
291
292impl Display for TaskPersistence {
293    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294        match self {
295            TaskPersistence::Persistent => write!(f, "persistent"),
296            TaskPersistence::Transient => write!(f, "transient"),
297        }
298    }
299}
300
301#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
302pub enum ReadConsistency {
303    /// The default behavior for most APIs. Reads are faster, but may return stale values, which
304    /// may later trigger re-computation.
305    #[default]
306    Eventual,
307    /// Ensures all dependencies are fully resolved before returning the cell or output data, at
308    /// the cost of slower reads.
309    ///
310    /// Top-level code that returns data to the user should use strongly consistent reads.
311    Strong,
312}
313
314#[derive(Clone, Copy, Debug, Eq, PartialEq)]
315pub enum ReadCellTracking {
316    /// Reads are tracked as dependencies of the current task.
317    Tracked {
318        /// The key used for the dependency
319        key: Option<u64>,
320    },
321    /// The read is only tracked when there is an error, otherwise it is untracked.
322    ///
323    /// INVALIDATION: Be careful with this, it will not track dependencies, so
324    /// using it could break cache invalidation.
325    TrackOnlyError,
326    /// The read is not tracked as a dependency of the current task.
327    ///
328    /// INVALIDATION: Be careful with this, it will not track dependencies, so
329    /// using it could break cache invalidation.
330    Untracked,
331}
332
333impl ReadCellTracking {
334    pub fn should_track(&self, is_err: bool) -> bool {
335        match self {
336            ReadCellTracking::Tracked { .. } => true,
337            ReadCellTracking::TrackOnlyError => is_err,
338            ReadCellTracking::Untracked => false,
339        }
340    }
341
342    pub fn key(&self) -> Option<u64> {
343        match self {
344            ReadCellTracking::Tracked { key } => *key,
345            ReadCellTracking::TrackOnlyError => None,
346            ReadCellTracking::Untracked => None,
347        }
348    }
349}
350
351impl Default for ReadCellTracking {
352    fn default() -> Self {
353        ReadCellTracking::Tracked { key: None }
354    }
355}
356
357impl Display for ReadCellTracking {
358    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
359        match self {
360            ReadCellTracking::Tracked { key: None } => write!(f, "tracked"),
361            ReadCellTracking::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
362            ReadCellTracking::TrackOnlyError => write!(f, "track only error"),
363            ReadCellTracking::Untracked => write!(f, "untracked"),
364        }
365    }
366}
367
368#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
369pub enum ReadTracking {
370    /// Reads are tracked as dependencies of the current task.
371    #[default]
372    Tracked,
373    /// The read is only tracked when there is an error, otherwise it is untracked.
374    ///
375    /// INVALIDATION: Be careful with this, it will not track dependencies, so
376    /// using it could break cache invalidation.
377    TrackOnlyError,
378    /// The read is not tracked as a dependency of the current task.
379    ///
380    /// INVALIDATION: Be careful with this, it will not track dependencies, so
381    /// using it could break cache invalidation.
382    Untracked,
383}
384
385impl ReadTracking {
386    pub fn should_track(&self, is_err: bool) -> bool {
387        match self {
388            ReadTracking::Tracked => true,
389            ReadTracking::TrackOnlyError => is_err,
390            ReadTracking::Untracked => false,
391        }
392    }
393}
394
395impl Display for ReadTracking {
396    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
397        match self {
398            ReadTracking::Tracked => write!(f, "tracked"),
399            ReadTracking::TrackOnlyError => write!(f, "track only error"),
400            ReadTracking::Untracked => write!(f, "untracked"),
401        }
402    }
403}
404
405#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
406pub enum TaskPriority {
407    #[default]
408    Initial,
409    Invalidation {
410        priority: Reverse<u32>,
411    },
412}
413
414impl TaskPriority {
415    pub fn invalidation(priority: u32) -> Self {
416        Self::Invalidation {
417            priority: Reverse(priority),
418        }
419    }
420
421    pub fn initial() -> Self {
422        Self::Initial
423    }
424
425    pub fn leaf() -> Self {
426        Self::Invalidation {
427            priority: Reverse(0),
428        }
429    }
430
431    pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
432        match self {
433            TaskPriority::Initial => parent_priority,
434            TaskPriority::Invalidation { priority } => {
435                if let TaskPriority::Invalidation {
436                    priority: parent_priority,
437                } = parent_priority
438                    && priority.0 < parent_priority.0
439                {
440                    Self::Invalidation {
441                        priority: Reverse(parent_priority.0.saturating_add(1)),
442                    }
443                } else {
444                    *self
445                }
446            }
447        }
448    }
449}
450
451impl Display for TaskPriority {
452    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
453        match self {
454            TaskPriority::Initial => write!(f, "initial"),
455            TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
456        }
457    }
458}
459
460enum ScheduledTask {
461    Task {
462        task_id: TaskId,
463        span: Span,
464    },
465    LocalTask {
466        ty: LocalTaskSpec,
467        persistence: TaskPersistence,
468        local_task_id: LocalTaskId,
469        global_task_state: Arc<RwLock<CurrentTaskState>>,
470        span: Span,
471    },
472}
473
474pub struct TurboTasks<B: Backend + 'static> {
475    this: Weak<Self>,
476    backend: B,
477    task_id_factory: IdFactoryWithReuse<TaskId>,
478    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
479    execution_id_factory: IdFactory<ExecutionId>,
480    stopped: AtomicBool,
481    currently_scheduled_foreground_jobs: AtomicUsize,
482    currently_scheduled_background_jobs: AtomicUsize,
483    scheduled_tasks: AtomicUsize,
484    priority_runner:
485        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
486    start: Mutex<Option<Instant>>,
487    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
488    /// Event that is triggered when currently_scheduled_foreground_jobs becomes non-zero
489    event_foreground_start: Event,
490    /// Event that is triggered when all foreground jobs are done
491    /// (currently_scheduled_foreground_jobs becomes zero)
492    event_foreground_done: Event,
493    /// Event that is triggered when all background jobs are done
494    event_background_done: Event,
495    program_start: Instant,
496    compilation_events: CompilationEventQueue,
497}
498
499type LocalTaskTracker = Option<
500    FuturesUnordered<Either<JoinHandle, Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>>,
501>;
502
503/// Information about a non-local task. A non-local task can contain multiple "local" tasks, which
504/// all share the same non-local task state.
505///
506/// A non-local task is one that:
507///
508/// - Has a unique task id.
509/// - Is potentially cached.
510/// - The backend is aware of.
511struct CurrentTaskState {
512    task_id: Option<TaskId>,
513    execution_id: ExecutionId,
514    priority: TaskPriority,
515
516    /// True if the current task uses an external invalidator
517    has_invalidator: bool,
518
519    /// Tracks how many cells of each type has been allocated so far during this task execution.
520    /// When a task is re-executed, the cell count may not match the existing cell vec length.
521    ///
522    /// This is taken (and becomes `None`) during teardown of a task.
523    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,
524
525    /// Local tasks created while this global task has been running. Indexed by `LocalTaskId`.
526    local_tasks: Vec<LocalTask>,
527
528    /// Tracks currently running local tasks, and defers cleanup of the global task until those
529    /// complete. Also used by `spawn_detached_for_testing`.
530    local_task_tracker: LocalTaskTracker,
531}
532
533impl CurrentTaskState {
534    fn new(task_id: TaskId, execution_id: ExecutionId, priority: TaskPriority) -> Self {
535        Self {
536            task_id: Some(task_id),
537            execution_id,
538            priority,
539            has_invalidator: false,
540            cell_counters: Some(AutoMap::default()),
541            local_tasks: Vec::new(),
542            local_task_tracker: None,
543        }
544    }
545
546    fn new_temporary(execution_id: ExecutionId, priority: TaskPriority) -> Self {
547        Self {
548            task_id: None,
549            execution_id,
550            priority,
551            has_invalidator: false,
552            cell_counters: None,
553            local_tasks: Vec::new(),
554            local_task_tracker: None,
555        }
556    }
557
558    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
559        if self.execution_id != expected_execution_id {
560            panic!(
561                "Local tasks can only be scheduled/awaited within the same execution of the \
562                 parent task that created them"
563            );
564        }
565    }
566
567    fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
568        self.local_tasks.push(local_task);
569        // generate a one-indexed id from len() -- we just pushed so len() is >= 1
570        if cfg!(debug_assertions) {
571            LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
572        } else {
573            unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
574        }
575    }
576
577    fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
578        // local task ids are one-indexed (they use NonZeroU32)
579        &self.local_tasks[(*local_task_id as usize) - 1]
580    }
581
582    fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
583        &mut self.local_tasks[(*local_task_id as usize) - 1]
584    }
585}
586
587// TODO implement our own thread pool and make these thread locals instead
588task_local! {
589    /// The current TurboTasks instance
590    static TURBO_TASKS: Arc<dyn TurboTasksApi>;
591
592    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
593}
594
595impl<B: Backend + 'static> TurboTasks<B> {
596    // TODO better lifetime management for turbo tasks
597    // consider using unsafe for the task_local turbo tasks
598    // that should be safe as long tasks can't outlife turbo task
599    // so we probably want to make sure that all tasks are joined
600    // when trying to drop turbo tasks
601    pub fn new(backend: B) -> Arc<Self> {
602        let task_id_factory = IdFactoryWithReuse::new(
603            TaskId::MIN,
604            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
605        );
606        let transient_task_id_factory =
607            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
608        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
609        let this = Arc::new_cyclic(|this| Self {
610            this: this.clone(),
611            backend,
612            task_id_factory,
613            transient_task_id_factory,
614            execution_id_factory,
615            stopped: AtomicBool::new(false),
616            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
617            currently_scheduled_background_jobs: AtomicUsize::new(0),
618            scheduled_tasks: AtomicUsize::new(0),
619            priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
620            start: Default::default(),
621            aggregated_update: Default::default(),
622            event_foreground_done: Event::new(|| {
623                || "TurboTasks::event_foreground_done".to_string()
624            }),
625            event_foreground_start: Event::new(|| {
626                || "TurboTasks::event_foreground_start".to_string()
627            }),
628            event_background_done: Event::new(|| {
629                || "TurboTasks::event_background_done".to_string()
630            }),
631            program_start: Instant::now(),
632            compilation_events: CompilationEventQueue::default(),
633        });
634        this.backend.startup(&*this);
635        this
636    }
637
638    pub fn pin(&self) -> Arc<Self> {
639        self.this.upgrade().unwrap()
640    }
641
642    /// Creates a new root task
643    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
644    where
645        T: ?Sized,
646        F: Fn() -> Fut + Send + Sync + Clone + 'static,
647        Fut: Future<Output = Result<Vc<T>>> + Send,
648    {
649        let id = self.backend.create_transient_task(
650            TransientTaskType::Root(Box::new(move || {
651                let functor = functor.clone();
652                Box::pin(async move {
653                    let raw_vc = functor().await?.node;
654                    raw_vc.to_non_local().await
655                })
656            })),
657            self,
658        );
659        self.schedule(id, TaskPriority::initial());
660        id
661    }
662
663    pub fn dispose_root_task(&self, task_id: TaskId) {
664        self.backend.dispose_root_task(task_id, self);
665    }
666
667    // TODO make sure that all dependencies settle before reading them
668    /// Creates a new root task, that is only executed once.
669    /// Dependencies will not invalidate the task.
670    #[track_caller]
671    fn spawn_once_task<T, Fut>(&self, future: Fut)
672    where
673        T: ?Sized,
674        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
675    {
676        let id = self.backend.create_transient_task(
677            TransientTaskType::Once(Box::pin(async move {
678                let raw_vc = future.await?.node;
679                raw_vc.to_non_local().await
680            })),
681            self,
682        );
683        self.schedule(id, TaskPriority::initial());
684    }
685
686    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
687        &self,
688        future: impl Future<Output = Result<T>> + Send + 'static,
689    ) -> Result<T> {
690        let (tx, rx) = tokio::sync::oneshot::channel();
691        self.spawn_once_task(async move {
692            let result = future.await;
693            tx.send(result)
694                .map_err(|_| anyhow!("unable to send result"))?;
695            Ok(Completion::new())
696        });
697
698        rx.await?
699    }
700
701    #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
702    pub async fn run<T: TraceRawVcs + Send + 'static>(
703        &self,
704        future: impl Future<Output = Result<T>> + Send + 'static,
705    ) -> Result<T, TurboTasksExecutionError> {
706        self.begin_foreground_job();
707        // it's okay for execution ids to overflow and wrap, they're just used for an assert
708        let execution_id = self.execution_id_factory.wrapping_get();
709        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
710            execution_id,
711            TaskPriority::initial(),
712        )));
713
714        let result = TURBO_TASKS
715            .scope(
716                self.pin(),
717                CURRENT_TASK_STATE.scope(current_task_state, async {
718                    let result = CaptureFuture::new(future).await;
719
720                    // wait for all spawned local tasks using `local` to finish
721                    wait_for_local_tasks().await;
722
723                    match result {
724                        Ok(Ok(raw_vc)) => Ok(raw_vc),
725                        Ok(Err(err)) => Err(err.into()),
726                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
727                    }
728                }),
729            )
730            .await;
731        self.finish_foreground_job();
732        result
733    }
734
735    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
736        let this = self.pin();
737        tokio::spawn(async move {
738            this.pin()
739                .run_once(async move {
740                    this.finish_foreground_job();
741                    future.await;
742                    this.begin_foreground_job();
743                    Ok(())
744                })
745                .await
746                .unwrap()
747        });
748    }
749
750    pub(crate) fn native_call(
751        &self,
752        native_fn: &'static NativeFunction,
753        this: Option<RawVc>,
754        arg: Box<dyn MagicAny>,
755        persistence: TaskPersistence,
756    ) -> RawVc {
757        let task_type = CachedTaskType {
758            native_fn,
759            this,
760            arg,
761        };
762        RawVc::TaskOutput(match persistence {
763            TaskPersistence::Transient => self.backend.get_or_create_transient_task(
764                task_type,
765                current_task_if_available("turbo_function calls"),
766                self,
767            ),
768            TaskPersistence::Persistent => self.backend.get_or_create_persistent_task(
769                task_type,
770                current_task_if_available("turbo_function calls"),
771                self,
772            ),
773        })
774    }
775
776    pub fn dynamic_call(
777        &self,
778        native_fn: &'static NativeFunction,
779        this: Option<RawVc>,
780        arg: Box<dyn MagicAny>,
781        persistence: TaskPersistence,
782    ) -> RawVc {
783        if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
784            return self.native_call(native_fn, this, arg, persistence);
785        }
786        let task_type = LocalTaskSpec {
787            task_type: LocalTaskType::ResolveNative { native_fn },
788            this,
789            arg,
790        };
791        self.schedule_local_task(task_type, persistence)
792    }
793
794    pub fn trait_call(
795        &self,
796        trait_method: &'static TraitMethod,
797        this: RawVc,
798        arg: Box<dyn MagicAny>,
799        persistence: TaskPersistence,
800    ) -> RawVc {
801        // avoid creating a wrapper task if self is already resolved
802        // for resolved cells we already know the value type so we can lookup the
803        // function
804        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
805            match registry::get_value_type(type_id).get_trait_method(trait_method) {
806                Some(native_fn) => {
807                    let arg = native_fn.arg_meta.filter_owned(arg);
808                    return self.dynamic_call(native_fn, Some(this), arg, persistence);
809                }
810                None => {
811                    // We are destined to fail at this point, but we just retry resolution in the
812                    // local task since we cannot report an error from here.
813                    // TODO: A panic seems appropriate since the immediate caller is to blame
814                }
815            }
816        }
817
818        // create a wrapper task to resolve all inputs
819        let task_type = LocalTaskSpec {
820            task_type: LocalTaskType::ResolveTrait { trait_method },
821            this: Some(this),
822            arg,
823        };
824
825        self.schedule_local_task(task_type, persistence)
826    }
827
828    #[track_caller]
829    pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
830        self.begin_foreground_job();
831        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
832
833        self.priority_runner.schedule(
834            &self.pin(),
835            ScheduledTask::Task {
836                task_id,
837                span: Span::current(),
838            },
839            priority,
840        );
841    }
842
843    fn schedule_local_task(
844        &self,
845        ty: LocalTaskSpec,
846        // if this is a `LocalTaskType::Resolve*`, we may spawn another task with this persistence,
847        persistence: TaskPersistence,
848    ) -> RawVc {
849        let task_type = ty.task_type;
850        let (global_task_state, execution_id, priority, local_task_id) =
851            CURRENT_TASK_STATE.with(|gts| {
852                let mut gts_write = gts.write().unwrap();
853                let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
854                    done_event: Event::new(move || {
855                        move || format!("LocalTask({task_type})::done_event")
856                    }),
857                });
858                (
859                    Arc::clone(gts),
860                    gts_write.execution_id,
861                    gts_write.priority,
862                    local_task_id,
863                )
864            });
865
866        let future = self.priority_runner.schedule_with_join_handle(
867            &self.pin(),
868            ScheduledTask::LocalTask {
869                ty,
870                persistence,
871                local_task_id,
872                global_task_state: global_task_state.clone(),
873                span: Span::current(),
874            },
875            priority,
876        );
877        global_task_state
878            .write()
879            .unwrap()
880            .local_task_tracker
881            .get_or_insert_default()
882            .push(Either::Left(future));
883
884        RawVc::LocalOutput(execution_id, local_task_id, persistence)
885    }
886
887    fn begin_foreground_job(&self) {
888        if self
889            .currently_scheduled_foreground_jobs
890            .fetch_add(1, Ordering::AcqRel)
891            == 0
892        {
893            *self.start.lock().unwrap() = Some(Instant::now());
894            self.event_foreground_start.notify(usize::MAX);
895            self.backend.idle_end(self);
896        }
897    }
898
899    fn finish_foreground_job(&self) {
900        if self
901            .currently_scheduled_foreground_jobs
902            .fetch_sub(1, Ordering::AcqRel)
903            == 1
904        {
905            self.backend.idle_start(self);
906            // That's not super race-condition-safe, but it's only for
907            // statistical reasons
908            let total = self.scheduled_tasks.load(Ordering::Acquire);
909            self.scheduled_tasks.store(0, Ordering::Release);
910            if let Some(start) = *self.start.lock().unwrap() {
911                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
912                if let Some(update) = update.as_mut() {
913                    update.0 += start.elapsed();
914                    update.1 += total;
915                } else {
916                    *update = Some((start.elapsed(), total));
917                }
918            }
919            self.event_foreground_done.notify(usize::MAX);
920        }
921    }
922
923    fn begin_background_job(&self) {
924        self.currently_scheduled_background_jobs
925            .fetch_add(1, Ordering::Relaxed);
926    }
927
928    fn finish_background_job(&self) {
929        if self
930            .currently_scheduled_background_jobs
931            .fetch_sub(1, Ordering::Relaxed)
932            == 1
933        {
934            self.event_background_done.notify(usize::MAX);
935        }
936    }
937
938    pub fn get_in_progress_count(&self) -> usize {
939        self.currently_scheduled_foreground_jobs
940            .load(Ordering::Acquire)
941    }
942
943    /// Waits for the given task to finish executing. This works by performing an untracked read,
944    /// and discarding the value of the task output.
945    ///
946    /// [`ReadConsistency::Eventual`] means that this will return after the task executes, but
947    /// before all dependencies have completely settled.
948    ///
949    /// [`ReadConsistency::Strong`] means that this will also wait for the task and all dependencies
950    /// to fully settle before returning.
951    ///
952    /// As this function is typically called in top-level code that waits for results to be ready
953    /// for the user to access, most callers should use [`ReadConsistency::Strong`].
954    pub async fn wait_task_completion(
955        &self,
956        id: TaskId,
957        consistency: ReadConsistency,
958    ) -> Result<()> {
959        read_task_output(
960            self,
961            id,
962            ReadOutputOptions {
963                // INVALIDATION: This doesn't return a value, only waits for it to be ready.
964                tracking: ReadTracking::Untracked,
965                consistency,
966            },
967        )
968        .await?;
969        Ok(())
970    }
971
972    /// Returns [UpdateInfo] with all updates aggregated over a given duration
973    /// (`aggregation`). Will wait until an update happens.
974    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
975        self.aggregated_update_info(aggregation, Duration::MAX)
976            .await
977            .unwrap()
978    }
979
980    /// Returns [UpdateInfo] with all updates aggregated over a given duration
981    /// (`aggregation`). Will only return None when the timeout is reached while
982    /// waiting for the first update.
983    pub async fn aggregated_update_info(
984        &self,
985        aggregation: Duration,
986        timeout: Duration,
987    ) -> Option<UpdateInfo> {
988        let listener = self
989            .event_foreground_done
990            .listen_with_note(|| || "wait for update info".to_string());
991        let wait_for_finish = {
992            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
993            if aggregation.is_zero() {
994                if let Some((duration, tasks)) = update.take() {
995                    return Some(UpdateInfo {
996                        duration,
997                        tasks,
998                        reasons: take(reason_set),
999                        placeholder_for_future_fields: (),
1000                    });
1001                } else {
1002                    true
1003                }
1004            } else {
1005                update.is_none()
1006            }
1007        };
1008        if wait_for_finish {
1009            if timeout == Duration::MAX {
1010                // wait for finish
1011                listener.await;
1012            } else {
1013                // wait for start, then wait for finish or timeout
1014                let start_listener = self
1015                    .event_foreground_start
1016                    .listen_with_note(|| || "wait for update info".to_string());
1017                if self
1018                    .currently_scheduled_foreground_jobs
1019                    .load(Ordering::Acquire)
1020                    == 0
1021                {
1022                    start_listener.await;
1023                } else {
1024                    drop(start_listener);
1025                }
1026                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1027                    // Timeout
1028                    return None;
1029                }
1030            }
1031        }
1032        if !aggregation.is_zero() {
1033            loop {
1034                select! {
1035                    () = tokio::time::sleep(aggregation) => {
1036                        break;
1037                    }
1038                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1039                        // Resets the sleep
1040                    }
1041                }
1042            }
1043        }
1044        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1045        if let Some((duration, tasks)) = update.take() {
1046            Some(UpdateInfo {
1047                duration,
1048                tasks,
1049                reasons: take(reason_set),
1050                placeholder_for_future_fields: (),
1051            })
1052        } else {
1053            panic!("aggregated_update_info must not called concurrently")
1054        }
1055    }
1056
1057    pub async fn wait_background_done(&self) {
1058        let listener = self.event_background_done.listen();
1059        if self
1060            .currently_scheduled_background_jobs
1061            .load(Ordering::Acquire)
1062            != 0
1063        {
1064            listener.await;
1065        }
1066    }
1067
1068    pub async fn stop_and_wait(&self) {
1069        turbo_tasks_future_scope(self.pin(), async move {
1070            self.backend.stopping(self);
1071            self.stopped.store(true, Ordering::Release);
1072            {
1073                let listener = self
1074                    .event_foreground_done
1075                    .listen_with_note(|| || "wait for stop".to_string());
1076                if self
1077                    .currently_scheduled_foreground_jobs
1078                    .load(Ordering::Acquire)
1079                    != 0
1080                {
1081                    listener.await;
1082                }
1083            }
1084            {
1085                let listener = self.event_background_done.listen();
1086                if self
1087                    .currently_scheduled_background_jobs
1088                    .load(Ordering::Acquire)
1089                    != 0
1090                {
1091                    listener.await;
1092                }
1093            }
1094            self.backend.stop(self);
1095        })
1096        .await;
1097    }
1098
1099    #[track_caller]
1100    pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1101    where
1102        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1103        T::CallOnceFuture: Send,
1104    {
1105        let mut this = self.pin();
1106        this.begin_foreground_job();
1107        tokio::spawn(
1108            TURBO_TASKS
1109                .scope(this.clone(), async move {
1110                    if !this.stopped.load(Ordering::Acquire) {
1111                        this = func(this.clone()).await;
1112                    }
1113                    this.finish_foreground_job();
1114                })
1115                .in_current_span(),
1116        );
1117    }
1118
1119    #[track_caller]
1120    pub(crate) fn schedule_background_job<T>(&self, func: T)
1121    where
1122        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1123        T::CallOnceFuture: Send,
1124    {
1125        let mut this = self.pin();
1126        self.begin_background_job();
1127        tokio::spawn(
1128            TURBO_TASKS
1129                .scope(this.clone(), async move {
1130                    if !this.stopped.load(Ordering::Acquire) {
1131                        this = func(this).await;
1132                    }
1133                    this.finish_background_job();
1134                })
1135                .in_current_span(),
1136        );
1137    }
1138
1139    fn finish_current_task_state(&self) -> FinishedTaskState {
1140        let has_invalidator = CURRENT_TASK_STATE.with(|cell| {
1141            let CurrentTaskState {
1142                has_invalidator, ..
1143            } = &mut *cell.write().unwrap();
1144            *has_invalidator
1145        });
1146
1147        FinishedTaskState { has_invalidator }
1148    }
1149
1150    pub fn backend(&self) -> &B {
1151        &self.backend
1152    }
1153}
1154
1155struct TurboTasksExecutor;
1156
1157impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
1158    type Future = impl Future<Output = ()> + Send + 'static;
1159
1160    fn execute(
1161        &self,
1162        this: &Arc<TurboTasks<B>>,
1163        scheduled_task: ScheduledTask,
1164        priority: TaskPriority,
1165    ) -> Self::Future {
1166        match scheduled_task {
1167            ScheduledTask::Task { task_id, span } => {
1168                let this2 = this.clone();
1169                let this = this.clone();
1170                let future = async move {
1171                    let mut schedule_again = true;
1172                    while schedule_again {
1173                        // it's okay for execution ids to overflow and wrap, they're just used for
1174                        // an assert
1175                        let execution_id = this.execution_id_factory.wrapping_get();
1176                        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
1177                            task_id,
1178                            execution_id,
1179                            priority,
1180                        )));
1181                        let single_execution_future = async {
1182                            if this.stopped.load(Ordering::Acquire) {
1183                                this.backend.task_execution_canceled(task_id, &*this);
1184                                return false;
1185                            }
1186
1187                            let Some(TaskExecutionSpec { future, span }) = this
1188                                .backend
1189                                .try_start_task_execution(task_id, priority, &*this)
1190                            else {
1191                                return false;
1192                            };
1193
1194                            async {
1195                                let result = CaptureFuture::new(future).await;
1196
1197                                // wait for all spawned local tasks using `local` to finish
1198                                wait_for_local_tasks().await;
1199
1200                                let result = match result {
1201                                    Ok(Ok(raw_vc)) => Ok(raw_vc),
1202                                    Ok(Err(err)) => Err(err.into()),
1203                                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
1204                                };
1205
1206                                let FinishedTaskState { has_invalidator } =
1207                                    this.finish_current_task_state();
1208                                let cell_counters = CURRENT_TASK_STATE
1209                                    .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
1210                                this.backend.task_execution_completed(
1211                                    task_id,
1212                                    result,
1213                                    &cell_counters,
1214                                    has_invalidator,
1215                                    &*this,
1216                                )
1217                            }
1218                            .instrument(span)
1219                            .await
1220                        };
1221                        schedule_again = CURRENT_TASK_STATE
1222                            .scope(current_task_state, single_execution_future)
1223                            .await;
1224                    }
1225                    this.finish_foreground_job();
1226                };
1227
1228                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
1229            }
1230            ScheduledTask::LocalTask {
1231                ty,
1232                persistence,
1233                local_task_id,
1234                global_task_state,
1235                span,
1236            } => {
1237                let this2 = this.clone();
1238                let this = this.clone();
1239                let task_type = ty.task_type;
1240                let future = async move {
1241                    let span = match &ty.task_type {
1242                        LocalTaskType::ResolveNative { native_fn } => {
1243                            native_fn.resolve_span(priority)
1244                        }
1245                        LocalTaskType::ResolveTrait { trait_method } => {
1246                            trait_method.resolve_span(priority)
1247                        }
1248                    };
1249                    async move {
1250                        let result = match ty.task_type {
1251                            LocalTaskType::ResolveNative { native_fn } => {
1252                                LocalTaskType::run_resolve_native(
1253                                    native_fn,
1254                                    ty.this,
1255                                    &*ty.arg,
1256                                    persistence,
1257                                    this,
1258                                )
1259                                .await
1260                            }
1261                            LocalTaskType::ResolveTrait { trait_method } => {
1262                                LocalTaskType::run_resolve_trait(
1263                                    trait_method,
1264                                    ty.this.unwrap(),
1265                                    &*ty.arg,
1266                                    persistence,
1267                                    this,
1268                                )
1269                                .await
1270                            }
1271                        };
1272
1273                        let output = match result {
1274                            Ok(raw_vc) => OutputContent::Link(raw_vc),
1275                            Err(err) => OutputContent::Error(
1276                                TurboTasksExecutionError::from(err)
1277                                    .with_task_context(task_type, None),
1278                            ),
1279                        };
1280
1281                        let local_task = LocalTask::Done { output };
1282
1283                        let done_event = CURRENT_TASK_STATE.with(move |gts| {
1284                            let mut gts_write = gts.write().unwrap();
1285                            let scheduled_task = std::mem::replace(
1286                                gts_write.get_mut_local_task(local_task_id),
1287                                local_task,
1288                            );
1289                            let LocalTask::Scheduled { done_event } = scheduled_task else {
1290                                panic!("local task finished, but was not in the scheduled state?");
1291                            };
1292                            done_event
1293                        });
1294                        done_event.notify(usize::MAX)
1295                    }
1296                    .instrument(span)
1297                    .await
1298                };
1299                let future = CURRENT_TASK_STATE.scope(global_task_state, future);
1300
1301                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
1302            }
1303        }
1304    }
1305}
1306
1307struct FinishedTaskState {
1308    /// True if the task uses an external invalidator
1309    has_invalidator: bool,
1310}
1311
1312impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
1313    fn dynamic_call(
1314        &self,
1315        native_fn: &'static NativeFunction,
1316        this: Option<RawVc>,
1317        arg: Box<dyn MagicAny>,
1318        persistence: TaskPersistence,
1319    ) -> RawVc {
1320        self.dynamic_call(native_fn, this, arg, persistence)
1321    }
1322    fn native_call(
1323        &self,
1324        native_fn: &'static NativeFunction,
1325        this: Option<RawVc>,
1326        arg: Box<dyn MagicAny>,
1327        persistence: TaskPersistence,
1328    ) -> RawVc {
1329        self.native_call(native_fn, this, arg, persistence)
1330    }
1331    fn trait_call(
1332        &self,
1333        trait_method: &'static TraitMethod,
1334        this: RawVc,
1335        arg: Box<dyn MagicAny>,
1336        persistence: TaskPersistence,
1337    ) -> RawVc {
1338        self.trait_call(trait_method, this, arg, persistence)
1339    }
1340
1341    #[track_caller]
1342    fn run(
1343        &self,
1344        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1345    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
1346        let this = self.pin();
1347        Box::pin(async move { this.run(future).await })
1348    }
1349
1350    #[track_caller]
1351    fn run_once(
1352        &self,
1353        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1354    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1355        let this = self.pin();
1356        Box::pin(async move { this.run_once(future).await })
1357    }
1358
1359    #[track_caller]
1360    fn run_once_with_reason(
1361        &self,
1362        reason: StaticOrArc<dyn InvalidationReason>,
1363        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1364    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1365        {
1366            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1367            reason_set.insert(reason);
1368        }
1369        let this = self.pin();
1370        Box::pin(async move { this.run_once(future).await })
1371    }
1372
1373    #[track_caller]
1374    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1375        self.start_once_process(future)
1376    }
1377}
1378
1379impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1380    #[instrument(level = "info", skip_all, name = "invalidate")]
1381    fn invalidate(&self, task: TaskId) {
1382        self.backend.invalidate_task(task, self);
1383    }
1384
1385    #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1386    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1387        {
1388            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1389            reason_set.insert(reason);
1390        }
1391        self.backend.invalidate_task(task, self);
1392    }
1393
1394    fn invalidate_serialization(&self, task: TaskId) {
1395        self.backend.invalidate_serialization(task, self);
1396    }
1397
1398    fn try_read_task_output(
1399        &self,
1400        task: TaskId,
1401        options: ReadOutputOptions,
1402    ) -> Result<Result<RawVc, EventListener>> {
1403        self.backend.try_read_task_output(
1404            task,
1405            current_task_if_available("reading Vcs"),
1406            options,
1407            self,
1408        )
1409    }
1410
1411    fn try_read_task_cell(
1412        &self,
1413        task: TaskId,
1414        index: CellId,
1415        options: ReadCellOptions,
1416    ) -> Result<Result<TypedCellContent, EventListener>> {
1417        self.backend.try_read_task_cell(
1418            task,
1419            index,
1420            current_task_if_available("reading Vcs"),
1421            options,
1422            self,
1423        )
1424    }
1425
1426    fn try_read_own_task_cell(
1427        &self,
1428        current_task: TaskId,
1429        index: CellId,
1430        options: ReadCellOptions,
1431    ) -> Result<TypedCellContent> {
1432        self.backend
1433            .try_read_own_task_cell(current_task, index, options, self)
1434    }
1435
1436    fn try_read_local_output(
1437        &self,
1438        execution_id: ExecutionId,
1439        local_task_id: LocalTaskId,
1440    ) -> Result<Result<RawVc, EventListener>> {
1441        CURRENT_TASK_STATE.with(|gts| {
1442            let gts_read = gts.read().unwrap();
1443
1444            // Local Vcs are local to their parent task's current execution, and do not exist
1445            // outside of it. This is weakly enforced at compile time using the `NonLocalValue`
1446            // marker trait. This assertion exists to handle any potential escapes that the
1447            // compile-time checks cannot capture.
1448            gts_read.assert_execution_id(execution_id);
1449
1450            match gts_read.get_local_task(local_task_id) {
1451                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1452                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1453            }
1454        })
1455    }
1456
1457    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1458        self.backend.read_task_collectibles(
1459            task,
1460            trait_id,
1461            current_task_if_available("reading collectibles"),
1462            self,
1463        )
1464    }
1465
1466    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1467        self.backend.emit_collectible(
1468            trait_type,
1469            collectible,
1470            current_task("emitting collectible"),
1471            self,
1472        );
1473    }
1474
1475    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1476        self.backend.unemit_collectible(
1477            trait_type,
1478            collectible,
1479            count,
1480            current_task("emitting collectible"),
1481            self,
1482        );
1483    }
1484
1485    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1486        for (&collectible, &count) in collectibles {
1487            if count > 0 {
1488                self.backend.unemit_collectible(
1489                    trait_type,
1490                    collectible,
1491                    count as u32,
1492                    current_task("emitting collectible"),
1493                    self,
1494                );
1495            }
1496        }
1497    }
1498
1499    fn read_own_task_cell(
1500        &self,
1501        task: TaskId,
1502        index: CellId,
1503        options: ReadCellOptions,
1504    ) -> Result<TypedCellContent> {
1505        self.try_read_own_task_cell(task, index, options)
1506    }
1507
1508    fn update_own_task_cell(
1509        &self,
1510        task: TaskId,
1511        index: CellId,
1512        is_serializable_cell_content: bool,
1513        content: CellContent,
1514        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
1515        verification_mode: VerificationMode,
1516    ) {
1517        self.backend.update_task_cell(
1518            task,
1519            index,
1520            is_serializable_cell_content,
1521            content,
1522            updated_key_hashes,
1523            verification_mode,
1524            self,
1525        );
1526    }
1527
1528    fn connect_task(&self, task: TaskId) {
1529        self.backend
1530            .connect_task(task, current_task_if_available("connecting task"), self);
1531    }
1532
1533    fn mark_own_task_as_finished(&self, task: TaskId) {
1534        self.backend.mark_own_task_as_finished(task, self);
1535    }
1536
1537    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
1538        self.backend
1539            .set_own_task_aggregation_number(task, aggregation_number, self);
1540    }
1541
1542    fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1543        self.backend.mark_own_task_as_session_dependent(task, self);
1544    }
1545
1546    /// Creates a future that inherits the current task id and task state. The current global task
1547    /// will wait for this future to be dropped before exiting.
1548    fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1549        // this is similar to what happens for a local task, except that we keep the local task's
1550        // state as well.
1551        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1552        let fut = tokio::spawn(TURBO_TASKS.scope(
1553            turbo_tasks(),
1554            CURRENT_TASK_STATE.scope(global_task_state.clone(), fut),
1555        ));
1556        let fut = Box::pin(async move {
1557            fut.await.unwrap();
1558        });
1559        let mut ts = global_task_state.write().unwrap();
1560        ts.local_task_tracker
1561            .get_or_insert_default()
1562            .push(Either::Right(fut));
1563    }
1564
1565    fn task_statistics(&self) -> &TaskStatisticsApi {
1566        self.backend.task_statistics()
1567    }
1568
1569    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1570        let this = self.pin();
1571        Box::pin(async move {
1572            this.stop_and_wait().await;
1573        })
1574    }
1575
1576    fn subscribe_to_compilation_events(
1577        &self,
1578        event_types: Option<Vec<String>>,
1579    ) -> Receiver<Arc<dyn CompilationEvent>> {
1580        self.compilation_events.subscribe(event_types)
1581    }
1582
1583    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1584        if let Err(e) = self.compilation_events.send(event) {
1585            tracing::warn!("Failed to send compilation event: {e}");
1586        }
1587    }
1588
1589    fn is_tracking_dependencies(&self) -> bool {
1590        self.backend.is_tracking_dependencies()
1591    }
1592}
1593
1594impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1595    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1596        self.pin()
1597    }
1598    fn backend(&self) -> &B {
1599        &self.backend
1600    }
1601
1602    #[track_caller]
1603    fn schedule_backend_background_job(&self, job: B::BackendJob) {
1604        self.schedule_background_job(async move |this| {
1605            this.backend.run_backend_job(job, &*this).await;
1606            this
1607        })
1608    }
1609
1610    #[track_caller]
1611    fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1612        self.schedule_foreground_job(async move |this| {
1613            this.backend.run_backend_job(job, &*this).await;
1614            this
1615        })
1616    }
1617
1618    #[track_caller]
1619    fn schedule(&self, task: TaskId, priority: TaskPriority) {
1620        self.schedule(task, priority)
1621    }
1622
1623    fn get_current_task_priority(&self) -> TaskPriority {
1624        CURRENT_TASK_STATE
1625            .try_with(|task_state| task_state.read().unwrap().priority)
1626            .unwrap_or(TaskPriority::initial())
1627    }
1628
1629    fn program_duration_until(&self, instant: Instant) -> Duration {
1630        instant - self.program_start
1631    }
1632
1633    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1634        // SAFETY: This is a fresh id from the factory
1635        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1636    }
1637
1638    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1639        // SAFETY: This is a fresh id from the factory
1640        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1641    }
1642
1643    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1644        unsafe { self.task_id_factory.reuse(id.into()) }
1645    }
1646
1647    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1648        unsafe { self.transient_task_id_factory.reuse(id.into()) }
1649    }
1650
1651    fn is_idle(&self) -> bool {
1652        self.currently_scheduled_foreground_jobs
1653            .load(Ordering::Acquire)
1654            == 0
1655    }
1656}
1657
1658async fn wait_for_local_tasks() {
1659    while let Some(mut ltt) =
1660        CURRENT_TASK_STATE.with(|ts| ts.write().unwrap().local_task_tracker.take())
1661    {
1662        use futures::StreamExt;
1663        while ltt.next().await.is_some() {}
1664    }
1665}
1666
1667pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1668    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1669        Ok(id) => id,
1670        Err(_) => panic!(
1671            "{from} can only be used in the context of a turbo_tasks task execution or \
1672             turbo_tasks run"
1673        ),
1674    }
1675}
1676
1677pub(crate) fn current_task(from: &str) -> TaskId {
1678    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1679        Ok(Some(id)) => id,
1680        Ok(None) | Err(_) => {
1681            panic!("{from} can only be used in the context of a turbo_tasks task execution")
1682        }
1683    }
1684}
1685
1686pub async fn run<T: Send + 'static>(
1687    tt: Arc<dyn TurboTasksApi>,
1688    future: impl Future<Output = Result<T>> + Send + 'static,
1689) -> Result<T> {
1690    let (tx, rx) = tokio::sync::oneshot::channel();
1691
1692    tt.run(Box::pin(async move {
1693        let result = future.await?;
1694        tx.send(result)
1695            .map_err(|_| anyhow!("unable to send result"))?;
1696        Ok(())
1697    }))
1698    .await?;
1699
1700    Ok(rx.await?)
1701}
1702
1703pub async fn run_once<T: Send + 'static>(
1704    tt: Arc<dyn TurboTasksApi>,
1705    future: impl Future<Output = Result<T>> + Send + 'static,
1706) -> Result<T> {
1707    let (tx, rx) = tokio::sync::oneshot::channel();
1708
1709    tt.run_once(Box::pin(async move {
1710        let result = future.await?;
1711        tx.send(result)
1712            .map_err(|_| anyhow!("unable to send result"))?;
1713        Ok(())
1714    }))
1715    .await?;
1716
1717    Ok(rx.await?)
1718}
1719
1720pub async fn run_once_with_reason<T: Send + 'static>(
1721    tt: Arc<dyn TurboTasksApi>,
1722    reason: impl InvalidationReason,
1723    future: impl Future<Output = Result<T>> + Send + 'static,
1724) -> Result<T> {
1725    let (tx, rx) = tokio::sync::oneshot::channel();
1726
1727    tt.run_once_with_reason(
1728        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1729        Box::pin(async move {
1730            let result = future.await?;
1731            tx.send(result)
1732                .map_err(|_| anyhow!("unable to send result"))?;
1733            Ok(())
1734        }),
1735    )
1736    .await?;
1737
1738    Ok(rx.await?)
1739}
1740
1741/// Calls [`TurboTasks::dynamic_call`] for the current turbo tasks instance.
1742pub fn dynamic_call(
1743    func: &'static NativeFunction,
1744    this: Option<RawVc>,
1745    arg: Box<dyn MagicAny>,
1746    persistence: TaskPersistence,
1747) -> RawVc {
1748    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1749}
1750
1751/// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance.
1752pub fn trait_call(
1753    trait_method: &'static TraitMethod,
1754    this: RawVc,
1755    arg: Box<dyn MagicAny>,
1756    persistence: TaskPersistence,
1757) -> RawVc {
1758    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1759}
1760
1761pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1762    TURBO_TASKS.with(|arc| arc.clone())
1763}
1764
1765pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
1766    TURBO_TASKS.with(Arc::downgrade)
1767}
1768
1769pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1770    TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1771}
1772
1773pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1774    TURBO_TASKS.with(|arc| func(arc))
1775}
1776
1777pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1778    TURBO_TASKS.sync_scope(tt, f)
1779}
1780
1781pub fn turbo_tasks_future_scope<T>(
1782    tt: Arc<dyn TurboTasksApi>,
1783    f: impl Future<Output = T>,
1784) -> impl Future<Output = T> {
1785    TURBO_TASKS.scope(tt, f)
1786}
1787
1788pub fn with_turbo_tasks_for_testing<T>(
1789    tt: Arc<dyn TurboTasksApi>,
1790    current_task: TaskId,
1791    execution_id: ExecutionId,
1792    f: impl Future<Output = T>,
1793) -> impl Future<Output = T> {
1794    TURBO_TASKS.scope(
1795        tt,
1796        CURRENT_TASK_STATE.scope(
1797            Arc::new(RwLock::new(CurrentTaskState::new(
1798                current_task,
1799                execution_id,
1800                TaskPriority::initial(),
1801            ))),
1802            f,
1803        ),
1804    )
1805}
1806
1807/// Spawns the given future within the context of the current task.
1808///
1809/// Beware: this method is not safe to use in production code. It is only
1810/// intended for use in tests and for debugging purposes.
1811pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
1812    turbo_tasks().spawn_detached_for_testing(Box::pin(f));
1813}
1814
1815pub fn current_task_for_testing() -> Option<TaskId> {
1816    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1817}
1818
1819/// Marks the current task as dirty when restored from filesystem cache.
1820pub fn mark_session_dependent() {
1821    with_turbo_tasks(|tt| {
1822        tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1823    });
1824}
1825
1826/// Marks the current task as a root in the aggregation graph.  This means it starts with the
1827/// correct aggregation number instead of needing to recompute it after the fact.
1828pub fn mark_root() {
1829    with_turbo_tasks(|tt| {
1830        tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
1831    });
1832}
1833
1834/// Marks the current task as finished. This excludes it from waiting for
1835/// strongly consistency.
1836pub fn mark_finished() {
1837    with_turbo_tasks(|tt| {
1838        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1839    });
1840}
1841
1842/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
1843/// serialization of the current task cells
1844pub fn get_serialization_invalidator() -> SerializationInvalidator {
1845    CURRENT_TASK_STATE.with(|cell| {
1846        let CurrentTaskState { task_id, .. } = &mut *cell.write().unwrap();
1847        let Some(task_id) = *task_id else {
1848            panic!(
1849                "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
1850                 task execution"
1851            );
1852        };
1853        SerializationInvalidator::new(task_id)
1854    })
1855}
1856
1857pub fn mark_invalidator() {
1858    CURRENT_TASK_STATE.with(|cell| {
1859        let CurrentTaskState {
1860            has_invalidator, ..
1861        } = &mut *cell.write().unwrap();
1862        *has_invalidator = true;
1863    })
1864}
1865
1866pub fn prevent_gc() {
1867    // TODO implement garbage collection
1868}
1869
1870pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1871    with_turbo_tasks(|tt| {
1872        let raw_vc = collectible.node.node;
1873        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1874    })
1875}
1876
1877pub(crate) async fn read_task_output(
1878    this: &dyn TurboTasksApi,
1879    id: TaskId,
1880    options: ReadOutputOptions,
1881) -> Result<RawVc> {
1882    loop {
1883        match this.try_read_task_output(id, options)? {
1884            Ok(result) => return Ok(result),
1885            Err(listener) => listener.await,
1886        }
1887    }
1888}
1889
1890pub(crate) async fn read_task_cell(
1891    this: &dyn TurboTasksApi,
1892    id: TaskId,
1893    index: CellId,
1894    options: ReadCellOptions,
1895) -> Result<TypedCellContent> {
1896    loop {
1897        match this.try_read_task_cell(id, index, options)? {
1898            Ok(result) => return Ok(result),
1899            Err(listener) => listener.await,
1900        }
1901    }
1902}
1903
1904/// A reference to a task's cell with methods that allow updating the contents
1905/// of the cell.
1906///
1907/// Mutations should not outside of the task that that owns this cell. Doing so
1908/// is a logic error, and may lead to incorrect caching behavior.
1909#[derive(Clone, Copy)]
1910pub struct CurrentCellRef {
1911    current_task: TaskId,
1912    index: CellId,
1913    is_serializable_cell_content: bool,
1914}
1915
1916type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
1917
1918impl CurrentCellRef {
1919    /// Updates the cell if the given `functor` returns a value.
1920    fn conditional_update<T>(
1921        &self,
1922        functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>)>,
1923    ) where
1924        T: VcValueType,
1925    {
1926        self.conditional_update_with_shared_reference(|old_shared_reference| {
1927            let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
1928            let (new_value, updated_key_hashes) = functor(old_ref)?;
1929            Some((
1930                SharedReference::new(triomphe::Arc::new(new_value)),
1931                updated_key_hashes,
1932            ))
1933        })
1934    }
1935
1936    /// Updates the cell if the given `functor` returns a `SharedReference`.
1937    fn conditional_update_with_shared_reference(
1938        &self,
1939        functor: impl FnOnce(
1940            Option<&SharedReference>,
1941        ) -> Option<(SharedReference, Option<SmallVec<[u64; 2]>>)>,
1942    ) {
1943        let tt = turbo_tasks();
1944        let cell_content = tt
1945            .read_own_task_cell(
1946                self.current_task,
1947                self.index,
1948                ReadCellOptions {
1949                    // INVALIDATION: Reading our own cell must be untracked
1950                    tracking: ReadCellTracking::Untracked,
1951                    is_serializable_cell_content: self.is_serializable_cell_content,
1952                    final_read_hint: false,
1953                },
1954            )
1955            .ok();
1956        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1957        if let Some((update, updated_key_hashes)) = update {
1958            tt.update_own_task_cell(
1959                self.current_task,
1960                self.index,
1961                self.is_serializable_cell_content,
1962                CellContent(Some(update)),
1963                updated_key_hashes,
1964                VerificationMode::EqualityCheck,
1965            )
1966        }
1967    }
1968
1969    /// Replace the current cell's content with `new_value` if the current content is not equal by
1970    /// value with the existing content.
1971    ///
1972    /// The comparison happens using the value itself, not the [`VcRead::Target`] of that value.
1973    ///
1974    /// Take this example of a custom equality implementation on a transparent wrapper type:
1975    ///
1976    /// ```
1977    /// #[turbo_tasks::value(transparent, eq = "manual")]
1978    /// struct Wrapper(Vec<u32>);
1979    ///
1980    /// impl PartialEq for Wrapper {
1981    ///     fn eq(&self, other: Wrapper) {
1982    ///         // Example: order doesn't matter for equality
1983    ///         let (mut this, mut other) = (self.clone(), other.clone());
1984    ///         this.sort_unstable();
1985    ///         other.sort_unstable();
1986    ///         this == other
1987    ///     }
1988    /// }
1989    ///
1990    /// impl Eq for Wrapper {}
1991    /// ```
1992    ///
1993    /// Comparisons of [`Vc<Wrapper>`] used when updating the cell will use `Wrapper`'s custom
1994    /// equality implementation, rather than the one provided by the target ([`Vec<u32>`]) type.
1995    ///
1996    /// However, in most cases, the default derived implementation of [`PartialEq`] is used which
1997    /// just forwards to the inner value's [`PartialEq`].
1998    ///
1999    /// If you already have a `SharedReference`, consider calling
2000    /// [`Self::compare_and_update_with_shared_reference`] which can re-use the [`SharedReference`]
2001    /// object.
2002    pub fn compare_and_update<T>(&self, new_value: T)
2003    where
2004        T: PartialEq + VcValueType,
2005    {
2006        self.conditional_update(|old_value| {
2007            if let Some(old_value) = old_value
2008                && old_value == &new_value
2009            {
2010                return None;
2011            }
2012            Some((new_value, None))
2013        });
2014    }
2015
2016    /// Replace the current cell's content with `new_shared_reference` if the current content is not
2017    /// equal by value with the existing content.
2018    ///
2019    /// If you already have a `SharedReference`, this is a faster version of
2020    /// [`CurrentCellRef::compare_and_update`].
2021    ///
2022    /// The value should be stored in [`SharedReference`] using the type `T`.
2023    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
2024    where
2025        T: VcValueType + PartialEq,
2026    {
2027        self.conditional_update_with_shared_reference(|old_sr| {
2028            if let Some(old_sr) = old_sr {
2029                let old_value = extract_sr_value::<T>(old_sr);
2030                let new_value = extract_sr_value::<T>(&new_shared_reference);
2031                if old_value == new_value {
2032                    return None;
2033                }
2034            }
2035            Some((new_shared_reference, None))
2036        });
2037    }
2038
2039    /// See [`Self::compare_and_update`], but selectively update individual keys.
2040    pub fn keyed_compare_and_update<T>(&self, new_value: T)
2041    where
2042        T: PartialEq + VcValueType,
2043        VcReadTarget<T>: Keyed,
2044        <VcReadTarget<T> as Keyed>::Key: std::hash::Hash,
2045    {
2046        self.conditional_update(|old_value| {
2047            let Some(old_value) = old_value else {
2048                return Some((new_value, None));
2049            };
2050            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2051            let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
2052            let updated_keys = old_value.different_keys(new_value_ref);
2053            if updated_keys.is_empty() {
2054                return None;
2055            }
2056            // Duplicates are very unlikely, but ok since the backend is deduplicating them
2057            let updated_key_hashes = updated_keys
2058                .into_iter()
2059                .map(|key| FxBuildHasher.hash_one(key))
2060                .collect();
2061            Some((new_value, Some(updated_key_hashes)))
2062        });
2063    }
2064
2065    /// See [`Self::compare_and_update_with_shared_reference`], but selectively update individual
2066    /// keys.
2067    pub fn keyed_compare_and_update_with_shared_reference<T>(
2068        &self,
2069        new_shared_reference: SharedReference,
2070    ) where
2071        T: VcValueType + PartialEq,
2072        VcReadTarget<T>: Keyed,
2073        <VcReadTarget<T> as Keyed>::Key: std::hash::Hash,
2074    {
2075        self.conditional_update_with_shared_reference(|old_sr| {
2076            let Some(old_sr) = old_sr else {
2077                return Some((new_shared_reference, None));
2078            };
2079            let old_value = extract_sr_value::<T>(old_sr);
2080            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2081            let new_value = extract_sr_value::<T>(&new_shared_reference);
2082            let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
2083            let updated_keys = old_value.different_keys(new_value);
2084            if updated_keys.is_empty() {
2085                return None;
2086            }
2087            // Duplicates are very unlikely, but ok since the backend is deduplicating them
2088            let updated_key_hashes = updated_keys
2089                .into_iter()
2090                .map(|key| FxBuildHasher.hash_one(key))
2091                .collect();
2092            Some((new_shared_reference, Some(updated_key_hashes)))
2093        });
2094    }
2095
2096    /// Unconditionally updates the content of the cell.
2097    pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
2098    where
2099        T: VcValueType,
2100    {
2101        let tt = turbo_tasks();
2102        tt.update_own_task_cell(
2103            self.current_task,
2104            self.index,
2105            self.is_serializable_cell_content,
2106            CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
2107            None,
2108            verification_mode,
2109        )
2110    }
2111
2112    /// A faster version of [`Self::update`] if you already have a
2113    /// [`SharedReference`].
2114    ///
2115    /// If the passed-in [`SharedReference`] is the same as the existing cell's
2116    /// by identity, no update is performed.
2117    ///
2118    /// The value should be stored in [`SharedReference`] using the type `T`.
2119    pub fn update_with_shared_reference(
2120        &self,
2121        shared_ref: SharedReference,
2122        verification_mode: VerificationMode,
2123    ) {
2124        let tt = turbo_tasks();
2125        let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
2126            let content = tt
2127                .read_own_task_cell(
2128                    self.current_task,
2129                    self.index,
2130                    ReadCellOptions {
2131                        // INVALIDATION: Reading our own cell must be untracked
2132                        tracking: ReadCellTracking::Untracked,
2133                        is_serializable_cell_content: self.is_serializable_cell_content,
2134                        final_read_hint: false,
2135                    },
2136                )
2137                .ok();
2138            if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
2139                // pointer equality (not value equality)
2140                shared_ref_exp != shared_ref
2141            } else {
2142                true
2143            }
2144        } else {
2145            true
2146        };
2147        if update {
2148            tt.update_own_task_cell(
2149                self.current_task,
2150                self.index,
2151                self.is_serializable_cell_content,
2152                CellContent(Some(shared_ref)),
2153                None,
2154                verification_mode,
2155            )
2156        }
2157    }
2158}
2159
2160impl From<CurrentCellRef> for RawVc {
2161    fn from(cell: CurrentCellRef) -> Self {
2162        RawVc::TaskCell(cell.current_task, cell.index)
2163    }
2164}
2165
2166fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2167    sr.0.downcast_ref::<T>()
2168        .expect("cannot update SharedReference of different type")
2169}
2170
2171pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
2172    find_cell_by_id(T::get_value_type_id(), T::has_serialization())
2173}
2174
2175pub fn find_cell_by_id(ty: ValueTypeId, is_serializable_cell_content: bool) -> CurrentCellRef {
2176    CURRENT_TASK_STATE.with(|ts| {
2177        let current_task = current_task("celling turbo_tasks values");
2178        let mut ts = ts.write().unwrap();
2179        let map = ts.cell_counters.as_mut().unwrap();
2180        let current_index = map.entry(ty).or_default();
2181        let index = *current_index;
2182        *current_index += 1;
2183        CurrentCellRef {
2184            current_task,
2185            index: CellId { type_id: ty, index },
2186            is_serializable_cell_content,
2187        }
2188    })
2189}
2190
2191pub(crate) async fn read_local_output(
2192    this: &dyn TurboTasksApi,
2193    execution_id: ExecutionId,
2194    local_task_id: LocalTaskId,
2195) -> Result<RawVc> {
2196    loop {
2197        match this.try_read_local_output(execution_id, local_task_id)? {
2198            Ok(raw_vc) => return Ok(raw_vc),
2199            Err(event_listener) => event_listener.await,
2200        }
2201    }
2202}