// turbo_tasks/manager.rs — TurboTasks engine: task scheduling, task-local state, and the
// public call/read APIs exposed to backends and task code.
1use std::{
2    cmp::Reverse,
3    fmt::{Debug, Display},
4    future::Future,
5    hash::{BuildHasher, BuildHasherDefault},
6    mem::take,
7    pin::Pin,
8    sync::{
9        Arc, Mutex, RwLock, Weak,
10        atomic::{AtomicBool, AtomicUsize, Ordering},
11    },
12    time::{Duration, Instant},
13};
14
15use anyhow::{Result, anyhow};
16use auto_hash_map::AutoMap;
17use bincode::{Decode, Encode};
18use either::Either;
19use futures::stream::FuturesUnordered;
20use rustc_hash::{FxBuildHasher, FxHasher};
21use serde::{Deserialize, Serialize};
22use smallvec::SmallVec;
23use tokio::{select, sync::mpsc::Receiver, task_local};
24use tracing::{Instrument, Span, instrument};
25
26use crate::{
27    Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
28    ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
29    VcValueTrait, VcValueType,
30    backend::{
31        Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
32        TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
33    },
34    capture_future::CaptureFuture,
35    event::{Event, EventListener},
36    id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
37    id_factory::IdFactoryWithReuse,
38    keyed::KeyedEq,
39    macro_helpers::NativeFunction,
40    magic_any::MagicAny,
41    message_queue::{CompilationEvent, CompilationEventQueue},
42    priority_runner::{Executor, JoinHandle, PriorityRunner},
43    raw_vc::{CellId, RawVc},
44    registry,
45    serialization_invalidation::SerializationInvalidator,
46    task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
47    task_statistics::TaskStatisticsApi,
48    trace::TraceRawVcs,
49    util::{IdFactory, StaticOrArc},
50};
51
/// Common base trait for [`TurboTasksApi`] and [`TurboTasksBackendApi`]. Provides APIs for creating
/// tasks from function calls.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls a native function with arguments. Resolves arguments when needed
    /// with a wrapper task.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Call a native function with arguments.
    /// All inputs must be resolved.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls a trait method with arguments. First input is the `self` object.
    /// Uses a wrapper task to resolve.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs `future` to completion, surfacing execution failures (including panics) as
    /// [`TurboTasksExecutionError`].
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Runs `future` as a root task that is executed exactly once; dependencies do not
    /// invalidate it.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Like [`Self::run_once`], additionally attributing the run to the given invalidation
    /// `reason`.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Spawns `future` as a detached fire-and-forget once-task process.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Sends a compilation event to subscribers.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Returns a human-readable name for the given task.
    fn get_task_name(&self, task: TaskId) -> String;
}
104
/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
/// context. Returned by the [`turbo_tasks`] helper function.
///
/// This trait is needed because thread locals cannot contain an unresolved [`Backend`] type
/// parameter.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Invalidates the given task so it will be re-executed.
    fn invalidate(&self, task: TaskId);
    /// Like [`Self::invalidate`], additionally recording `reason` for reporting purposes.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Invalidates the persisted/serialized form of the task (see [`SerializationInvalidator`]).
    fn invalidate_serialization(&self, task: TaskId);

    /// Reads a task's output. Returns `Ok(Err(listener))` when the output is not yet available;
    /// the caller can await the listener and retry.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Reads a single cell of a task. Returns `Ok(Err(listener))` when the cell is not yet
    /// available; the caller can await the listener and retry.
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Reads a [`RawVc::LocalOutput`]. If the task has completed, returns the [`RawVc`] the local
    /// task points to.
    ///
    /// The returned [`RawVc`] may also be a [`RawVc::LocalOutput`], so this may need to be called
    /// recursively or in a loop.
    ///
    /// This does not accept a consistency argument, as you cannot control consistency of a read of
    /// an operation owned by your own task. Strongly consistent reads are only allowed on
    /// [`OperationVc`]s, which should never be local tasks.
    ///
    /// No dependency tracking will happen as a result of this function call, as it's a no-op for a
    /// task to depend on itself.
    ///
    /// [`OperationVc`]: crate::OperationVc
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Returns the collectibles of the given trait type emitted by the task (and, presumably, its
    /// aggregated children — confirm against backend implementation).
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits a collectible value from the current task.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Retracts `count` emissions of a collectible from the current task.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Retracts all collectibles in `collectibles` from the current task.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;

    /// Reads a cell of the currently-executing task. Reading your own cells needs no dependency
    /// tracking, since a task depending on itself is a no-op.
    fn read_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;
    /// Writes `content` into a cell of the currently-executing task.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        verification_mode: VerificationMode,
    );
    /// Marks the currently-executing task as finished.
    fn mark_own_task_as_finished(&self, task: TaskId);
    /// Sets the aggregation number of the currently-executing task (backend aggregation detail).
    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
    /// Marks the currently-executing task as dependent on the current session.
    fn mark_own_task_as_session_dependent(&self, task: TaskId);

    /// Connects the given task to the currently-executing task (as a child/dependency).
    fn connect_task(&self, task: TaskId);

    /// Wraps the given future in the current task.
    ///
    /// Beware: this method is not safe to use in production code. It is only intended for use in
    /// tests and for debugging purposes.
    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Returns the task-statistics API handle.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Stops the system and resolves the returned future once everything has shut down.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events, optionally filtered to the given event types.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;

    // Returns true if TurboTasks is configured to track dependencies.
    fn is_tracking_dependencies(&self) -> bool;
}
203
/// A wrapper around a value that is unused.
///
/// Used to pass around freshly-allocated ids (e.g. [`TaskId`]) that must not be observed until
/// they are officially handed out or reclaimed.
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    /// Creates a new unused value.
    ///
    /// # Safety
    ///
    /// The wrapped value must not be used.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Self { inner }
    }

    /// Get the inner value, without consuming the `Unused` wrapper.
    ///
    /// # Safety
    ///
    /// The caller needs to make sure that the value stays unused.
    pub unsafe fn get_unchecked(&self) -> &T {
        &self.inner
    }

    /// Unwraps the value, consuming the `Unused` wrapper.
    pub fn into(self) -> T {
        self.inner
    }
}
233
/// A subset of the [`TurboTasks`] API that's exposed to [`Backend`] implementations.
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns an owned (`Arc`) handle to this API object.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Allocates a fresh, not-yet-used persistent task id.
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Allocates a fresh, not-yet-used transient task id.
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// # Safety
    ///
    /// The caller must ensure that the task id is not used anymore.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// # Safety
    ///
    /// The caller must ensure that the task id is not used anymore.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedule a task for execution.
    fn schedule(&self, task: TaskId, priority: TaskPriority);

    /// Returns the priority of the current task.
    fn get_current_task_priority(&self) -> TaskPriority;

    /// Schedule a foreground backend job for execution.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Schedule a background backend job for execution.
    ///
    /// Background jobs are not counted towards activeness of the system. The system is considered
    /// idle even with active background jobs.
    fn schedule_backend_background_job(&self, job: B::BackendJob);

    /// Returns the duration from the start of the program to the given instant.
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Returns true if the system is idle.
    fn is_idle(&self) -> bool;

    /// Returns a reference to the backend.
    fn backend(&self) -> &B;
}
273
/// Aggregated information about a completed batch of updates.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// Duration of the aggregated update.
    pub duration: Duration,
    /// Number of tasks that were part of the update.
    pub tasks: usize,
    /// The set of invalidation reasons that triggered this update.
    pub reasons: InvalidationReasonSet,
    // Private unit field keeps the struct non-exhaustive so fields can be added later.
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
282
/// Whether a task may be cached across sessions or only lives for the current session.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// Tasks that may be persisted across sessions using serialization.
    Persistent,

    /// Tasks that will be persisted in memory for the life of this session, but won't persist
    /// between sessions.
    ///
    /// This is used for [root tasks][TurboTasks::spawn_root_task] and tasks with an argument of
    /// type [`TransientValue`][crate::value::TransientValue] or
    /// [`TransientInstance`][crate::value::TransientInstance].
    Transient,
}
296
297impl Display for TaskPersistence {
298    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
299        match self {
300            TaskPersistence::Persistent => write!(f, "persistent"),
301            TaskPersistence::Transient => write!(f, "transient"),
302        }
303    }
304}
305
/// Consistency guarantee requested when reading task output or cell data.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// The default behavior for most APIs. Reads are faster, but may return stale values, which
    /// may later trigger re-computation.
    #[default]
    Eventual,
    /// Ensures all dependencies are fully resolved before returning the cell or output data, at
    /// the cost of slower reads.
    ///
    /// Top-level code that returns data to the user should use strongly consistent reads.
    Strong,
}
318
/// Dependency-tracking behavior for cell reads. Like [`ReadTracking`], but the tracked variant
/// may additionally carry a dependency key.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadCellTracking {
    /// Reads are tracked as dependencies of the current task.
    Tracked {
        /// The key used for the dependency
        key: Option<u64>,
    },
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}
337
338impl ReadCellTracking {
339    pub fn should_track(&self, is_err: bool) -> bool {
340        match self {
341            ReadCellTracking::Tracked { .. } => true,
342            ReadCellTracking::TrackOnlyError => is_err,
343            ReadCellTracking::Untracked => false,
344        }
345    }
346
347    pub fn key(&self) -> Option<u64> {
348        match self {
349            ReadCellTracking::Tracked { key } => *key,
350            ReadCellTracking::TrackOnlyError => None,
351            ReadCellTracking::Untracked => None,
352        }
353    }
354}
355
impl Default for ReadCellTracking {
    /// Defaults to a tracked read without a dependency key.
    fn default() -> Self {
        ReadCellTracking::Tracked { key: None }
    }
}
361
362impl Display for ReadCellTracking {
363    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
364        match self {
365            ReadCellTracking::Tracked { key: None } => write!(f, "tracked"),
366            ReadCellTracking::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
367            ReadCellTracking::TrackOnlyError => write!(f, "track only error"),
368            ReadCellTracking::Untracked => write!(f, "untracked"),
369        }
370    }
371}
372
/// Dependency-tracking behavior for (non-cell) reads.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// Reads are tracked as dependencies of the current task.
    #[default]
    Tracked,
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}
389
390impl ReadTracking {
391    pub fn should_track(&self, is_err: bool) -> bool {
392        match self {
393            ReadTracking::Tracked => true,
394            ReadTracking::TrackOnlyError => is_err,
395            ReadTracking::Untracked => false,
396        }
397    }
398}
399
400impl Display for ReadTracking {
401    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
402        match self {
403            ReadTracking::Tracked => write!(f, "tracked"),
404            ReadTracking::TrackOnlyError => write!(f, "track only error"),
405            ReadTracking::Untracked => write!(f, "untracked"),
406        }
407    }
408}
409
/// Scheduling priority for tasks executed by the [`PriorityRunner`].
///
/// With the derived `Ord`, `Initial` compares less than any `Invalidation`, and invalidations
/// with a larger inner `u32` compare as smaller due to `Reverse`. NOTE(review): whether
/// "smaller" means "runs first" depends on the `PriorityRunner` implementation — confirm there.
#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum TaskPriority {
    /// Priority of tasks scheduled for the first time.
    #[default]
    Initial,
    /// Priority of tasks re-scheduled due to invalidation.
    Invalidation {
        priority: Reverse<u32>,
    },
}
418
419impl TaskPriority {
420    pub fn invalidation(priority: u32) -> Self {
421        Self::Invalidation {
422            priority: Reverse(priority),
423        }
424    }
425
426    pub fn initial() -> Self {
427        Self::Initial
428    }
429
430    pub fn leaf() -> Self {
431        Self::Invalidation {
432            priority: Reverse(0),
433        }
434    }
435
436    pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
437        match self {
438            TaskPriority::Initial => parent_priority,
439            TaskPriority::Invalidation { priority } => {
440                if let TaskPriority::Invalidation {
441                    priority: parent_priority,
442                } = parent_priority
443                    && priority.0 < parent_priority.0
444                {
445                    Self::Invalidation {
446                        priority: Reverse(parent_priority.0.saturating_add(1)),
447                    }
448                } else {
449                    *self
450                }
451            }
452        }
453    }
454}
455
456impl Display for TaskPriority {
457    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
458        match self {
459            TaskPriority::Initial => write!(f, "initial"),
460            TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
461        }
462    }
463}
464
/// A unit of work queued on the [`PriorityRunner`]: either a backend-managed task identified by
/// `TaskId`, or a local task that shares the state of the parent task that spawned it.
enum ScheduledTask {
    Task {
        task_id: TaskId,
        /// Tracing span captured via `Span::current()` at schedule time.
        span: Span,
    },
    LocalTask {
        ty: LocalTaskSpec,
        persistence: TaskPersistence,
        local_task_id: LocalTaskId,
        /// Shared state of the parent (non-local) task this local task belongs to.
        global_task_state: Arc<RwLock<CurrentTaskState>>,
        /// Tracing span captured via `Span::current()` at schedule time.
        span: Span,
    },
}
478
/// The core turbo-tasks engine: owns the [`Backend`], hands out task/execution ids, schedules
/// work on a [`PriorityRunner`], and tracks foreground/background job activity.
pub struct TurboTasks<B: Backend + 'static> {
    /// Cyclic self-reference used by [`TurboTasks::pin`] to hand out owned `Arc`s.
    this: Weak<Self>,
    backend: B,
    /// Id factory for persistent task ids (range below `TRANSIENT_TASK_BIT`).
    task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id factory for transient task ids (range starting at `TRANSIENT_TASK_BIT`).
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Per-execution ids; these may wrap, they are only used for assertions.
    execution_id_factory: IdFactory<ExecutionId>,
    stopped: AtomicBool,
    currently_scheduled_foreground_jobs: AtomicUsize,
    currently_scheduled_background_jobs: AtomicUsize,
    /// Count of tasks scheduled so far. NOTE(review): presumably feeds `UpdateInfo::tasks` via
    /// `aggregated_update` — confirm against the aggregation code.
    scheduled_tasks: AtomicUsize,
    priority_runner:
        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
    start: Mutex<Option<Instant>>,
    /// Aggregated (duration, task count) plus the invalidation reasons of the pending update.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Event that is triggered when currently_scheduled_foreground_jobs becomes non-zero
    event_foreground_start: Event,
    /// Event that is triggered when all foreground jobs are done
    /// (currently_scheduled_foreground_jobs becomes zero)
    event_foreground_done: Event,
    /// Event that is triggered when all background jobs are done
    event_background_done: Event,
    /// Instant captured at construction; basis for `program_duration_until`.
    program_start: Instant,
    compilation_events: CompilationEventQueue,
}
503
/// Lazily-created set of futures tracking in-flight local tasks (and detached-for-testing
/// futures) of a single task execution. `None` until the first future is pushed.
type LocalTaskTracker = Option<
    FuturesUnordered<Either<JoinHandle, Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>>,
>;
507
/// Information about a non-local task. A non-local task can contain multiple "local" tasks, which
/// all share the same non-local task state.
///
/// A non-local task is one that:
///
/// - Has a unique task id.
/// - Is potentially cached.
/// - The backend is aware of.
struct CurrentTaskState {
    /// `None` for temporary states created outside a real task (see
    /// [`CurrentTaskState::new_temporary`]).
    task_id: Option<TaskId>,
    /// Identifies this particular execution; used to assert local tasks are not used across
    /// executions.
    execution_id: ExecutionId,
    /// Scheduling priority inherited by local tasks spawned from this task.
    priority: TaskPriority,

    /// True if the current task has state in cells (interior mutability).
    /// Only tracked when verify_determinism feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the current task uses an external invalidator
    has_invalidator: bool,

    /// Tracks how many cells of each type has been allocated so far during this task execution.
    /// When a task is re-executed, the cell count may not match the existing cell vec length.
    ///
    /// This is taken (and becomes `None`) during teardown of a task.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Local tasks created while this global task has been running. Indexed by `LocalTaskId`.
    local_tasks: Vec<LocalTask>,

    /// Tracks currently running local tasks, and defers cleanup of the global task until those
    /// complete. Also used by `spawn_detached_for_testing`.
    local_task_tracker: LocalTaskTracker,
}
542
impl CurrentTaskState {
    /// State for the execution of a real (backend-known) task with the given id.
    fn new(task_id: TaskId, execution_id: ExecutionId, priority: TaskPriority) -> Self {
        Self {
            task_id: Some(task_id),
            execution_id,
            priority,
            #[cfg(feature = "verify_determinism")]
            stateful: false,
            has_invalidator: false,
            cell_counters: Some(AutoMap::default()),
            local_tasks: Vec::new(),
            local_task_tracker: None,
        }
    }

    /// State for code running outside a real task (e.g. [`TurboTasks::run`]); has no task id and
    /// no cell counters.
    fn new_temporary(execution_id: ExecutionId, priority: TaskPriority) -> Self {
        Self {
            task_id: None,
            execution_id,
            priority,
            #[cfg(feature = "verify_determinism")]
            stateful: false,
            has_invalidator: false,
            cell_counters: None,
            local_tasks: Vec::new(),
            local_task_tracker: None,
        }
    }

    /// Panics unless `expected_execution_id` matches this state's execution id: local tasks are
    /// bound to the single parent-task execution that created them.
    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
        if self.execution_id != expected_execution_id {
            panic!(
                "Local tasks can only be scheduled/awaited within the same execution of the \
                 parent task that created them"
            );
        }
    }

    /// Registers a local task and returns its one-indexed id.
    fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
        self.local_tasks.push(local_task);
        // generate a one-indexed id from len() -- we just pushed so len() is >= 1
        if cfg!(debug_assertions) {
            LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
        } else {
            // SAFETY: len() >= 1 after the push above, so the id is non-zero.
            unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
        }
    }

    /// Looks up a local task by its one-indexed id; panics if the id is out of range.
    fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
        // local task ids are one-indexed (they use NonZeroU32)
        &self.local_tasks[(*local_task_id as usize) - 1]
    }

    /// Mutable variant of [`Self::get_local_task`].
    fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
        &mut self.local_tasks[(*local_task_id as usize) - 1]
    }
}
600
// TODO implement our own thread pool and make these thread locals instead
task_local! {
    /// The current TurboTasks instance
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    /// State of the task (or `run` invocation) currently executing on this tokio task.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
}
608
609impl<B: Backend + 'static> TurboTasks<B> {
    // TODO better lifetime management for turbo tasks
    // consider using unsafe for the task_local turbo tasks
    // that should be safe as long tasks can't outlife turbo task
    // so we probably want to make sure that all tasks are joined
    // when trying to drop turbo tasks
    /// Creates a new engine instance, wiring up the cyclic self-reference used by
    /// [`TurboTasks::pin`], then notifies the backend via `startup`.
    pub fn new(backend: B) -> Arc<Self> {
        // Persistent ids occupy the range below TRANSIENT_TASK_BIT; transient ids the range
        // at/above it.
        let task_id_factory = IdFactoryWithReuse::new(
            TaskId::MIN,
            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
        );
        let transient_task_id_factory =
            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
        let this = Arc::new_cyclic(|this| Self {
            this: this.clone(),
            backend,
            task_id_factory,
            transient_task_id_factory,
            execution_id_factory,
            stopped: AtomicBool::new(false),
            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
            currently_scheduled_background_jobs: AtomicUsize::new(0),
            scheduled_tasks: AtomicUsize::new(0),
            priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
            start: Default::default(),
            aggregated_update: Default::default(),
            event_foreground_done: Event::new(|| {
                || "TurboTasks::event_foreground_done".to_string()
            }),
            event_foreground_start: Event::new(|| {
                || "TurboTasks::event_foreground_start".to_string()
            }),
            event_background_done: Event::new(|| {
                || "TurboTasks::event_background_done".to_string()
            }),
            program_start: Instant::now(),
            compilation_events: CompilationEventQueue::default(),
        });
        this.backend.startup(&*this);
        this
    }
651
652    pub fn pin(&self) -> Arc<Self> {
653        self.this.upgrade().unwrap()
654    }
655
    /// Creates a new root task.
    ///
    /// The backend re-runs the stored closure whenever the root task is (re-)executed; its
    /// resulting `Vc` is converted to a non-local `RawVc` before becoming the task output. The
    /// task is scheduled immediately with [`TaskPriority::initial`].
    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
    where
        T: ?Sized,
        F: Fn() -> Fut + Send + Sync + Clone + 'static,
        Fut: Future<Output = Result<Vc<T>>> + Send,
    {
        let id = self.backend.create_transient_task(
            TransientTaskType::Root(Box::new(move || {
                // Clone so the stored closure can be invoked on every re-execution.
                let functor = functor.clone();
                Box::pin(async move {
                    let raw_vc = functor().await?.node;
                    raw_vc.to_non_local().await
                })
            })),
            self,
        );
        self.schedule(id, TaskPriority::initial());
        id
    }
676
    /// Disposes a root task previously created with [`TurboTasks::spawn_root_task`], delegating
    /// to the backend.
    pub fn dispose_root_task(&self, task_id: TaskId) {
        self.backend.dispose_root_task(task_id, self);
    }
680
    // TODO make sure that all dependencies settle before reading them
    /// Creates a new root task, that is only executed once.
    /// Dependencies will not invalidate the task.
    ///
    /// Unlike [`TurboTasks::spawn_root_task`], the future is consumed (not a cloneable closure),
    /// so the task cannot be re-executed.
    #[track_caller]
    fn spawn_once_task<T, Fut>(&self, future: Fut)
    where
        T: ?Sized,
        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
    {
        let id = self.backend.create_transient_task(
            TransientTaskType::Once(Box::pin(async move {
                let raw_vc = future.await?.node;
                raw_vc.to_non_local().await
            })),
            self,
        );
        self.schedule(id, TaskPriority::initial());
    }
699
700    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
701        &self,
702        future: impl Future<Output = Result<T>> + Send + 'static,
703    ) -> Result<T> {
704        let (tx, rx) = tokio::sync::oneshot::channel();
705        self.spawn_once_task(async move {
706            let result = future.await;
707            tx.send(result)
708                .map_err(|_| anyhow!("unable to send result"))?;
709            Ok(Completion::new())
710        });
711
712        rx.await?
713    }
714
    #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
    /// Runs `future` within a turbo-tasks context (task locals set up, temporary task state) and
    /// converts failures — including panics captured by [`CaptureFuture`] — into
    /// [`TurboTasksExecutionError`]. Counts as a foreground job for the duration of the run.
    pub async fn run<T: TraceRawVcs + Send + 'static>(
        &self,
        future: impl Future<Output = Result<T>> + Send + 'static,
    ) -> Result<T, TurboTasksExecutionError> {
        self.begin_foreground_job();
        // it's okay for execution ids to overflow and wrap, they're just used for an assert
        let execution_id = self.execution_id_factory.wrapping_get();
        // Temporary state: this run is not a real (backend-known) task, so it has no task id.
        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
            execution_id,
            TaskPriority::initial(),
        )));

        let result = TURBO_TASKS
            .scope(
                self.pin(),
                CURRENT_TASK_STATE.scope(current_task_state, async {
                    let result = CaptureFuture::new(future).await;

                    // wait for all spawned local tasks using `local` to finish
                    wait_for_local_tasks().await;

                    match result {
                        Ok(Ok(raw_vc)) => Ok(raw_vc),
                        Ok(Err(err)) => Err(err.into()),
                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                    }
                }),
            )
            .await;
        self.finish_foreground_job();
        result
    }
748
    /// Spawns `future` on tokio as a detached once-task process.
    ///
    /// NOTE(review): the foreground-job counter is decremented before awaiting `future` and
    /// re-incremented afterwards — presumably so a long-running process does not keep the system
    /// from being considered idle while the surrounding `run_once` still balances its own
    /// begin/finish pair. Confirm against `begin_foreground_job`/`finish_foreground_job`.
    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
        let this = self.pin();
        tokio::spawn(async move {
            this.pin()
                .run_once(async move {
                    this.finish_foreground_job();
                    future.await;
                    this.begin_foreground_job();
                    Ok(())
                })
                .await
                .unwrap()
        });
    }
763
764    pub(crate) fn native_call(
765        &self,
766        native_fn: &'static NativeFunction,
767        this: Option<RawVc>,
768        arg: Box<dyn MagicAny>,
769        persistence: TaskPersistence,
770    ) -> RawVc {
771        let task_type = CachedTaskType {
772            native_fn,
773            this,
774            arg,
775        };
776        RawVc::TaskOutput(match persistence {
777            TaskPersistence::Transient => self.backend.get_or_create_transient_task(
778                task_type,
779                current_task_if_available("turbo_function calls"),
780                self,
781            ),
782            TaskPersistence::Persistent => self.backend.get_or_create_persistent_task(
783                task_type,
784                current_task_if_available("turbo_function calls"),
785                self,
786            ),
787        })
788    }
789
790    pub fn dynamic_call(
791        &self,
792        native_fn: &'static NativeFunction,
793        this: Option<RawVc>,
794        arg: Box<dyn MagicAny>,
795        persistence: TaskPersistence,
796    ) -> RawVc {
797        if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
798            return self.native_call(native_fn, this, arg, persistence);
799        }
800        let task_type = LocalTaskSpec {
801            task_type: LocalTaskType::ResolveNative { native_fn },
802            this,
803            arg,
804        };
805        self.schedule_local_task(task_type, persistence)
806    }
807
808    pub fn trait_call(
809        &self,
810        trait_method: &'static TraitMethod,
811        this: RawVc,
812        arg: Box<dyn MagicAny>,
813        persistence: TaskPersistence,
814    ) -> RawVc {
815        // avoid creating a wrapper task if self is already resolved
816        // for resolved cells we already know the value type so we can lookup the
817        // function
818        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
819            match registry::get_value_type(type_id).get_trait_method(trait_method) {
820                Some(native_fn) => {
821                    let arg = native_fn.arg_meta.filter_owned(arg);
822                    return self.dynamic_call(native_fn, Some(this), arg, persistence);
823                }
824                None => {
825                    // We are destined to fail at this point, but we just retry resolution in the
826                    // local task since we cannot report an error from here.
827                    // TODO: A panic seems appropriate since the immediate caller is to blame
828                }
829            }
830        }
831
832        // create a wrapper task to resolve all inputs
833        let task_type = LocalTaskSpec {
834            task_type: LocalTaskType::ResolveTrait { trait_method },
835            this: Some(this),
836            arg,
837        };
838
839        self.schedule_local_task(task_type, persistence)
840    }
841
    /// Queues the task on the priority runner, counting it as a foreground job and capturing the
    /// current tracing span so execution is attributed to the scheduling site.
    #[track_caller]
    pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
        self.begin_foreground_job();
        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);

        self.priority_runner.schedule(
            &self.pin(),
            ScheduledTask::Task {
                task_id,
                span: Span::current(),
            },
            priority,
        );
    }
856
    /// Creates a local task (one that lives within the current task's execution), schedules it on
    /// the priority runner, and returns a `RawVc::LocalOutput` pointing at its future result.
    fn schedule_local_task(
        &self,
        ty: LocalTaskSpec,
        // if this is a `LocalTaskType::Resolve*`, we may spawn another task with this persistence,
        persistence: TaskPersistence,
    ) -> RawVc {
        let task_type = ty.task_type;
        // Register the local task in the parent task's state and capture everything needed to
        // schedule it and to refer to it later (execution id, priority, local id).
        let (global_task_state, execution_id, priority, local_task_id) =
            CURRENT_TASK_STATE.with(|gts| {
                let mut gts_write = gts.write().unwrap();
                let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
                    done_event: Event::new(move || {
                        move || format!("LocalTask({task_type})::done_event")
                    }),
                });
                (
                    Arc::clone(gts),
                    gts_write.execution_id,
                    gts_write.priority,
                    local_task_id,
                )
            });

        let future = self.priority_runner.schedule_with_join_handle(
            &self.pin(),
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state: global_task_state.clone(),
                span: Span::current(),
            },
            priority,
        );
        // Track the join handle so the parent task waits for this local task before it completes
        // (see `wait_for_local_tasks`).
        global_task_state
            .write()
            .unwrap()
            .local_task_tracker
            .get_or_insert_default()
            .push(Either::Left(future));

        RawVc::LocalOutput(execution_id, local_task_id, persistence)
    }
900
901    fn begin_foreground_job(&self) {
902        if self
903            .currently_scheduled_foreground_jobs
904            .fetch_add(1, Ordering::AcqRel)
905            == 0
906        {
907            *self.start.lock().unwrap() = Some(Instant::now());
908            self.event_foreground_start.notify(usize::MAX);
909            self.backend.idle_end(self);
910        }
911    }
912
913    fn finish_foreground_job(&self) {
914        if self
915            .currently_scheduled_foreground_jobs
916            .fetch_sub(1, Ordering::AcqRel)
917            == 1
918        {
919            self.backend.idle_start(self);
920            // That's not super race-condition-safe, but it's only for
921            // statistical reasons
922            let total = self.scheduled_tasks.load(Ordering::Acquire);
923            self.scheduled_tasks.store(0, Ordering::Release);
924            if let Some(start) = *self.start.lock().unwrap() {
925                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
926                if let Some(update) = update.as_mut() {
927                    update.0 += start.elapsed();
928                    update.1 += total;
929                } else {
930                    *update = Some((start.elapsed(), total));
931                }
932            }
933            self.event_foreground_done.notify(usize::MAX);
934        }
935    }
936
    /// Increments the background job counter. Background jobs do not affect the
    /// busy/idle state or update statistics; they are only waited on by
    /// `wait_background_done` and during `stop_and_wait`.
    fn begin_background_job(&self) {
        self.currently_scheduled_background_jobs
            .fetch_add(1, Ordering::Relaxed);
    }
941
942    fn finish_background_job(&self) {
943        if self
944            .currently_scheduled_background_jobs
945            .fetch_sub(1, Ordering::Relaxed)
946            == 1
947        {
948            self.event_background_done.notify(usize::MAX);
949        }
950    }
951
    /// Returns the number of currently scheduled or running foreground jobs.
    pub fn get_in_progress_count(&self) -> usize {
        self.currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
    }
956
957    /// Waits for the given task to finish executing. This works by performing an untracked read,
958    /// and discarding the value of the task output.
959    ///
960    /// [`ReadConsistency::Eventual`] means that this will return after the task executes, but
961    /// before all dependencies have completely settled.
962    ///
963    /// [`ReadConsistency::Strong`] means that this will also wait for the task and all dependencies
964    /// to fully settle before returning.
965    ///
966    /// As this function is typically called in top-level code that waits for results to be ready
967    /// for the user to access, most callers should use [`ReadConsistency::Strong`].
968    pub async fn wait_task_completion(
969        &self,
970        id: TaskId,
971        consistency: ReadConsistency,
972    ) -> Result<()> {
973        read_task_output(
974            self,
975            id,
976            ReadOutputOptions {
977                // INVALIDATION: This doesn't return a value, only waits for it to be ready.
978                tracking: ReadTracking::Untracked,
979                consistency,
980            },
981        )
982        .await?;
983        Ok(())
984    }
985
986    /// Returns [UpdateInfo] with all updates aggregated over a given duration
987    /// (`aggregation`). Will wait until an update happens.
988    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
989        self.aggregated_update_info(aggregation, Duration::MAX)
990            .await
991            .unwrap()
992    }
993
994    /// Returns [UpdateInfo] with all updates aggregated over a given duration
995    /// (`aggregation`). Will only return None when the timeout is reached while
996    /// waiting for the first update.
997    pub async fn aggregated_update_info(
998        &self,
999        aggregation: Duration,
1000        timeout: Duration,
1001    ) -> Option<UpdateInfo> {
1002        let listener = self
1003            .event_foreground_done
1004            .listen_with_note(|| || "wait for update info".to_string());
1005        let wait_for_finish = {
1006            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1007            if aggregation.is_zero() {
1008                if let Some((duration, tasks)) = update.take() {
1009                    return Some(UpdateInfo {
1010                        duration,
1011                        tasks,
1012                        reasons: take(reason_set),
1013                        placeholder_for_future_fields: (),
1014                    });
1015                } else {
1016                    true
1017                }
1018            } else {
1019                update.is_none()
1020            }
1021        };
1022        if wait_for_finish {
1023            if timeout == Duration::MAX {
1024                // wait for finish
1025                listener.await;
1026            } else {
1027                // wait for start, then wait for finish or timeout
1028                let start_listener = self
1029                    .event_foreground_start
1030                    .listen_with_note(|| || "wait for update info".to_string());
1031                if self
1032                    .currently_scheduled_foreground_jobs
1033                    .load(Ordering::Acquire)
1034                    == 0
1035                {
1036                    start_listener.await;
1037                } else {
1038                    drop(start_listener);
1039                }
1040                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1041                    // Timeout
1042                    return None;
1043                }
1044            }
1045        }
1046        if !aggregation.is_zero() {
1047            loop {
1048                select! {
1049                    () = tokio::time::sleep(aggregation) => {
1050                        break;
1051                    }
1052                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1053                        // Resets the sleep
1054                    }
1055                }
1056            }
1057        }
1058        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1059        if let Some((duration, tasks)) = update.take() {
1060            Some(UpdateInfo {
1061                duration,
1062                tasks,
1063                reasons: take(reason_set),
1064                placeholder_for_future_fields: (),
1065            })
1066        } else {
1067            panic!("aggregated_update_info must not called concurrently")
1068        }
1069    }
1070
1071    pub async fn wait_background_done(&self) {
1072        let listener = self.event_background_done.listen();
1073        if self
1074            .currently_scheduled_background_jobs
1075            .load(Ordering::Acquire)
1076            != 0
1077        {
1078            listener.await;
1079        }
1080    }
1081
1082    pub async fn stop_and_wait(&self) {
1083        turbo_tasks_future_scope(self.pin(), async move {
1084            self.backend.stopping(self);
1085            self.stopped.store(true, Ordering::Release);
1086            {
1087                let listener = self
1088                    .event_foreground_done
1089                    .listen_with_note(|| || "wait for stop".to_string());
1090                if self
1091                    .currently_scheduled_foreground_jobs
1092                    .load(Ordering::Acquire)
1093                    != 0
1094                {
1095                    listener.await;
1096                }
1097            }
1098            {
1099                let listener = self.event_background_done.listen();
1100                if self
1101                    .currently_scheduled_background_jobs
1102                    .load(Ordering::Acquire)
1103                    != 0
1104                {
1105                    listener.await;
1106                }
1107            }
1108            self.backend.stop(self);
1109        })
1110        .await;
1111    }
1112
1113    #[track_caller]
1114    pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1115    where
1116        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1117        T::CallOnceFuture: Send,
1118    {
1119        let mut this = self.pin();
1120        this.begin_foreground_job();
1121        tokio::spawn(
1122            TURBO_TASKS
1123                .scope(this.clone(), async move {
1124                    if !this.stopped.load(Ordering::Acquire) {
1125                        this = func(this.clone()).await;
1126                    }
1127                    this.finish_foreground_job();
1128                })
1129                .in_current_span(),
1130        );
1131    }
1132
1133    #[track_caller]
1134    pub(crate) fn schedule_background_job<T>(&self, func: T)
1135    where
1136        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1137        T::CallOnceFuture: Send,
1138    {
1139        let mut this = self.pin();
1140        self.begin_background_job();
1141        tokio::spawn(
1142            TURBO_TASKS
1143                .scope(this.clone(), async move {
1144                    if !this.stopped.load(Ordering::Acquire) {
1145                        this = func(this).await;
1146                    }
1147                    this.finish_background_job();
1148                })
1149                .in_current_span(),
1150        );
1151    }
1152
1153    fn finish_current_task_state(&self) -> FinishedTaskState {
1154        CURRENT_TASK_STATE.with(|cell| {
1155            let current_task_state = &*cell.write().unwrap();
1156            FinishedTaskState {
1157                #[cfg(feature = "verify_determinism")]
1158                stateful: current_task_state.stateful,
1159                has_invalidator: current_task_state.has_invalidator,
1160            }
1161        })
1162    }
1163
    /// Returns a reference to the underlying [`Backend`] implementation.
    pub fn backend(&self) -> &B {
        &self.backend
    }
1168
/// Unit marker type implementing [`Executor`] for the [`PriorityRunner`]: it
/// turns [`ScheduledTask`] items into the futures that execute them.
struct TurboTasksExecutor;
1170
1171impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
1172    type Future = impl Future<Output = ()> + Send + 'static;
1173
1174    fn execute(
1175        &self,
1176        this: &Arc<TurboTasks<B>>,
1177        scheduled_task: ScheduledTask,
1178        priority: TaskPriority,
1179    ) -> Self::Future {
1180        match scheduled_task {
1181            ScheduledTask::Task { task_id, span } => {
1182                let this2 = this.clone();
1183                let this = this.clone();
1184                let future = async move {
1185                    let mut schedule_again = true;
1186                    while schedule_again {
1187                        // it's okay for execution ids to overflow and wrap, they're just used for
1188                        // an assert
1189                        let execution_id = this.execution_id_factory.wrapping_get();
1190                        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
1191                            task_id,
1192                            execution_id,
1193                            priority,
1194                        )));
1195                        let single_execution_future = async {
1196                            if this.stopped.load(Ordering::Acquire) {
1197                                this.backend.task_execution_canceled(task_id, &*this);
1198                                return false;
1199                            }
1200
1201                            let Some(TaskExecutionSpec { future, span }) = this
1202                                .backend
1203                                .try_start_task_execution(task_id, priority, &*this)
1204                            else {
1205                                return false;
1206                            };
1207
1208                            async {
1209                                let result = CaptureFuture::new(future).await;
1210
1211                                // wait for all spawned local tasks using `local` to finish
1212                                wait_for_local_tasks().await;
1213
1214                                let result = match result {
1215                                    Ok(Ok(raw_vc)) => Ok(raw_vc),
1216                                    Ok(Err(err)) => Err(err.into()),
1217                                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
1218                                };
1219
1220                                let finihed_state = this.finish_current_task_state();
1221                                let cell_counters = CURRENT_TASK_STATE
1222                                    .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
1223                                this.backend.task_execution_completed(
1224                                    task_id,
1225                                    result,
1226                                    &cell_counters,
1227                                    #[cfg(feature = "verify_determinism")]
1228                                    finihed_state.stateful,
1229                                    finihed_state.has_invalidator,
1230                                    &*this,
1231                                )
1232                            }
1233                            .instrument(span)
1234                            .await
1235                        };
1236                        schedule_again = CURRENT_TASK_STATE
1237                            .scope(current_task_state, single_execution_future)
1238                            .await;
1239                    }
1240                    this.finish_foreground_job();
1241                };
1242
1243                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
1244            }
1245            ScheduledTask::LocalTask {
1246                ty,
1247                persistence,
1248                local_task_id,
1249                global_task_state,
1250                span,
1251            } => {
1252                let this2 = this.clone();
1253                let this = this.clone();
1254                let task_type = ty.task_type;
1255                let future = async move {
1256                    let span = match &ty.task_type {
1257                        LocalTaskType::ResolveNative { native_fn } => {
1258                            native_fn.resolve_span(priority)
1259                        }
1260                        LocalTaskType::ResolveTrait { trait_method } => {
1261                            trait_method.resolve_span(priority)
1262                        }
1263                    };
1264                    async move {
1265                        let result = match ty.task_type {
1266                            LocalTaskType::ResolveNative { native_fn } => {
1267                                LocalTaskType::run_resolve_native(
1268                                    native_fn,
1269                                    ty.this,
1270                                    &*ty.arg,
1271                                    persistence,
1272                                    this,
1273                                )
1274                                .await
1275                            }
1276                            LocalTaskType::ResolveTrait { trait_method } => {
1277                                LocalTaskType::run_resolve_trait(
1278                                    trait_method,
1279                                    ty.this.unwrap(),
1280                                    &*ty.arg,
1281                                    persistence,
1282                                    this,
1283                                )
1284                                .await
1285                            }
1286                        };
1287
1288                        let output = match result {
1289                            Ok(raw_vc) => OutputContent::Link(raw_vc),
1290                            Err(err) => OutputContent::Error(
1291                                TurboTasksExecutionError::from(err)
1292                                    .with_local_task_context(task_type.to_string()),
1293                            ),
1294                        };
1295
1296                        let local_task = LocalTask::Done { output };
1297
1298                        let done_event = CURRENT_TASK_STATE.with(move |gts| {
1299                            let mut gts_write = gts.write().unwrap();
1300                            let scheduled_task = std::mem::replace(
1301                                gts_write.get_mut_local_task(local_task_id),
1302                                local_task,
1303                            );
1304                            let LocalTask::Scheduled { done_event } = scheduled_task else {
1305                                panic!("local task finished, but was not in the scheduled state?");
1306                            };
1307                            done_event
1308                        });
1309                        done_event.notify(usize::MAX)
1310                    }
1311                    .instrument(span)
1312                    .await
1313                };
1314                let future = CURRENT_TASK_STATE.scope(global_task_state, future);
1315
1316                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
1317            }
1318        }
1319    }
1320}
1321
/// Snapshot of per-task flags collected when a task finishes executing,
/// passed on to `Backend::task_execution_completed`.
struct FinishedTaskState {
    /// True if the task has state in cells (interior mutability).
    /// Only tracked when verify_determinism feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the task uses an external invalidator
    has_invalidator: bool,
}
1331
impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
    // The call methods below forward to the inherent methods of the same name
    // on `TurboTasks<B>`.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.dynamic_call(native_fn, this, arg, persistence)
    }
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.native_call(native_fn, this, arg, persistence)
    }
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.trait_call(trait_method, this, arg, persistence)
    }

    #[track_caller]
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run(future).await })
    }

    #[track_caller]
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    /// Like `run_once`, but records `reason` so it is reported with the next
    /// aggregated update.
    #[track_caller]
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        {
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    #[track_caller]
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        self.start_once_process(future)
    }

    // Best-effort delivery: a failed send is logged, not propagated.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
        if let Err(e) = self.compilation_events.send(event) {
            tracing::warn!("Failed to send compilation event: {e}");
        }
    }

    fn get_task_name(&self, task: TaskId) -> String {
        self.backend.get_task_name(task, self)
    }
}
1408
1409impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1410    #[instrument(level = "info", skip_all, name = "invalidate")]
1411    fn invalidate(&self, task: TaskId) {
1412        self.backend.invalidate_task(task, self);
1413    }
1414
1415    #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1416    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1417        {
1418            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1419            reason_set.insert(reason);
1420        }
1421        self.backend.invalidate_task(task, self);
1422    }
1423
1424    fn invalidate_serialization(&self, task: TaskId) {
1425        self.backend.invalidate_serialization(task, self);
1426    }
1427
1428    fn try_read_task_output(
1429        &self,
1430        task: TaskId,
1431        options: ReadOutputOptions,
1432    ) -> Result<Result<RawVc, EventListener>> {
1433        self.backend.try_read_task_output(
1434            task,
1435            current_task_if_available("reading Vcs"),
1436            options,
1437            self,
1438        )
1439    }
1440
1441    fn try_read_task_cell(
1442        &self,
1443        task: TaskId,
1444        index: CellId,
1445        options: ReadCellOptions,
1446    ) -> Result<Result<TypedCellContent, EventListener>> {
1447        self.backend.try_read_task_cell(
1448            task,
1449            index,
1450            current_task_if_available("reading Vcs"),
1451            options,
1452            self,
1453        )
1454    }
1455
1456    fn try_read_own_task_cell(
1457        &self,
1458        current_task: TaskId,
1459        index: CellId,
1460        options: ReadCellOptions,
1461    ) -> Result<TypedCellContent> {
1462        self.backend
1463            .try_read_own_task_cell(current_task, index, options, self)
1464    }
1465
1466    fn try_read_local_output(
1467        &self,
1468        execution_id: ExecutionId,
1469        local_task_id: LocalTaskId,
1470    ) -> Result<Result<RawVc, EventListener>> {
1471        CURRENT_TASK_STATE.with(|gts| {
1472            let gts_read = gts.read().unwrap();
1473
1474            // Local Vcs are local to their parent task's current execution, and do not exist
1475            // outside of it. This is weakly enforced at compile time using the `NonLocalValue`
1476            // marker trait. This assertion exists to handle any potential escapes that the
1477            // compile-time checks cannot capture.
1478            gts_read.assert_execution_id(execution_id);
1479
1480            match gts_read.get_local_task(local_task_id) {
1481                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1482                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1483            }
1484        })
1485    }
1486
1487    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1488        self.backend.read_task_collectibles(
1489            task,
1490            trait_id,
1491            current_task_if_available("reading collectibles"),
1492            self,
1493        )
1494    }
1495
1496    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1497        self.backend.emit_collectible(
1498            trait_type,
1499            collectible,
1500            current_task("emitting collectible"),
1501            self,
1502        );
1503    }
1504
1505    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1506        self.backend.unemit_collectible(
1507            trait_type,
1508            collectible,
1509            count,
1510            current_task("emitting collectible"),
1511            self,
1512        );
1513    }
1514
1515    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1516        for (&collectible, &count) in collectibles {
1517            if count > 0 {
1518                self.backend.unemit_collectible(
1519                    trait_type,
1520                    collectible,
1521                    count as u32,
1522                    current_task("emitting collectible"),
1523                    self,
1524                );
1525            }
1526        }
1527    }
1528
1529    fn read_own_task_cell(
1530        &self,
1531        task: TaskId,
1532        index: CellId,
1533        options: ReadCellOptions,
1534    ) -> Result<TypedCellContent> {
1535        self.try_read_own_task_cell(task, index, options)
1536    }
1537
1538    fn update_own_task_cell(
1539        &self,
1540        task: TaskId,
1541        index: CellId,
1542        is_serializable_cell_content: bool,
1543        content: CellContent,
1544        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
1545        verification_mode: VerificationMode,
1546    ) {
1547        self.backend.update_task_cell(
1548            task,
1549            index,
1550            is_serializable_cell_content,
1551            content,
1552            updated_key_hashes,
1553            verification_mode,
1554            self,
1555        );
1556    }
1557
1558    fn connect_task(&self, task: TaskId) {
1559        self.backend
1560            .connect_task(task, current_task_if_available("connecting task"), self);
1561    }
1562
1563    fn mark_own_task_as_finished(&self, task: TaskId) {
1564        self.backend.mark_own_task_as_finished(task, self);
1565    }
1566
1567    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
1568        self.backend
1569            .set_own_task_aggregation_number(task, aggregation_number, self);
1570    }
1571
1572    fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1573        self.backend.mark_own_task_as_session_dependent(task, self);
1574    }
1575
1576    /// Creates a future that inherits the current task id and task state. The current global task
1577    /// will wait for this future to be dropped before exiting.
1578    fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1579        // this is similar to what happens for a local task, except that we keep the local task's
1580        // state as well.
1581        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1582        let fut = tokio::spawn(TURBO_TASKS.scope(
1583            turbo_tasks(),
1584            CURRENT_TASK_STATE.scope(global_task_state.clone(), fut),
1585        ));
1586        let fut = Box::pin(async move {
1587            fut.await.unwrap();
1588        });
1589        let mut ts = global_task_state.write().unwrap();
1590        ts.local_task_tracker
1591            .get_or_insert_default()
1592            .push(Either::Right(fut));
1593    }
1594
1595    fn task_statistics(&self) -> &TaskStatisticsApi {
1596        self.backend.task_statistics()
1597    }
1598
1599    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1600        let this = self.pin();
1601        Box::pin(async move {
1602            this.stop_and_wait().await;
1603        })
1604    }
1605
1606    fn subscribe_to_compilation_events(
1607        &self,
1608        event_types: Option<Vec<String>>,
1609    ) -> Receiver<Arc<dyn CompilationEvent>> {
1610        self.compilation_events.subscribe(event_types)
1611    }
1612
1613    fn is_tracking_dependencies(&self) -> bool {
1614        self.backend.is_tracking_dependencies()
1615    }
1616}
1617
impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
        self.pin()
    }
    fn backend(&self) -> &B {
        &self.backend
    }

    /// Runs the given backend job as a background job (does not count towards
    /// the busy/idle state).
    #[track_caller]
    fn schedule_backend_background_job(&self, job: B::BackendJob) {
        self.schedule_background_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            this
        })
    }

    /// Runs the given backend job as a foreground job (counts towards the
    /// busy/idle state).
    #[track_caller]
    fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
        self.schedule_foreground_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            this
        })
    }

    #[track_caller]
    fn schedule(&self, task: TaskId, priority: TaskPriority) {
        self.schedule(task, priority)
    }

    /// Returns the priority of the currently executing task, falling back to
    /// the initial priority outside of a task context.
    fn get_current_task_priority(&self) -> TaskPriority {
        CURRENT_TASK_STATE
            .try_with(|task_state| task_state.read().unwrap().priority)
            .unwrap_or(TaskPriority::initial())
    }

    /// Duration from program start to `instant`.
    // NOTE(review): assumes `instant` is not before `program_start`; `Instant`
    // subtraction saturates/panics depending on toolchain — confirm callers.
    fn program_duration_until(&self, instant: Instant) -> Duration {
        instant - self.program_start
    }

    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
        // SAFETY: This is a fresh id from the factory
        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
    }

    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
        // SAFETY: This is a fresh id from the factory
        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
    }

    /// Returns a persistent task id to the factory for reuse.
    ///
    /// # Safety
    ///
    /// Caller must guarantee the id is no longer referenced anywhere.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
        unsafe { self.task_id_factory.reuse(id.into()) }
    }

    /// Returns a transient task id to the factory for reuse.
    ///
    /// # Safety
    ///
    /// Caller must guarantee the id is no longer referenced anywhere.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
        unsafe { self.transient_task_id_factory.reuse(id.into()) }
    }

    /// True when no foreground jobs are scheduled or running.
    fn is_idle(&self) -> bool {
        self.currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
            == 0
    }
}
1681
1682async fn wait_for_local_tasks() {
1683    while let Some(mut ltt) =
1684        CURRENT_TASK_STATE.with(|ts| ts.write().unwrap().local_task_tracker.take())
1685    {
1686        use futures::StreamExt;
1687        while ltt.next().await.is_some() {}
1688    }
1689}
1690
1691pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1692    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1693        Ok(id) => id,
1694        Err(_) => panic!(
1695            "{from} can only be used in the context of a turbo_tasks task execution or \
1696             turbo_tasks run"
1697        ),
1698    }
1699}
1700
1701pub(crate) fn current_task(from: &str) -> TaskId {
1702    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1703        Ok(Some(id)) => id,
1704        Ok(None) | Err(_) => {
1705            panic!("{from} can only be used in the context of a turbo_tasks task execution")
1706        }
1707    }
1708}
1709
1710pub async fn run<T: Send + 'static>(
1711    tt: Arc<dyn TurboTasksApi>,
1712    future: impl Future<Output = Result<T>> + Send + 'static,
1713) -> Result<T> {
1714    let (tx, rx) = tokio::sync::oneshot::channel();
1715
1716    tt.run(Box::pin(async move {
1717        let result = future.await?;
1718        tx.send(result)
1719            .map_err(|_| anyhow!("unable to send result"))?;
1720        Ok(())
1721    }))
1722    .await?;
1723
1724    Ok(rx.await?)
1725}
1726
1727pub async fn run_once<T: Send + 'static>(
1728    tt: Arc<dyn TurboTasksApi>,
1729    future: impl Future<Output = Result<T>> + Send + 'static,
1730) -> Result<T> {
1731    let (tx, rx) = tokio::sync::oneshot::channel();
1732
1733    tt.run_once(Box::pin(async move {
1734        let result = future.await?;
1735        tx.send(result)
1736            .map_err(|_| anyhow!("unable to send result"))?;
1737        Ok(())
1738    }))
1739    .await?;
1740
1741    Ok(rx.await?)
1742}
1743
1744pub async fn run_once_with_reason<T: Send + 'static>(
1745    tt: Arc<dyn TurboTasksApi>,
1746    reason: impl InvalidationReason,
1747    future: impl Future<Output = Result<T>> + Send + 'static,
1748) -> Result<T> {
1749    let (tx, rx) = tokio::sync::oneshot::channel();
1750
1751    tt.run_once_with_reason(
1752        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1753        Box::pin(async move {
1754            let result = future.await?;
1755            tx.send(result)
1756                .map_err(|_| anyhow!("unable to send result"))?;
1757            Ok(())
1758        }),
1759    )
1760    .await?;
1761
1762    Ok(rx.await?)
1763}
1764
/// Calls [`TurboTasks::dynamic_call`] for the current turbo tasks instance.
///
/// * `func` – the native function to invoke
/// * `this` – optional receiver value for the call
/// * `arg` – type-erased argument for the function
/// * `persistence` – persistence mode of the resulting task
///
/// Panics when called outside of a turbo_tasks scope.
pub fn dynamic_call(
    func: &'static NativeFunction,
    this: Option<RawVc>,
    arg: Box<dyn MagicAny>,
    persistence: TaskPersistence,
) -> RawVc {
    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
}
1774
/// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance.
///
/// * `trait_method` – the trait method to dispatch
/// * `this` – receiver value whose concrete type selects the implementation
/// * `arg` – type-erased argument for the method
/// * `persistence` – persistence mode of the resulting task
///
/// Panics when called outside of a turbo_tasks scope.
pub fn trait_call(
    trait_method: &'static TraitMethod,
    this: RawVc,
    arg: Box<dyn MagicAny>,
    persistence: TaskPersistence,
) -> RawVc {
    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
}
1784
/// Returns a strong handle to the turbo-tasks instance of the current scope.
///
/// Panics when no instance is set; use [`try_turbo_tasks`] for a fallible variant.
pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
    TURBO_TASKS.with(|arc| arc.clone())
}
1788
/// Like [`turbo_tasks`], but returns a [`Weak`] handle that does not keep the instance
/// alive. Panics when no instance is set.
pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
    TURBO_TASKS.with(Arc::downgrade)
}
1792
/// Non-panicking variant of [`turbo_tasks`]: returns `None` when called outside of a
/// turbo_tasks scope.
pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
    TURBO_TASKS.try_with(|arc| arc.clone()).ok()
}
1796
/// Runs `func` with a reference to the current turbo-tasks instance, avoiding the
/// `Arc` clone that [`turbo_tasks`] performs. Panics when no instance is set.
pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
    TURBO_TASKS.with(|arc| func(arc))
}
1800
/// Runs the synchronous closure `f` with `tt` set as the current turbo-tasks instance.
pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
    TURBO_TASKS.sync_scope(tt, f)
}
1804
/// Wraps the future `f` so that it is polled with `tt` set as the current turbo-tasks
/// instance.
pub fn turbo_tasks_future_scope<T>(
    tt: Arc<dyn TurboTasksApi>,
    f: impl Future<Output = T>,
) -> impl Future<Output = T> {
    TURBO_TASKS.scope(tt, f)
}
1811
1812pub fn with_turbo_tasks_for_testing<T>(
1813    tt: Arc<dyn TurboTasksApi>,
1814    current_task: TaskId,
1815    execution_id: ExecutionId,
1816    f: impl Future<Output = T>,
1817) -> impl Future<Output = T> {
1818    TURBO_TASKS.scope(
1819        tt,
1820        CURRENT_TASK_STATE.scope(
1821            Arc::new(RwLock::new(CurrentTaskState::new(
1822                current_task,
1823                execution_id,
1824                TaskPriority::initial(),
1825            ))),
1826            f,
1827        ),
1828    )
1829}
1830
/// Spawns the given future within the context of the current task.
///
/// Beware: this method is not safe to use in production code. It is only
/// intended for use in tests and for debugging purposes.
///
/// Panics when called outside of a turbo_tasks scope.
pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
    turbo_tasks().spawn_detached_for_testing(Box::pin(f));
}
1838
/// Returns the id of the currently executing task, or `None` when the current context
/// has no task id. Panics when called outside of any turbo_tasks context.
pub fn current_task_for_testing() -> Option<TaskId> {
    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
}
1842
/// Marks the current task as dirty when restored from filesystem cache.
///
/// Panics when called outside of a turbo_tasks task execution.
pub fn mark_session_dependent() {
    with_turbo_tasks(|tt| {
        tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
    });
}
1849
/// Marks the current task as a root in the aggregation graph.  This means it starts with the
/// correct aggregation number instead of needing to recompute it after the fact.
///
/// Panics when called outside of a turbo_tasks task execution.
pub fn mark_root() {
    with_turbo_tasks(|tt| {
        // u32::MAX is the maximal aggregation number, i.e. a root.
        tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
    });
}
1857
/// Marks the current task as finished. This excludes it from waiting for
/// strongly consistency.
///
/// Panics when called outside of a turbo_tasks task execution.
pub fn mark_finished() {
    with_turbo_tasks(|tt| {
        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
    });
}
1865
/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
/// serialization of the current task cells.
///
/// Also marks the current task as stateful when the `verify_determinism` feature is enabled,
/// since State allocation implies interior mutability.
///
/// Panics when called outside of a turbo_tasks task execution.
pub fn get_serialization_invalidator() -> SerializationInvalidator {
    CURRENT_TASK_STATE.with(|cell| {
        // Destructure under the write lock; the `stateful` field only exists when the
        // `verify_determinism` feature is enabled.
        let CurrentTaskState {
            task_id,
            #[cfg(feature = "verify_determinism")]
            stateful,
            ..
        } = &mut *cell.write().unwrap();
        #[cfg(feature = "verify_determinism")]
        {
            *stateful = true;
        }
        let Some(task_id) = *task_id else {
            panic!(
                "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
                 task execution"
            );
        };
        SerializationInvalidator::new(task_id)
    })
}
1892
1893pub fn mark_invalidator() {
1894    CURRENT_TASK_STATE.with(|cell| {
1895        let CurrentTaskState {
1896            has_invalidator, ..
1897        } = &mut *cell.write().unwrap();
1898        *has_invalidator = true;
1899    })
1900}
1901
/// Marks the current task as stateful. This is used to indicate that the task
/// has interior mutability (e.g., via State or TransientState), which means
/// the task may produce different outputs even with the same inputs.
///
/// Only has an effect when the `verify_determinism` feature is enabled; otherwise the
/// whole body compiles away to a no-op.
pub fn mark_stateful() {
    #[cfg(feature = "verify_determinism")]
    CURRENT_TASK_STATE.with(|cell| {
        cell.write().unwrap().stateful = true;
    });
}
1917
/// Placeholder for excluding data from garbage collection; currently a no-op since
/// garbage collection is not implemented.
pub fn prevent_gc() {
    // TODO implement garbage collection
}
1921
1922pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1923    with_turbo_tasks(|tt| {
1924        let raw_vc = collectible.node.node;
1925        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1926    })
1927}
1928
1929pub(crate) async fn read_task_output(
1930    this: &dyn TurboTasksApi,
1931    id: TaskId,
1932    options: ReadOutputOptions,
1933) -> Result<RawVc> {
1934    loop {
1935        match this.try_read_task_output(id, options)? {
1936            Ok(result) => return Ok(result),
1937            Err(listener) => listener.await,
1938        }
1939    }
1940}
1941
/// A reference to a task's cell with methods that allow updating the contents
/// of the cell.
///
/// Mutations should not be performed outside of the task that owns this cell. Doing so
/// is a logic error, and may lead to incorrect caching behavior.
#[derive(Clone, Copy)]
pub struct CurrentCellRef {
    // The task that owns the cell.
    current_task: TaskId,
    // Value type id plus per-type index identifying the cell within the task.
    index: CellId,
    // Whether the cell content is serializable; forwarded to every read/update call.
    is_serializable_cell_content: bool,
}
1953
/// Shorthand for the [`VcRead::Target`] type of a [`VcValueType`].
type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
1955
impl CurrentCellRef {
    /// Updates the cell if the given `functor` returns a value.
    ///
    /// `functor` receives the previous value (when present and downcastable to `T`)
    /// and returns the replacement value together with an optional list of hashes of
    /// the keys that changed.
    fn conditional_update<T>(
        &self,
        functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>)>,
    ) where
        T: VcValueType,
    {
        self.conditional_update_with_shared_reference(|old_shared_reference| {
            // A failed downcast (content of a different type) is treated like an
            // empty cell.
            let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
            let (new_value, updated_key_hashes) = functor(old_ref)?;
            Some((
                SharedReference::new(triomphe::Arc::new(new_value)),
                updated_key_hashes,
            ))
        })
    }

    /// Updates the cell if the given `functor` returns a `SharedReference`.
    ///
    /// Performs an untracked read of the current content, passes it to `functor`, and
    /// writes the returned content back with [`VerificationMode::EqualityCheck`].
    fn conditional_update_with_shared_reference(
        &self,
        functor: impl FnOnce(
            Option<&SharedReference>,
        ) -> Option<(SharedReference, Option<SmallVec<[u64; 2]>>)>,
    ) {
        let tt = turbo_tasks();
        let cell_content = tt
            .read_own_task_cell(
                self.current_task,
                self.index,
                ReadCellOptions {
                    // INVALIDATION: Reading our own cell must be untracked
                    tracking: ReadCellTracking::Untracked,
                    is_serializable_cell_content: self.is_serializable_cell_content,
                    final_read_hint: false,
                },
            )
            .ok();
        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
        if let Some((update, updated_key_hashes)) = update {
            tt.update_own_task_cell(
                self.current_task,
                self.index,
                self.is_serializable_cell_content,
                CellContent(Some(update)),
                updated_key_hashes,
                VerificationMode::EqualityCheck,
            )
        }
    }

    /// Replace the current cell's content with `new_value` if the current content is not equal by
    /// value with the existing content.
    ///
    /// The comparison happens using the value itself, not the [`VcRead::Target`] of that value.
    ///
    /// Take this example of a custom equality implementation on a transparent wrapper type:
    ///
    /// ```
    /// #[turbo_tasks::value(transparent, eq = "manual")]
    /// struct Wrapper(Vec<u32>);
    ///
    /// impl PartialEq for Wrapper {
    ///     fn eq(&self, other: &Wrapper) -> bool {
    ///         // Example: order doesn't matter for equality
    ///         let (mut this, mut other) = (self.0.clone(), other.0.clone());
    ///         this.sort_unstable();
    ///         other.sort_unstable();
    ///         this == other
    ///     }
    /// }
    ///
    /// impl Eq for Wrapper {}
    /// ```
    ///
    /// Comparisons of [`Vc<Wrapper>`] used when updating the cell will use `Wrapper`'s custom
    /// equality implementation, rather than the one provided by the target ([`Vec<u32>`]) type.
    ///
    /// However, in most cases, the default derived implementation of [`PartialEq`] is used which
    /// just forwards to the inner value's [`PartialEq`].
    ///
    /// If you already have a `SharedReference`, consider calling
    /// [`Self::compare_and_update_with_shared_reference`] which can re-use the [`SharedReference`]
    /// object.
    pub fn compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + VcValueType,
    {
        self.conditional_update(|old_value| {
            if let Some(old_value) = old_value
                && old_value == &new_value
            {
                // Unchanged: skip the write so readers are not invalidated.
                return None;
            }
            Some((new_value, None))
        });
    }

    /// Replace the current cell's content with `new_shared_reference` if the current content is not
    /// equal by value with the existing content.
    ///
    /// If you already have a `SharedReference`, this is a faster version of
    /// [`CurrentCellRef::compare_and_update`].
    ///
    /// The value should be stored in [`SharedReference`] using the type `T`.
    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
    where
        T: VcValueType + PartialEq,
    {
        self.conditional_update_with_shared_reference(|old_sr| {
            if let Some(old_sr) = old_sr {
                let old_value = extract_sr_value::<T>(old_sr);
                let new_value = extract_sr_value::<T>(&new_shared_reference);
                if old_value == new_value {
                    return None;
                }
            }
            Some((new_shared_reference, None))
        });
    }

    /// See [`Self::compare_and_update`], but selectively update individual keys.
    pub fn keyed_compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + VcValueType,
        VcReadTarget<T>: KeyedEq,
        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
    {
        self.conditional_update(|old_value| {
            // An empty cell is always an update (with no specific changed keys).
            let Some(old_value) = old_value else {
                return Some((new_value, None));
            };
            // Key comparison happens on the read target, not the raw value.
            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
            let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
            let updated_keys = old_value.different_keys(new_value_ref);
            if updated_keys.is_empty() {
                return None;
            }
            // Duplicates are very unlikely, but ok since the backend is deduplicating them
            let updated_key_hashes = updated_keys
                .into_iter()
                .map(|key| FxBuildHasher.hash_one(key))
                .collect();
            Some((new_value, Some(updated_key_hashes)))
        });
    }

    /// See [`Self::compare_and_update_with_shared_reference`], but selectively update individual
    /// keys.
    pub fn keyed_compare_and_update_with_shared_reference<T>(
        &self,
        new_shared_reference: SharedReference,
    ) where
        T: VcValueType + PartialEq,
        VcReadTarget<T>: KeyedEq,
        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
    {
        self.conditional_update_with_shared_reference(|old_sr| {
            // An empty cell is always an update (with no specific changed keys).
            let Some(old_sr) = old_sr else {
                return Some((new_shared_reference, None));
            };
            let old_value = extract_sr_value::<T>(old_sr);
            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
            let new_value = extract_sr_value::<T>(&new_shared_reference);
            let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
            let updated_keys = old_value.different_keys(new_value);
            if updated_keys.is_empty() {
                return None;
            }
            // Duplicates are very unlikely, but ok since the backend is deduplicating them
            let updated_key_hashes = updated_keys
                .into_iter()
                .map(|key| FxBuildHasher.hash_one(key))
                .collect();
            Some((new_shared_reference, Some(updated_key_hashes)))
        });
    }

    /// Unconditionally updates the content of the cell.
    pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
    where
        T: VcValueType,
    {
        let tt = turbo_tasks();
        tt.update_own_task_cell(
            self.current_task,
            self.index,
            self.is_serializable_cell_content,
            CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
            None,
            verification_mode,
        )
    }

    /// A faster version of [`Self::update`] if you already have a
    /// [`SharedReference`].
    ///
    /// If the passed-in [`SharedReference`] is the same as the existing cell's
    /// by identity, no update is performed.
    ///
    /// The value should be stored in [`SharedReference`] using the type `T`.
    pub fn update_with_shared_reference(
        &self,
        shared_ref: SharedReference,
        verification_mode: VerificationMode,
    ) {
        let tt = turbo_tasks();
        let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
            let content = tt
                .read_own_task_cell(
                    self.current_task,
                    self.index,
                    ReadCellOptions {
                        // INVALIDATION: Reading our own cell must be untracked
                        tracking: ReadCellTracking::Untracked,
                        is_serializable_cell_content: self.is_serializable_cell_content,
                        final_read_hint: false,
                    },
                )
                .ok();
            if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
                // pointer equality (not value equality)
                shared_ref_exp != shared_ref
            } else {
                // Empty/unreadable cell: always write.
                true
            }
        } else {
            true
        };
        if update {
            tt.update_own_task_cell(
                self.current_task,
                self.index,
                self.is_serializable_cell_content,
                CellContent(Some(shared_ref)),
                None,
                verification_mode,
            )
        }
    }
}
2197
impl From<CurrentCellRef> for RawVc {
    /// A cell reference identifies a concrete cell of a task, which is exactly what
    /// [`RawVc::TaskCell`] encodes.
    fn from(cell: CurrentCellRef) -> Self {
        RawVc::TaskCell(cell.current_task, cell.index)
    }
}
2203
2204fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2205    sr.0.downcast_ref::<T>()
2206        .expect("cannot update SharedReference of different type")
2207}
2208
/// Returns a [`CurrentCellRef`] for the next unused cell of value type `T` in the
/// current task, deriving the type id and serializability from `T`.
pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
    find_cell_by_id(T::get_value_type_id(), T::has_serialization())
}
2212
2213pub fn find_cell_by_id(ty: ValueTypeId, is_serializable_cell_content: bool) -> CurrentCellRef {
2214    CURRENT_TASK_STATE.with(|ts| {
2215        let current_task = current_task("celling turbo_tasks values");
2216        let mut ts = ts.write().unwrap();
2217        let map = ts.cell_counters.as_mut().unwrap();
2218        let current_index = map.entry(ty).or_default();
2219        let index = *current_index;
2220        *current_index += 1;
2221        CurrentCellRef {
2222            current_task,
2223            index: CellId { type_id: ty, index },
2224            is_serializable_cell_content,
2225        }
2226    })
2227}
2228
2229pub(crate) async fn read_local_output(
2230    this: &dyn TurboTasksApi,
2231    execution_id: ExecutionId,
2232    local_task_id: LocalTaskId,
2233) -> Result<RawVc> {
2234    loop {
2235        match this.try_read_local_output(execution_id, local_task_id)? {
2236            Ok(raw_vc) => return Ok(raw_vc),
2237            Err(event_listener) => event_listener.await,
2238        }
2239    }
2240}