// turbo_tasks/manager.rs
1use std::{
2    cmp::Reverse,
3    fmt::{Debug, Display},
4    future::Future,
5    hash::{BuildHasher, BuildHasherDefault},
6    mem::take,
7    pin::Pin,
8    sync::{
9        Arc, Mutex, RwLock, Weak,
10        atomic::{AtomicBool, AtomicUsize, Ordering},
11    },
12    time::{Duration, Instant},
13};
14
15use anyhow::{Result, anyhow};
16use auto_hash_map::AutoMap;
17use bincode::{Decode, Encode};
18use either::Either;
19use futures::stream::FuturesUnordered;
20use rustc_hash::{FxBuildHasher, FxHasher};
21use serde::{Deserialize, Serialize};
22use smallvec::SmallVec;
23use tokio::{select, sync::mpsc::Receiver, task_local};
24use tracing::{Instrument, Span, instrument};
25use turbo_tasks_hash::{DeterministicHash, hash_xxh3_hash128};
26
27use crate::{
28    Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
29    ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
30    VcValueTrait, VcValueType,
31    backend::{
32        Backend, CellContent, CellHash, TaskCollectiblesMap, TaskExecutionSpec, TransientTaskType,
33        TurboTasksExecutionError, TypedCellContent, VerificationMode,
34    },
35    capture_future::CaptureFuture,
36    dyn_task_inputs::StackDynTaskInputs,
37    event::{Event, EventListener},
38    id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
39    id_factory::IdFactoryWithReuse,
40    keyed::KeyedEq,
41    macro_helpers::NativeFunction,
42    message_queue::{CompilationEvent, CompilationEventQueue},
43    priority_runner::{Executor, JoinHandle, PriorityRunner},
44    raw_vc::{CellId, RawVc},
45    registry,
46    serialization_invalidation::SerializationInvalidator,
47    task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
48    task_statistics::TaskStatisticsApi,
49    trace::TraceRawVcs,
50    util::{IdFactory, StaticOrArc},
51};
52
/// Common base trait for [`TurboTasksApi`] and [`TurboTasksBackendApi`]. Provides APIs for creating
/// tasks from function calls.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls a native function with arguments. Resolves arguments when needed
    /// with a wrapper task.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Call a native function with arguments.
    /// All inputs must be resolved.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls a trait method with arguments. First input is the `self` object.
    /// Uses a wrapper task to resolve
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs `future` to completion in a turbo-tasks context, surfacing task panics and
    /// errors as [`TurboTasksExecutionError`].
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>, TurboTasksExecutionError> + Send>>;
    /// Runs `future` as a one-off task. Dependencies will not re-trigger it.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Like [`Self::run_once`], additionally associating an [`InvalidationReason`] with the
    /// run. NOTE(review): presumably recorded in the aggregated update info — confirm.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Spawns `future` as a detached one-off process; the caller does not await its result.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Sends a compilation event to subscribers.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Returns a human-readable name for the given task.
    fn get_task_name(&self, task: TaskId) -> String;
}
105
/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
/// context. Returned by the [`turbo_tasks`] helper function.
///
/// This trait is needed because thread locals cannot contain an unresolved [`Backend`] type
/// parameter.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Marks the task as invalidated so it will be re-executed.
    fn invalidate(&self, task: TaskId);
    /// Like [`Self::invalidate`], additionally recording `reason` for diagnostics.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Invalidates only the persisted serialization of the task's data.
    fn invalidate_serialization(&self, task: TaskId);

    /// Attempts to read a task's output. Returns `Ok(Err(listener))` when the output is not
    /// yet available; the listener resolves once it is.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Attempts to read a cell of a task. Returns `Ok(Err(listener))` when the cell is not
    /// yet available; the listener resolves once it is.
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Reads a [`RawVc::LocalOutput`]. If the task has completed, returns the [`RawVc`] the local
    /// task points to.
    ///
    /// The returned [`RawVc`] may also be a [`RawVc::LocalOutput`], so this may need to be called
    /// recursively or in a loop.
    ///
    /// This does not accept a consistency argument, as you cannot control consistency of a read of
    /// an operation owned by your own task. Strongly consistent reads are only allowed on
    /// [`OperationVc`]s, which should never be local tasks.
    ///
    /// No dependency tracking will happen as a result of this function call, as it's a no-op for a
    /// task to depend on itself.
    ///
    /// [`OperationVc`]: crate::OperationVc
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Reads the collectibles of the given trait type emitted by `task` (and its subgraph).
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits a collectible value from the current task.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Retracts `count` previous emissions of the given collectible.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Retracts all collectibles in the given map.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
    ) -> Result<TypedCellContent>;

    /// Reads a cell of the current task. No dependency tracking is needed for self-reads.
    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent>;
    /// Writes new content into a cell of the current task.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
    );
    /// Marks the current task's execution as finished.
    fn mark_own_task_as_finished(&self, task: TaskId);
    /// Marks the current task as depending on session state, so it is re-executed per session.
    fn mark_own_task_as_session_dependent(&self, task: TaskId);

    /// Connects `task` as a child of the current task.
    fn connect_task(&self, task: TaskId);

    /// Wraps the given future in the current task.
    ///
    /// Beware: this method is not safe to use in production code. It is only intended for use in
    /// tests and for debugging purposes.
    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Access to per-function task statistics collection.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Stops the system and resolves once all outstanding work is done.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events, optionally filtered by event type names.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;

    // Returns true if TurboTasks is configured to track dependencies.
    fn is_tracking_dependencies(&self) -> bool;
}
197
/// A wrapper around a value that is unused.
///
/// Used for freshly generated [`TaskId`]s that have been allocated but not yet handed to the
/// backend, so they can be safely returned to the id factory if unused.
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    /// Creates a new unused value.
    ///
    /// # Safety
    ///
    /// The wrapped value must not be used.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Self { inner }
    }

    /// Get the inner value, without consuming the `Unused` wrapper.
    ///
    /// # Safety
    ///
    /// The user need to make sure that the value stays unused.
    pub unsafe fn get_unchecked(&self) -> &T {
        &self.inner
    }

    /// Unwraps the value, consuming the `Unused` wrapper.
    pub fn into(self) -> T {
        self.inner
    }
}
227
/// A subset of the [`TurboTasks`] API that's exposed to [`Backend`] implementations.
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns a strong reference to this API object.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Allocates a new task id from the persistent id range.
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Allocates a new task id from the transient id range.
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// # Safety
    ///
    /// The caller must ensure that the task id is not used anymore.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// # Safety
    ///
    /// The caller must ensure that the task id is not used anymore.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedule a task for execution.
    fn schedule(&self, task: TaskId, priority: TaskPriority);

    /// Returns the priority of the current task.
    fn get_current_task_priority(&self) -> TaskPriority;

    /// Schedule a foreground backend job for execution.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Schedule a background backend job for execution.
    ///
    /// Background jobs are not counted towards activeness of the system. The system is considered
    /// idle even with active background jobs.
    fn schedule_backend_background_job(&self, job: B::BackendJob);

    /// Returns the duration from the start of the program to the given instant.
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Returns true if the system is idle.
    fn is_idle(&self) -> bool;

    /// Returns a reference to the backend.
    fn backend(&self) -> &B;
}
267
/// Aggregated information about a batch of updates.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    // Aggregated wall-clock duration of the update batch.
    pub duration: Duration,
    // Number of tasks involved in the batch. NOTE(review): presumably the scheduled-task
    // count from `TurboTasks::aggregated_update` — confirm.
    pub tasks: usize,
    // The invalidation reasons collected for this batch.
    pub reasons: InvalidationReasonSet,
    // Private field keeps the struct non-exhaustive outside this module.
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
276
/// Controls whether a task may be cached across sessions or only within the current session.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// Tasks that may be persisted across sessions using serialization.
    Persistent,

    /// Tasks that will be persisted in memory for the life of this session, but won't persist
    /// between sessions.
    ///
    /// This is used for [root tasks][TurboTasks::spawn_root_task] and tasks with an argument of
    /// type [`TransientValue`][crate::value::TransientValue] or
    /// [`TransientInstance`][crate::value::TransientInstance].
    Transient,
}
290
291impl Display for TaskPersistence {
292    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293        match self {
294            TaskPersistence::Persistent => write!(f, "persistent"),
295            TaskPersistence::Transient => write!(f, "transient"),
296        }
297    }
298}
299
/// Consistency level for output/cell reads.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// The default behavior for most APIs. Reads are faster, but may return stale values, which
    /// may later trigger re-computation.
    #[default]
    Eventual,
    /// Ensures all dependencies are fully resolved before returning the cell or output data, at
    /// the cost of slower reads.
    ///
    /// Top-level code that returns data to the user should use strongly consistent reads.
    Strong,
}
312
/// Controls whether a cell read is recorded as a dependency of the current task,
/// optionally keyed for finer-grained invalidation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadCellTracking {
    /// Reads are tracked as dependencies of the current task.
    Tracked {
        /// The key used for the dependency
        key: Option<u64>,
    },
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}
331
332impl ReadCellTracking {
333    pub fn should_track(&self, is_err: bool) -> bool {
334        match self {
335            ReadCellTracking::Tracked { .. } => true,
336            ReadCellTracking::TrackOnlyError => is_err,
337            ReadCellTracking::Untracked => false,
338        }
339    }
340
341    pub fn key(&self) -> Option<u64> {
342        match self {
343            ReadCellTracking::Tracked { key } => *key,
344            ReadCellTracking::TrackOnlyError => None,
345            ReadCellTracking::Untracked => None,
346        }
347    }
348}
349
350impl Default for ReadCellTracking {
351    fn default() -> Self {
352        ReadCellTracking::Tracked { key: None }
353    }
354}
355
356impl Display for ReadCellTracking {
357    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
358        match self {
359            ReadCellTracking::Tracked { key: None } => write!(f, "tracked"),
360            ReadCellTracking::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
361            ReadCellTracking::TrackOnlyError => write!(f, "track only error"),
362            ReadCellTracking::Untracked => write!(f, "untracked"),
363        }
364    }
365}
366
/// Controls whether an output read is recorded as a dependency of the current task.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// Reads are tracked as dependencies of the current task.
    #[default]
    Tracked,
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}
383
384impl ReadTracking {
385    pub fn should_track(&self, is_err: bool) -> bool {
386        match self {
387            ReadTracking::Tracked => true,
388            ReadTracking::TrackOnlyError => is_err,
389            ReadTracking::Untracked => false,
390        }
391    }
392}
393
394impl Display for ReadTracking {
395    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
396        match self {
397            ReadTracking::Tracked => write!(f, "tracked"),
398            ReadTracking::TrackOnlyError => write!(f, "track only error"),
399            ReadTracking::Untracked => write!(f, "untracked"),
400        }
401    }
402}
403
/// Scheduling priority of a task execution.
///
/// The derived ordering sorts `Initial` before `Invalidation`, and within `Invalidation` the
/// [`Reverse`] wrapper flips comparisons so larger raw `u32` values order first as a variant.
/// NOTE(review): the min/max interpretation depends on how `PriorityRunner` consumes the
/// ordering — confirm before relying on it.
#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum TaskPriority {
    // Priority of tasks scheduled for the first time.
    #[default]
    Initial,
    // Priority of tasks re-executed due to invalidation.
    Invalidation {
        priority: Reverse<u32>,
    },
}
412
413impl TaskPriority {
414    pub fn invalidation(priority: u32) -> Self {
415        Self::Invalidation {
416            priority: Reverse(priority),
417        }
418    }
419
420    pub fn initial() -> Self {
421        Self::Initial
422    }
423
424    pub fn leaf() -> Self {
425        Self::Invalidation {
426            priority: Reverse(0),
427        }
428    }
429
430    pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
431        match self {
432            TaskPriority::Initial => parent_priority,
433            TaskPriority::Invalidation { priority } => {
434                if let TaskPriority::Invalidation {
435                    priority: parent_priority,
436                } = parent_priority
437                    && priority.0 < parent_priority.0
438                {
439                    Self::Invalidation {
440                        priority: Reverse(parent_priority.0.saturating_add(1)),
441                    }
442                } else {
443                    *self
444                }
445            }
446        }
447    }
448}
449
450impl Display for TaskPriority {
451    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
452        match self {
453            TaskPriority::Initial => write!(f, "initial"),
454            TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
455        }
456    }
457}
458
/// A unit of work queued on the [`PriorityRunner`].
enum ScheduledTask {
    /// A backend-known task identified by its id, executed under the captured tracing span.
    Task {
        task_id: TaskId,
        span: Span,
    },
    /// A local task that runs within the shared state of the global task that spawned it.
    LocalTask {
        // Specification of the local task to execute.
        ty: LocalTaskSpec,
        // Persistence to use if this local task spawns a real (backend) task.
        persistence: TaskPersistence,
        // Index of this local task within `global_task_state.local_tasks`.
        local_task_id: LocalTaskId,
        // Shared state of the spawning global task.
        global_task_state: Arc<RwLock<CurrentTaskState>>,
        span: Span,
    },
}
472
/// The central task engine: owns the backend, id allocation, scheduling, and job accounting.
pub struct TurboTasks<B: Backend + 'static> {
    // Weak self-reference (set via `Arc::new_cyclic`), upgraded by `pin()`.
    this: Weak<Self>,
    backend: B,
    // Id factory for persistent task ids (below TRANSIENT_TASK_BIT).
    task_id_factory: IdFactoryWithReuse<TaskId>,
    // Id factory for transient task ids (TRANSIENT_TASK_BIT and above).
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    execution_id_factory: IdFactory<ExecutionId>,
    // Set when the system is stopped; prevents further scheduling.
    stopped: AtomicBool,
    currently_scheduled_foreground_jobs: AtomicUsize,
    currently_scheduled_background_jobs: AtomicUsize,
    // Total number of task executions scheduled so far.
    scheduled_tasks: AtomicUsize,
    priority_runner:
        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
    // Start instant of the current update batch, if one is in progress.
    start: Mutex<Option<Instant>>,
    // Accumulated (duration, task count) and invalidation reasons for the current batch.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Event that is triggered when currently_scheduled_foreground_jobs becomes non-zero
    event_foreground_start: Event,
    /// Event that is triggered when all foreground jobs are done
    /// (currently_scheduled_foreground_jobs becomes zero)
    event_foreground_done: Event,
    /// Event that is triggered when all background jobs are done
    event_background_done: Event,
    // Instant captured at construction; basis for `program_duration_until`.
    program_start: Instant,
    compilation_events: CompilationEventQueue,
}
497
/// Lazily-created set of in-flight local-task futures; `None` until the first local task is
/// spawned during an execution.
type LocalTaskTracker = Option<
    FuturesUnordered<Either<JoinHandle, Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>>,
>;
501
/// Information about a non-local task. A non-local task can contain multiple "local" tasks, which
/// all share the same non-local task state.
///
/// A non-local task is one that:
///
/// - Has a unique task id.
/// - Is potentially cached.
/// - The backend is aware of.
struct CurrentTaskState {
    // `None` for temporary states created outside a real backend task (see `new_temporary`).
    task_id: Option<TaskId>,
    // Identifies this particular execution; used to assert local tasks are not shared
    // across executions.
    execution_id: ExecutionId,
    // Scheduling priority of the current execution.
    priority: TaskPriority,

    /// True if the current task has state in cells (interior mutability).
    /// Only tracked when verify_determinism feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the current task uses an external invalidator
    has_invalidator: bool,

    /// True if we're in a top-level task (e.g. `.run_once(...)` or `.run(...)`).
    /// Eventually consistent reads are not allowed in top-level tasks.
    in_top_level_task: bool,

    /// Tracks how many cells of each type has been allocated so far during this task execution.
    /// When a task is re-executed, the cell count may not match the existing cell vec length.
    ///
    /// This is taken (and becomes `None`) during teardown of a task.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Local tasks created while this global task has been running. Indexed by `LocalTaskId`.
    local_tasks: Vec<LocalTask>,

    /// Tracks currently running local tasks, and defers cleanup of the global task until those
    /// complete. Also used by `spawn_detached_for_testing`.
    local_task_tracker: LocalTaskTracker,
}
540
impl CurrentTaskState {
    /// Creates the state for a real backend task execution.
    fn new(
        task_id: TaskId,
        execution_id: ExecutionId,
        priority: TaskPriority,
        in_top_level_task: bool,
    ) -> Self {
        Self {
            task_id: Some(task_id),
            execution_id,
            priority,
            #[cfg(feature = "verify_determinism")]
            stateful: false,
            has_invalidator: false,
            in_top_level_task,
            cell_counters: Some(AutoMap::default()),
            local_tasks: Vec::new(),
            local_task_tracker: None,
        }
    }

    /// Creates a state without a backing task id (e.g. for `TurboTasks::run`).
    /// Note that `cell_counters` is `None`, so cell allocation is not available.
    fn new_temporary(
        execution_id: ExecutionId,
        priority: TaskPriority,
        in_top_level_task: bool,
    ) -> Self {
        Self {
            task_id: None,
            execution_id,
            priority,
            #[cfg(feature = "verify_determinism")]
            stateful: false,
            has_invalidator: false,
            in_top_level_task,
            cell_counters: None,
            local_tasks: Vec::new(),
            local_task_tracker: None,
        }
    }

    /// Panics unless this state belongs to the given execution. Guards against local tasks
    /// leaking across task (re-)executions.
    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
        if self.execution_id != expected_execution_id {
            panic!(
                "Local tasks can only be scheduled/awaited within the same execution of the \
                 parent task that created them"
            );
        }
    }

    /// Registers a new local task and returns its one-indexed id.
    fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
        self.local_tasks.push(local_task);
        // generate a one-indexed id from len() -- we just pushed so len() is >= 1
        if cfg!(debug_assertions) {
            LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
        } else {
            // SAFETY: we just pushed, so len() >= 1, making the id non-zero.
            unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
        }
    }

    /// Looks up a local task by id. Panics if the id is out of range.
    fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
        // local task ids are one-indexed (they use NonZeroU32)
        &self.local_tasks[(*local_task_id as usize) - 1]
    }

    /// Mutable variant of [`Self::get_local_task`].
    fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
        &mut self.local_tasks[(*local_task_id as usize) - 1]
    }
}
609
// TODO implement our own thread pool and make these thread locals instead
task_local! {
    /// The current TurboTasks instance
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    /// State of the currently executing non-local task, shared with its local tasks.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;

    /// Temporarily suppresses the eventual consistency check in top-level tasks.
    /// This is used by strongly consistent reads to allow them to succeed in top-level tasks.
    /// This is NOT shared across local tasks (unlike CURRENT_TASK_STATE), so it's safe
    /// to set/unset without race conditions.
    pub(crate) static SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK: bool;
}
623
624impl<B: Backend + 'static> TurboTasks<B> {
625    // TODO better lifetime management for turbo tasks
626    // consider using unsafe for the task_local turbo tasks
627    // that should be safe as long tasks can't outlife turbo task
628    // so we probably want to make sure that all tasks are joined
629    // when trying to drop turbo tasks
    /// Creates a new engine around `backend`, installs the cyclic self-reference, and lets the
    /// backend run its startup hook.
    pub fn new(backend: B) -> Arc<Self> {
        // Persistent ids occupy [MIN, TRANSIENT_TASK_BIT); transient ids have the
        // TRANSIENT_TASK_BIT set, so the two ranges never collide.
        let task_id_factory = IdFactoryWithReuse::new(
            TaskId::MIN,
            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
        );
        let transient_task_id_factory =
            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
        let this = Arc::new_cyclic(|this| Self {
            this: this.clone(),
            backend,
            task_id_factory,
            transient_task_id_factory,
            execution_id_factory,
            stopped: AtomicBool::new(false),
            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
            currently_scheduled_background_jobs: AtomicUsize::new(0),
            scheduled_tasks: AtomicUsize::new(0),
            priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
            start: Default::default(),
            aggregated_update: Default::default(),
            event_foreground_done: Event::new(|| {
                || "TurboTasks::event_foreground_done".to_string()
            }),
            event_foreground_start: Event::new(|| {
                || "TurboTasks::event_foreground_start".to_string()
            }),
            event_background_done: Event::new(|| {
                || "TurboTasks::event_background_done".to_string()
            }),
            program_start: Instant::now(),
            compilation_events: CompilationEventQueue::default(),
        });
        // Give the backend a chance to initialize with a reference to the engine.
        this.backend.startup(&*this);
        this
    }
666
667    pub fn pin(&self) -> Arc<Self> {
668        self.this.upgrade().unwrap()
669    }
670
    /// Creates a new root task
    ///
    /// The returned id can later be passed to [`Self::dispose_root_task`]. The task is
    /// scheduled immediately with initial priority.
    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
    where
        T: ?Sized,
        F: Fn() -> Fut + Send + Sync + Clone + 'static,
        Fut: Future<Output = Result<Vc<T>>> + Send,
    {
        let id = self.backend.create_transient_task(
            TransientTaskType::Root(Box::new(move || {
                // The functor is cloned per (re-)execution of the root task.
                let functor = functor.clone();
                Box::pin(async move {
                    // Root tasks are top-level: eventually consistent reads are disallowed.
                    mark_top_level_task();
                    let raw_vc = functor().await?.node;
                    raw_vc.to_non_local().await
                })
            })),
            self,
        );
        self.schedule(id, TaskPriority::initial());
        id
    }
692
    /// Disposes a root task previously created with [`Self::spawn_root_task`].
    pub fn dispose_root_task(&self, task_id: TaskId) {
        self.backend.dispose_root_task(task_id, self);
    }
696
    // TODO make sure that all dependencies settle before reading them
    /// Creates a new root task, that is only executed once.
    /// Dependencies will not invalidate the task.
    #[track_caller]
    fn spawn_once_task<T, Fut>(&self, future: Fut)
    where
        T: ?Sized,
        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
    {
        let id = self.backend.create_transient_task(
            TransientTaskType::Once(Box::pin(async move {
                // Once-tasks are top-level: eventually consistent reads are disallowed.
                mark_top_level_task();
                let raw_vc = future.await?.node;
                raw_vc.to_non_local().await
            })),
            self,
        );
        self.schedule(id, TaskPriority::initial());
    }
716
    /// Runs `future` as a one-off task and awaits its result.
    ///
    /// The value is delivered back through a oneshot channel, since the underlying once-task
    /// only produces a [`Completion`].
    ///
    /// # Errors
    /// Returns an error if the future fails or if the result cannot be sent back.
    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
        &self,
        future: impl Future<Output = Result<T>> + Send + 'static,
    ) -> Result<T> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.spawn_once_task(async move {
            mark_top_level_task();
            let result = future.await;
            // Send may fail if the receiver was dropped (caller stopped waiting).
            tx.send(result)
                .map_err(|_| anyhow!("unable to send result"))?;
            Ok(Completion::new())
        });

        rx.await?
    }
732
    /// Runs `future` directly on the caller's executor inside a turbo-tasks context (no
    /// backend task is created), converting errors and panics into
    /// [`TurboTasksExecutionError`].
    #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
    pub async fn run<T: TraceRawVcs + Send + 'static>(
        &self,
        future: impl Future<Output = Result<T>> + Send + 'static,
    ) -> Result<T, TurboTasksExecutionError> {
        // Count this run as a foreground job so the system isn't considered idle.
        self.begin_foreground_job();
        // it's okay for execution ids to overflow and wrap, they're just used for an assert
        let execution_id = self.execution_id_factory.wrapping_get();
        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
            execution_id,
            TaskPriority::initial(),
            true, // in_top_level_task
        )));

        let result = TURBO_TASKS
            .scope(
                self.pin(),
                CURRENT_TASK_STATE.scope(current_task_state, async {
                    // CaptureFuture catches panics so they can be reported as errors.
                    let result = CaptureFuture::new(future).await;

                    // wait for all spawned local tasks using `local` to finish
                    wait_for_local_tasks().await;

                    match result {
                        Ok(Ok(raw_vc)) => Ok(raw_vc),
                        Ok(Err(err)) => Err(err.into()),
                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                    }
                }),
            )
            .await;
        self.finish_foreground_job();
        result
    }
767
    /// Spawns `future` as a detached tokio task wrapped in a once-task. Panics (via `unwrap`)
    /// if the once-task fails.
    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
        let this = self.pin();
        tokio::spawn(async move {
            this.pin()
                .run_once(async move {
                    // NOTE(review): the foreground-job counter is decremented while awaiting
                    // the external future and re-incremented afterwards — presumably so the
                    // wait does not keep the system "active"; confirm against the
                    // begin/finish_foreground_job accounting.
                    this.finish_foreground_job();
                    future.await;
                    this.begin_foreground_job();
                    Ok(())
                })
                .await
                .unwrap()
        });
    }
782
    /// Creates (or reuses) the backend task for `native_fn` and returns a [`RawVc`] pointing
    /// at its output. All inputs must already be resolved (see [`Self::dynamic_call`] for the
    /// resolving variant).
    pub(crate) fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        RawVc::TaskOutput(self.backend.get_or_create_task(
            native_fn,
            this,
            arg,
            // The current task becomes the parent; panics outside a task context.
            current_task_if_available("turbo_function calls"),
            persistence,
            self,
        ))
    }
799
800    pub fn dynamic_call(
801        &self,
802        native_fn: &'static NativeFunction,
803        this: Option<RawVc>,
804        arg: &mut dyn StackDynTaskInputs,
805        persistence: TaskPersistence,
806    ) -> RawVc {
807        if this.is_none_or(|this| this.is_resolved())
808            && native_fn.arg_meta.is_resolved(arg.as_ref())
809        {
810            return self.native_call(native_fn, this, arg, persistence);
811        }
812        // Need async resolution — must move the arg to the heap now
813        let arg = arg.take_box();
814        let task_type = LocalTaskSpec {
815            task_type: LocalTaskType::ResolveNative { native_fn },
816            this,
817            arg,
818        };
819        self.schedule_local_task(task_type, persistence)
820    }
821
    /// Calls `trait_method` on `this`. If `this` is already a resolved cell, the concrete
    /// native function is looked up from the value type and dispatched directly; otherwise a
    /// `ResolveTrait` local task performs resolution and dispatch asynchronously.
    pub fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        // avoid creating a wrapper task if self is already resolved
        // for resolved cells we already know the value type so we can lookup the
        // function
        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
            match registry::get_value_type(type_id).get_trait_method(trait_method) {
                Some(native_fn) => {
                    // Some methods declare an argument filter that narrows the inputs
                    // before dispatch.
                    if let Some(filter) = native_fn.arg_meta.filter_owned {
                        let mut arg = (filter)(arg);
                        return self.dynamic_call(native_fn, Some(this), &mut arg, persistence);
                    } else {
                        return self.dynamic_call(native_fn, Some(this), arg, persistence);
                    }
                }
                None => {
                    // We are destined to fail at this point, but we just retry resolution in the
                    // local task since we cannot report an error from here.
                    // TODO: A panic seems appropriate since the immediate caller is to blame
                }
            }
        }

        // create a wrapper task to resolve all inputs
        let task_type = LocalTaskSpec {
            task_type: LocalTaskType::ResolveTrait { trait_method },
            this: Some(this),
            arg: arg.take_box(),
        };

        self.schedule_local_task(task_type, persistence)
    }
859
860    #[track_caller]
861    pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
862        self.begin_foreground_job();
863        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
864
865        self.priority_runner.schedule(
866            &self.pin(),
867            ScheduledTask::Task {
868                task_id,
869                span: Span::current(),
870            },
871            priority,
872        );
873    }
874
    /// Registers a local task in the current (global) task's state and schedules it on the
    /// priority runner. Returns a [`RawVc::LocalOutput`] pointing at the not-yet-computed
    /// result. Must be called from within a task execution (uses `CURRENT_TASK_STATE`).
    fn schedule_local_task(
        &self,
        ty: LocalTaskSpec,
        // if this is a `LocalTaskType::Resolve*`, we may spawn another task with this persistence,
        persistence: TaskPersistence,
    ) -> RawVc {
        let task_type = ty.task_type;
        // Create the local task slot (with its completion event) inside the parent task's
        // state, capturing the parent's execution id and priority while the lock is held.
        let (global_task_state, execution_id, priority, local_task_id) =
            CURRENT_TASK_STATE.with(|gts| {
                let mut gts_write = gts.write().unwrap();
                let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
                    done_event: Event::new(move || {
                        move || format!("LocalTask({task_type})::done_event")
                    }),
                });
                (
                    Arc::clone(gts),
                    gts_write.execution_id,
                    gts_write.priority,
                    local_task_id,
                )
            });

        let future = self.priority_runner.schedule_with_join_handle(
            &self.pin(),
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state: global_task_state.clone(),
                span: Span::current(),
            },
            priority,
        );
        // Track the join handle so the parent task waits for this local task before its own
        // execution is considered complete (see `wait_for_local_tasks`).
        global_task_state
            .write()
            .unwrap()
            .local_task_tracker
            .get_or_insert_default()
            .push(Either::Left(future));

        RawVc::LocalOutput(execution_id, local_task_id, persistence)
    }
918
919    fn begin_foreground_job(&self) {
920        if self
921            .currently_scheduled_foreground_jobs
922            .fetch_add(1, Ordering::AcqRel)
923            == 0
924        {
925            *self.start.lock().unwrap() = Some(Instant::now());
926            self.event_foreground_start.notify(usize::MAX);
927            self.backend.idle_end(self);
928        }
929    }
930
931    fn finish_foreground_job(&self) {
932        if self
933            .currently_scheduled_foreground_jobs
934            .fetch_sub(1, Ordering::AcqRel)
935            == 1
936        {
937            self.backend.idle_start(self);
938            // That's not super race-condition-safe, but it's only for
939            // statistical reasons
940            let total = self.scheduled_tasks.load(Ordering::Acquire);
941            self.scheduled_tasks.store(0, Ordering::Release);
942            if let Some(start) = *self.start.lock().unwrap() {
943                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
944                if let Some(update) = update.as_mut() {
945                    update.0 += start.elapsed();
946                    update.1 += total;
947                } else {
948                    *update = Some((start.elapsed(), total));
949                }
950            }
951            self.event_foreground_done.notify(usize::MAX);
952        }
953    }
954
955    fn begin_background_job(&self) {
956        self.currently_scheduled_background_jobs
957            .fetch_add(1, Ordering::Relaxed);
958    }
959
960    fn finish_background_job(&self) {
961        if self
962            .currently_scheduled_background_jobs
963            .fetch_sub(1, Ordering::Relaxed)
964            == 1
965        {
966            self.event_background_done.notify(usize::MAX);
967        }
968    }
969
970    pub fn get_in_progress_count(&self) -> usize {
971        self.currently_scheduled_foreground_jobs
972            .load(Ordering::Acquire)
973    }
974
975    /// Waits for the given task to finish executing. This works by performing an untracked read,
976    /// and discarding the value of the task output.
977    ///
978    /// [`ReadConsistency::Eventual`] means that this will return after the task executes, but
979    /// before all dependencies have completely settled.
980    ///
981    /// [`ReadConsistency::Strong`] means that this will also wait for the task and all dependencies
982    /// to fully settle before returning.
983    ///
984    /// As this function is typically called in top-level code that waits for results to be ready
985    /// for the user to access, most callers should use [`ReadConsistency::Strong`].
986    pub async fn wait_task_completion(
987        &self,
988        id: TaskId,
989        consistency: ReadConsistency,
990    ) -> Result<()> {
991        read_task_output(
992            self,
993            id,
994            ReadOutputOptions {
995                // INVALIDATION: This doesn't return a value, only waits for it to be ready.
996                tracking: ReadTracking::Untracked,
997                consistency,
998            },
999        )
1000        .await?;
1001        Ok(())
1002    }
1003
1004    /// Returns [UpdateInfo] with all updates aggregated over a given duration
1005    /// (`aggregation`). Will wait until an update happens.
1006    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
1007        self.aggregated_update_info(aggregation, Duration::MAX)
1008            .await
1009            .unwrap()
1010    }
1011
1012    /// Returns [UpdateInfo] with all updates aggregated over a given duration
1013    /// (`aggregation`). Will only return None when the timeout is reached while
1014    /// waiting for the first update.
1015    pub async fn aggregated_update_info(
1016        &self,
1017        aggregation: Duration,
1018        timeout: Duration,
1019    ) -> Option<UpdateInfo> {
1020        let listener = self
1021            .event_foreground_done
1022            .listen_with_note(|| || "wait for update info".to_string());
1023        let wait_for_finish = {
1024            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1025            if aggregation.is_zero() {
1026                if let Some((duration, tasks)) = update.take() {
1027                    return Some(UpdateInfo {
1028                        duration,
1029                        tasks,
1030                        reasons: take(reason_set),
1031                        placeholder_for_future_fields: (),
1032                    });
1033                } else {
1034                    true
1035                }
1036            } else {
1037                update.is_none()
1038            }
1039        };
1040        if wait_for_finish {
1041            if timeout == Duration::MAX {
1042                // wait for finish
1043                listener.await;
1044            } else {
1045                // wait for start, then wait for finish or timeout
1046                let start_listener = self
1047                    .event_foreground_start
1048                    .listen_with_note(|| || "wait for update info".to_string());
1049                if self
1050                    .currently_scheduled_foreground_jobs
1051                    .load(Ordering::Acquire)
1052                    == 0
1053                {
1054                    start_listener.await;
1055                } else {
1056                    drop(start_listener);
1057                }
1058                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1059                    // Timeout
1060                    return None;
1061                }
1062            }
1063        }
1064        if !aggregation.is_zero() {
1065            loop {
1066                select! {
1067                    () = tokio::time::sleep(aggregation) => {
1068                        break;
1069                    }
1070                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1071                        // Resets the sleep
1072                    }
1073                }
1074            }
1075        }
1076        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1077        if let Some((duration, tasks)) = update.take() {
1078            Some(UpdateInfo {
1079                duration,
1080                tasks,
1081                reasons: take(reason_set),
1082                placeholder_for_future_fields: (),
1083            })
1084        } else {
1085            panic!("aggregated_update_info must not called concurrently")
1086        }
1087    }
1088
1089    pub async fn wait_background_done(&self) {
1090        let listener = self.event_background_done.listen();
1091        if self
1092            .currently_scheduled_background_jobs
1093            .load(Ordering::Acquire)
1094            != 0
1095        {
1096            listener.await;
1097        }
1098    }
1099
    /// Gracefully shuts down: signals the backend that stopping has begun, waits for all
    /// foreground jobs and then all background jobs to finish, and finally tells the backend
    /// to stop.
    pub async fn stop_and_wait(&self) {
        turbo_tasks_future_scope(self.pin(), async move {
            self.backend.stopping(self);
            // Prevents further task executions from starting (checked by the executor and the
            // schedule_* helpers).
            self.stopped.store(true, Ordering::Release);
            {
                // Listener is created before reading the counter so a concurrent finish
                // cannot be missed.
                let listener = self
                    .event_foreground_done
                    .listen_with_note(|| || "wait for stop".to_string());
                if self
                    .currently_scheduled_foreground_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            {
                let listener = self.event_background_done.listen();
                if self
                    .currently_scheduled_background_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            self.backend.stop(self);
        })
        .await;
    }
1130
1131    #[track_caller]
1132    pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1133    where
1134        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1135        T::CallOnceFuture: Send,
1136    {
1137        let mut this = self.pin();
1138        this.begin_foreground_job();
1139        tokio::spawn(
1140            TURBO_TASKS
1141                .scope(this.clone(), async move {
1142                    if !this.stopped.load(Ordering::Acquire) {
1143                        this = func(this.clone()).await;
1144                    }
1145                    this.finish_foreground_job();
1146                })
1147                .in_current_span(),
1148        );
1149    }
1150
    /// Spawns `func` as a tokio task counted as a background job. The counter is incremented
    /// before spawning and decremented when the future completes; `func` itself is skipped
    /// once turbo-tasks has been stopped.
    #[track_caller]
    pub(crate) fn schedule_background_job<T>(&self, func: T)
    where
        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
        T::CallOnceFuture: Send,
    {
        let mut this = self.pin();
        self.begin_background_job();
        tokio::spawn(
            TURBO_TASKS
                .scope(this.clone(), async move {
                    if !this.stopped.load(Ordering::Acquire) {
                        // `func` takes ownership of the Arc and hands it back.
                        this = func(this).await;
                    }
                    this.finish_background_job();
                })
                .in_current_span(),
        );
    }
1170
1171    fn finish_current_task_state(&self) -> FinishedTaskState {
1172        CURRENT_TASK_STATE.with(|cell| {
1173            let current_task_state = &*cell.write().unwrap();
1174            FinishedTaskState {
1175                #[cfg(feature = "verify_determinism")]
1176                stateful: current_task_state.stateful,
1177                has_invalidator: current_task_state.has_invalidator,
1178            }
1179        })
1180    }
1181
1182    pub fn backend(&self) -> &B {
1183        &self.backend
1184    }
1185}
1186
/// Unit type that implements [`Executor`] for [`TurboTasks`], driving scheduled tasks.
struct TurboTasksExecutor;
1188
// Executor implementation that drives both kinds of scheduled work: full backend tasks
// (`ScheduledTask::Task`) and in-memory resolution tasks (`ScheduledTask::LocalTask`).
impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
    type Future = impl Future<Output = ()> + Send + 'static;

    fn execute(
        &self,
        this: &Arc<TurboTasks<B>>,
        scheduled_task: ScheduledTask,
        priority: TaskPriority,
    ) -> Self::Future {
        match scheduled_task {
            ScheduledTask::Task { task_id, span } => {
                let this2 = this.clone();
                let this = this.clone();
                let future = async move {
                    // The backend may request an immediate re-execution (the return value of
                    // `task_execution_completed`); loop until it says stop.
                    let mut schedule_again = true;
                    while schedule_again {
                        // it's okay for execution ids to overflow and wrap, they're just used for
                        // an assert
                        let execution_id = this.execution_id_factory.wrapping_get();
                        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
                            task_id,
                            execution_id,
                            priority,
                            false, // in_top_level_task
                        )));
                        // Resolves to true when the task should run again right away.
                        let single_execution_future = async {
                            if this.stopped.load(Ordering::Acquire) {
                                this.backend.task_execution_canceled(task_id, &*this);
                                return false;
                            }

                            let Some(TaskExecutionSpec { future, span }) = this
                                .backend
                                .try_start_task_execution(task_id, priority, &*this)
                            else {
                                return false;
                            };

                            async {
                                let result = CaptureFuture::new(future).await;

                                // wait for all spawned local tasks using `local` to finish
                                wait_for_local_tasks().await;

                                // Flatten the two error layers: task-level errors and panics
                                // captured by `CaptureFuture`.
                                let result = match result {
                                    Ok(Ok(raw_vc)) => Ok(raw_vc),
                                    Ok(Err(err)) => Err(err.into()),
                                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                                };

                                let finished_state = this.finish_current_task_state();
                                let cell_counters = CURRENT_TASK_STATE
                                    .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
                                this.backend.task_execution_completed(
                                    task_id,
                                    result,
                                    &cell_counters,
                                    #[cfg(feature = "verify_determinism")]
                                    finished_state.stateful,
                                    finished_state.has_invalidator,
                                    &*this,
                                )
                            }
                            .instrument(span)
                            .await
                        };
                        schedule_again = CURRENT_TASK_STATE
                            .scope(current_task_state, single_execution_future)
                            .await;
                    }
                    // Balances the `begin_foreground_job` done when the task was scheduled.
                    this.finish_foreground_job();
                };

                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
            }
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state,
                span,
            } => {
                let this2 = this.clone();
                let this = this.clone();
                let task_type = ty.task_type;
                let future = async move {
                    // Pick a resolve span matching the kind of local task.
                    let span = match &ty.task_type {
                        LocalTaskType::ResolveNative { native_fn } => {
                            native_fn.resolve_span(priority)
                        }
                        LocalTaskType::ResolveTrait { trait_method } => {
                            trait_method.resolve_span(priority)
                        }
                    };
                    async move {
                        let result = match ty.task_type {
                            LocalTaskType::ResolveNative { native_fn } => {
                                LocalTaskType::run_resolve_native(
                                    native_fn,
                                    ty.this,
                                    &*ty.arg,
                                    persistence,
                                    this,
                                )
                                .await
                            }
                            LocalTaskType::ResolveTrait { trait_method } => {
                                LocalTaskType::run_resolve_trait(
                                    trait_method,
                                    ty.this.unwrap(),
                                    &*ty.arg,
                                    persistence,
                                    this,
                                )
                                .await
                            }
                        };

                        let output = match result {
                            Ok(raw_vc) => OutputContent::Link(raw_vc),
                            Err(err) => OutputContent::Error(
                                TurboTasksExecutionError::from(err)
                                    .with_local_task_context(task_type.to_string()),
                            ),
                        };

                        let local_task = LocalTask::Done { output };

                        // Swap the finished output into the parent task's state and recover
                        // the `done_event` from the previous `Scheduled` entry.
                        let done_event = CURRENT_TASK_STATE.with(move |gts| {
                            let mut gts_write = gts.write().unwrap();
                            let scheduled_task = std::mem::replace(
                                gts_write.get_mut_local_task(local_task_id),
                                local_task,
                            );
                            let LocalTask::Scheduled { done_event } = scheduled_task else {
                                panic!("local task finished, but was not in the scheduled state?");
                            };
                            done_event
                        });
                        // Wake all readers waiting on this local task's output.
                        done_event.notify(usize::MAX)
                    }
                    .instrument(span)
                    .await
                };
                // Local tasks run within their parent task's state.
                let future = CURRENT_TASK_STATE.scope(global_task_state, future);

                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
            }
        }
    }
}
1340
/// Snapshot of the task-state flags captured by `finish_current_task_state` when an execution
/// completes; handed to the backend in `task_execution_completed`.
struct FinishedTaskState {
    /// True if the task has state in cells (interior mutability).
    /// Only tracked when verify_determinism feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the task uses an external invalidator
    has_invalidator: bool,
}
1350
// Object-safe call API: forwards to the inherent `TurboTasks` methods of the same names,
// boxing the returned futures where the trait requires it.
impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.dynamic_call(native_fn, this, arg, persistence)
    }
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.native_call(native_fn, this, arg, persistence)
    }
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.trait_call(trait_method, this, arg, persistence)
    }

    #[track_caller]
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run(future).await })
    }

    #[track_caller]
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    #[track_caller]
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        {
            // Record the reason so it shows up in the next aggregated update info.
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    #[track_caller]
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        self.start_once_process(future)
    }

    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
        // Best-effort: a failed send is logged, not propagated.
        if let Err(e) = self.compilation_events.send(event) {
            tracing::warn!("Failed to send compilation event: {e}");
        }
    }

    fn get_task_name(&self, task: TaskId) -> String {
        self.backend.get_task_name(task, self)
    }
}
1427
// Task-facing runtime API: reads/writes of outputs, cells and collectibles, mostly delegating
// to the backend with the current task id attached where available.
impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
    #[instrument(level = "info", skip_all, name = "invalidate")]
    fn invalidate(&self, task: TaskId) {
        self.backend.invalidate_task(task, self);
    }

    #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
        {
            // Record the reason so it shows up in the next aggregated update info.
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        self.backend.invalidate_task(task, self);
    }

    fn invalidate_serialization(&self, task: TaskId) {
        self.backend.invalidate_serialization(task, self);
    }

    #[track_caller]
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>> {
        if options.consistency == ReadConsistency::Eventual {
            debug_assert_not_in_top_level_task("read_task_output");
        }
        self.backend.try_read_task_output(
            task,
            current_task_if_available("reading Vcs"),
            options,
            self,
        )
    }

    #[track_caller]
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        let reader = current_task_if_available("reading Vcs");
        self.backend
            .try_read_task_cell(task, index, reader, options, self)
    }

    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
    ) -> Result<TypedCellContent> {
        self.backend
            .try_read_own_task_cell(current_task, index, self)
    }

    #[track_caller]
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>> {
        debug_assert_not_in_top_level_task("read_local_output");
        CURRENT_TASK_STATE.with(|gts| {
            let gts_read = gts.read().unwrap();

            // Local Vcs are local to their parent task's current execution, and do not exist
            // outside of it. This is weakly enforced at compile time using the `NonLocalValue`
            // marker trait. This assertion exists to handle any potential escapes that the
            // compile-time checks cannot capture.
            gts_read.assert_execution_id(execution_id);

            match gts_read.get_local_task(local_task_id) {
                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
            }
        })
    }

    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
        // TODO: Add assert_not_in_top_level_task("read_task_collectibles") check here.
        // Collectible reads are eventually consistent.
        self.backend.read_task_collectibles(
            task,
            trait_id,
            current_task_if_available("reading collectibles"),
            self,
        )
    }

    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
        self.backend.emit_collectible(
            trait_type,
            collectible,
            current_task("emitting collectible"),
            self,
        );
    }

    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
        // NOTE(review): the context string says "emitting collectible" although this unemits
        // — possibly intentional reuse; confirm before changing the message.
        self.backend.unemit_collectible(
            trait_type,
            collectible,
            count,
            current_task("emitting collectible"),
            self,
        );
    }

    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
        // Only positive counts are unemitted; non-positive entries are skipped.
        for (&collectible, &count) in collectibles {
            if count > 0 {
                self.backend.unemit_collectible(
                    trait_type,
                    collectible,
                    count as u32,
                    current_task("emitting collectible"),
                    self,
                );
            }
        }
    }

    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent> {
        self.try_read_own_task_cell(task, index)
    }

    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
    ) {
        self.backend.update_task_cell(
            task,
            index,
            content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            self,
        );
    }

    fn connect_task(&self, task: TaskId) {
        self.backend
            .connect_task(task, current_task_if_available("connecting task"), self);
    }

    fn mark_own_task_as_finished(&self, task: TaskId) {
        self.backend.mark_own_task_as_finished(task, self);
    }

    fn mark_own_task_as_session_dependent(&self, task: TaskId) {
        self.backend.mark_own_task_as_session_dependent(task, self);
    }

    /// Creates a future that inherits the current task id and task state. The current global task
    /// will wait for this future to be dropped before exiting.
    fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        // this is similar to what happens for a local task, except that we keep the local task's
        // state as well.
        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
        let fut = tokio::spawn(TURBO_TASKS.scope(
            turbo_tasks(),
            CURRENT_TASK_STATE.scope(global_task_state.clone(), fut),
        ));
        let fut = Box::pin(async move {
            fut.await.unwrap();
        });
        // Register in the local task tracker so the parent waits for this future to finish.
        let mut ts = global_task_state.write().unwrap();
        ts.local_task_tracker
            .get_or_insert_default()
            .push(Either::Right(fut));
    }

    fn task_statistics(&self) -> &TaskStatisticsApi {
        self.backend.task_statistics()
    }

    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        let this = self.pin();
        Box::pin(async move {
            this.stop_and_wait().await;
        })
    }

    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>> {
        self.compilation_events.subscribe(event_types)
    }

    fn is_tracking_dependencies(&self) -> bool {
        self.backend.is_tracking_dependencies()
    }
}
1630
// Backend-facing API surface: mostly thin delegations to inherent methods, plus task-id
// allocation/reuse helpers.
impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
        self.pin()
    }
    fn backend(&self) -> &B {
        &self.backend
    }

    #[track_caller]
    fn schedule_backend_background_job(&self, job: B::BackendJob) {
        self.schedule_background_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            this
        })
    }

    #[track_caller]
    fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
        self.schedule_foreground_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            this
        })
    }

    #[track_caller]
    fn schedule(&self, task: TaskId, priority: TaskPriority) {
        self.schedule(task, priority)
    }

    fn get_current_task_priority(&self) -> TaskPriority {
        // Falls back to the initial priority when called outside of a task context.
        CURRENT_TASK_STATE
            .try_with(|task_state| task_state.read().unwrap().priority)
            .unwrap_or(TaskPriority::initial())
    }

    fn program_duration_until(&self, instant: Instant) -> Duration {
        instant - self.program_start
    }

    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
        // SAFETY: This is a fresh id from the factory
        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
    }

    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
        // SAFETY: This is a fresh id from the factory
        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
    }

    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
        // SAFETY: the caller upholds this `unsafe fn`'s contract that `id` is unused.
        unsafe { self.task_id_factory.reuse(id.into()) }
    }

    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
        // SAFETY: the caller upholds this `unsafe fn`'s contract that `id` is unused.
        unsafe { self.transient_task_id_factory.reuse(id.into()) }
    }

    fn is_idle(&self) -> bool {
        // Idle means no foreground jobs are scheduled or running.
        self.currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
            == 0
    }
}
1694
1695async fn wait_for_local_tasks() {
1696    while let Some(mut ltt) =
1697        CURRENT_TASK_STATE.with(|ts| ts.write().unwrap().local_task_tracker.take())
1698    {
1699        use futures::StreamExt;
1700        while ltt.next().await.is_some() {}
1701    }
1702}
1703
1704pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1705    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1706        Ok(id) => id,
1707        Err(_) => panic!(
1708            "{from} can only be used in the context of a turbo_tasks task execution or \
1709             turbo_tasks run"
1710        ),
1711    }
1712}
1713
1714pub(crate) fn current_task(from: &str) -> TaskId {
1715    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1716        Ok(Some(id)) => id,
1717        Ok(None) | Err(_) => {
1718            panic!("{from} can only be used in the context of a turbo_tasks task execution")
1719        }
1720    }
1721}
1722
1723/// Panics if we're not in a top-level task (e.g. [`run_once`]). Some function calls should only
1724/// happen in a top-level task (e.g. [`Effects::apply`][crate::Effects::apply]).
1725#[track_caller]
1726pub(crate) fn debug_assert_in_top_level_task(message: &str) {
1727    if !cfg!(debug_assertions) {
1728        return;
1729    }
1730
1731    let in_top_level = CURRENT_TASK_STATE
1732        .try_with(|ts| ts.read().unwrap().in_top_level_task)
1733        .unwrap_or(true);
1734    if !in_top_level {
1735        panic!("{message}");
1736    }
1737}
1738
1739#[track_caller]
1740pub(crate) fn debug_assert_not_in_top_level_task(operation: &str) {
1741    if !cfg!(debug_assertions) {
1742        return;
1743    }
1744
1745    // HACK: We set this inside of `ReadRawVcFuture` to suppress warnings about an internal
1746    // consistency bug
1747    let suppressed = SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK
1748        .try_with(|&suppressed| suppressed)
1749        .unwrap_or(false);
1750    if suppressed {
1751        return;
1752    }
1753
1754    let in_top_level = CURRENT_TASK_STATE
1755        .try_with(|ts| ts.read().unwrap().in_top_level_task)
1756        .unwrap_or(false);
1757    if in_top_level {
1758        panic!(
1759            "Eventually consistent read ({operation}) cannot be performed from a top-level task. \
1760             Top-level tasks (e.g. code inside `.run_once(...)`) must use strongly consistent \
1761             reads to avoid leaking inconsistent return values."
1762        );
1763    }
1764}
1765
1766pub async fn run<T: Send + 'static>(
1767    tt: Arc<dyn TurboTasksApi>,
1768    future: impl Future<Output = Result<T>> + Send + 'static,
1769) -> Result<T> {
1770    let (tx, rx) = tokio::sync::oneshot::channel();
1771
1772    tt.run(Box::pin(async move {
1773        let result = future.await?;
1774        tx.send(result)
1775            .map_err(|_| anyhow!("unable to send result"))?;
1776        Ok(())
1777    }))
1778    .await?;
1779
1780    Ok(rx.await?)
1781}
1782
1783pub async fn run_once<T: Send + 'static>(
1784    tt: Arc<dyn TurboTasksApi>,
1785    future: impl Future<Output = Result<T>> + Send + 'static,
1786) -> Result<T> {
1787    let (tx, rx) = tokio::sync::oneshot::channel();
1788
1789    tt.run_once(Box::pin(async move {
1790        let result = future.await?;
1791        tx.send(result)
1792            .map_err(|_| anyhow!("unable to send result"))?;
1793        Ok(())
1794    }))
1795    .await?;
1796
1797    Ok(rx.await?)
1798}
1799
1800pub async fn run_once_with_reason<T: Send + 'static>(
1801    tt: Arc<dyn TurboTasksApi>,
1802    reason: impl InvalidationReason,
1803    future: impl Future<Output = Result<T>> + Send + 'static,
1804) -> Result<T> {
1805    let (tx, rx) = tokio::sync::oneshot::channel();
1806
1807    tt.run_once_with_reason(
1808        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1809        Box::pin(async move {
1810            let result = future.await?;
1811            tx.send(result)
1812                .map_err(|_| anyhow!("unable to send result"))?;
1813            Ok(())
1814        }),
1815    )
1816    .await?;
1817
1818    Ok(rx.await?)
1819}
1820
/// Calls [`TurboTasks::dynamic_call`] for the current turbo tasks instance.
///
/// `this` is the receiver for method-style calls; `arg` carries the task inputs;
/// `persistence` selects whether the resulting task is persistent or transient.
pub fn dynamic_call(
    func: &'static NativeFunction,
    this: Option<RawVc>,
    arg: &mut dyn StackDynTaskInputs,
    persistence: TaskPersistence,
) -> RawVc {
    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
}
1830
/// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance.
///
/// Dispatches `trait_method` on the receiver `this` with the given inputs and persistence.
pub fn trait_call(
    trait_method: &'static TraitMethod,
    this: RawVc,
    arg: &mut dyn StackDynTaskInputs,
    persistence: TaskPersistence,
) -> RawVc {
    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
}
1840
/// Returns a cloned handle to the scoped [`TurboTasksApi`] instance.
/// Panics when no instance is in scope; see [`try_turbo_tasks`] for a fallible variant.
pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
    TURBO_TASKS.with(|arc| arc.clone())
}
1844
/// Like [`turbo_tasks`], but returns a non-owning [`Weak`] handle.
pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
    TURBO_TASKS.with(Arc::downgrade)
}
1848
/// Fallible variant of [`turbo_tasks`]: returns `None` instead of panicking when no instance
/// is in scope.
pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
    TURBO_TASKS.try_with(|arc| arc.clone()).ok()
}
1852
/// Runs `func` with a borrow of the scoped [`TurboTasksApi`] instance, avoiding the `Arc`
/// clone that [`turbo_tasks`] performs. Panics when no instance is in scope.
pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
    TURBO_TASKS.with(|arc| func(arc))
}
1856
/// Runs the synchronous closure `f` with `tt` installed as the scoped turbo-tasks instance.
pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
    TURBO_TASKS.sync_scope(tt, f)
}
1860
/// Async counterpart of [`turbo_tasks_scope`]: wraps `f` so that `tt` is the scoped
/// turbo-tasks instance whenever the returned future is polled.
pub fn turbo_tasks_future_scope<T>(
    tt: Arc<dyn TurboTasksApi>,
    f: impl Future<Output = T>,
) -> impl Future<Output = T> {
    TURBO_TASKS.scope(tt, f)
}
1867
1868pub fn with_turbo_tasks_for_testing<T>(
1869    tt: Arc<dyn TurboTasksApi>,
1870    current_task: TaskId,
1871    execution_id: ExecutionId,
1872    f: impl Future<Output = T>,
1873) -> impl Future<Output = T> {
1874    TURBO_TASKS.scope(
1875        tt,
1876        CURRENT_TASK_STATE.scope(
1877            Arc::new(RwLock::new(CurrentTaskState::new(
1878                current_task,
1879                execution_id,
1880                TaskPriority::initial(),
1881                false, // in_top_level_task
1882            ))),
1883            f,
1884        ),
1885    )
1886}
1887
/// Spawns the given future within the context of the current task.
///
/// Beware: this method is not safe to use in production code. It is only
/// intended for use in tests and for debugging purposes.
pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
    // boxes the future and delegates to the scoped instance
    turbo_tasks().spawn_detached_for_testing(Box::pin(f));
}
1895
/// Test helper: returns the current task id (if any). Panics when no task state is in scope.
pub fn current_task_for_testing() -> Option<TaskId> {
    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
}
1899
/// Marks the current task as dirty when restored from filesystem cache.
///
/// Panics (via [`current_task`]) when called outside of a task execution.
pub fn mark_session_dependent() {
    with_turbo_tasks(|tt| {
        tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
    });
}
1906
/// Marks the current task as finished. This excludes it from waiting for
/// strongly consistency.
///
/// Panics (via [`current_task`]) when called outside of a task execution.
pub fn mark_finished() {
    with_turbo_tasks(|tt| {
        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
    });
}
1914
/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
/// serialization of the current task cells.
///
/// Also marks the current task as stateful when the `verify_determinism` feature is enabled,
/// since State allocation implies interior mutability.
///
/// Panics when called outside of a task execution.
pub fn get_serialization_invalidator() -> SerializationInvalidator {
    CURRENT_TASK_STATE.with(|cell| {
        // Destructure under the write lock; the `stateful` field only exists behind the
        // `verify_determinism` feature flag.
        let CurrentTaskState {
            task_id,
            #[cfg(feature = "verify_determinism")]
            stateful,
            ..
        } = &mut *cell.write().unwrap();
        #[cfg(feature = "verify_determinism")]
        {
            *stateful = true;
        }
        let Some(task_id) = *task_id else {
            panic!(
                "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
                 task execution"
            );
        };
        SerializationInvalidator::new(task_id)
    })
}
1941
1942pub fn mark_invalidator() {
1943    CURRENT_TASK_STATE.with(|cell| {
1944        let CurrentTaskState {
1945            has_invalidator, ..
1946        } = &mut *cell.write().unwrap();
1947        *has_invalidator = true;
1948    })
1949}
1950
/// Marks the current task as stateful. This is used to indicate that the task
/// has interior mutability (e.g., via State or TransientState), which means
/// the task may produce different outputs even with the same inputs.
///
/// Only has an effect when the `verify_determinism` feature is enabled.
pub fn mark_stateful() {
    // No-op when verify_determinism is not enabled
    #[cfg(feature = "verify_determinism")]
    CURRENT_TASK_STATE.with(|cell| {
        cell.write().unwrap().stateful = true;
    });
}
1966
1967/// Marks the current task context as being in a top-level task. When in a top-level task,
1968/// eventually consistent reads will panic. It is almost always a mistake to perform an eventually
1969/// consistent read at the top-level of the application.
1970pub fn mark_top_level_task() {
1971    if cfg!(debug_assertions) {
1972        CURRENT_TASK_STATE.with(|cell| {
1973            cell.write().unwrap().in_top_level_task = true;
1974        })
1975    }
1976}
1977
1978/// Unmarks the current task context as being in a top-level task. The opposite of
1979/// [`mark_top_level_task`].
1980///
1981/// This utility can be okay in unit tests, where we're observing the internal behavior of
1982/// turbo-tasks, but otherwise, it is probably a mistake to call this function.
1983///
1984/// Calling this will allow eventually-consistent reads at the top-level, potentially exposing
1985/// incomplete computations and internal errors caused by eventual consistency that would've been
1986/// caught when the function was re-run. A strongly-consistent read re-runs parts of a task until
1987/// all of the dependencies have settled.
1988pub fn unmark_top_level_task_may_leak_eventually_consistent_state() {
1989    if cfg!(debug_assertions) {
1990        CURRENT_TASK_STATE.with(|cell| {
1991            cell.write().unwrap().in_top_level_task = false;
1992        })
1993    }
1994}
1995
/// Placeholder: intended to pin the current task's data against garbage collection once GC
/// exists. Currently a no-op.
pub fn prevent_gc() {
    // TODO implement garbage collection
}
1999
2000pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
2001    with_turbo_tasks(|tt| {
2002        let raw_vc = collectible.node.node;
2003        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
2004    })
2005}
2006
2007pub(crate) async fn read_task_output(
2008    this: &dyn TurboTasksApi,
2009    id: TaskId,
2010    options: ReadOutputOptions,
2011) -> Result<RawVc> {
2012    loop {
2013        match this.try_read_task_output(id, options)? {
2014            Ok(result) => return Ok(result),
2015            Err(listener) => listener.await,
2016        }
2017    }
2018}
2019
/// A reference to a task's cell with methods that allow updating the contents
/// of the cell.
///
/// Mutations should not occur outside of the task that owns this cell. Doing so
/// is a logic error, and may lead to incorrect caching behavior.
#[derive(Clone, Copy)]
pub struct CurrentCellRef {
    /// The task that owns the cell.
    current_task: TaskId,
    /// The cell's id (value type plus per-type index) within the owning task.
    index: CellId,
}
2030
/// Shorthand for the type exposed when reading a `T` cell (e.g. the inner type of a
/// transparent wrapper).
type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
2032
impl CurrentCellRef {
    /// Updates the cell if the given `functor` returns a value.
    ///
    /// `functor` receives the old value (if the cell is filled and the content downcasts to
    /// `T`) and may return the new value plus optional updated-key hashes and an optional
    /// content hash, which are forwarded to the backend.
    fn conditional_update<T>(
        &self,
        functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>, Option<CellHash>)>,
    ) where
        T: VcValueType,
    {
        self.conditional_update_with_shared_reference(|old_shared_reference| {
            // A failed downcast is treated the same as an empty cell.
            let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
            let (new_value, updated_key_hashes, content_hash) = functor(old_ref)?;
            Some((
                SharedReference::new(triomphe::Arc::new(new_value)),
                updated_key_hashes,
                content_hash,
            ))
        })
    }

    /// Updates the cell if the given `functor` returns a `SharedReference`.
    fn conditional_update_with_shared_reference(
        &self,
        functor: impl FnOnce(
            Option<&SharedReference>,
        ) -> Option<(
            SharedReference,
            Option<SmallVec<[u64; 2]>>,
            Option<CellHash>,
        )>,
    ) {
        let tt = turbo_tasks();
        // A read error is treated as "no previous content".
        let cell_content = tt.read_own_task_cell(self.current_task, self.index).ok();
        // `cc.1.0`: TypedCellContent -> CellContent -> Option<SharedReference>
        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
        if let Some((update, updated_key_hashes, content_hash)) = update {
            tt.update_own_task_cell(
                self.current_task,
                self.index,
                CellContent(Some(update)),
                updated_key_hashes,
                content_hash,
                VerificationMode::EqualityCheck,
            )
        }
    }

    /// Replace the current cell's content with `new_value` if the current content is not equal by
    /// value with the existing content.
    ///
    /// The comparison happens using the value itself, not the [`VcRead::Target`] of that value.
    ///
    /// Take this example of a custom equality implementation on a transparent wrapper type:
    ///
    /// ```ignore
    /// #[turbo_tasks::value(transparent, eq = "manual")]
    /// struct Wrapper(Vec<u32>);
    ///
    /// impl PartialEq for Wrapper {
    ///     fn eq(&self, other: &Wrapper) -> bool {
    ///         // Example: order doesn't matter for equality
    ///         let (mut this, mut other) = (self.0.clone(), other.0.clone());
    ///         this.sort_unstable();
    ///         other.sort_unstable();
    ///         this == other
    ///     }
    /// }
    ///
    /// impl Eq for Wrapper {}
    /// ```
    ///
    /// Comparisons of [`Vc<Wrapper>`] used when updating the cell will use `Wrapper`'s custom
    /// equality implementation, rather than the one provided by the target ([`Vec<u32>`]) type.
    ///
    /// However, in most cases, the default derived implementation of [`PartialEq`] is used which
    /// just forwards to the inner value's [`PartialEq`].
    ///
    /// If you already have a `SharedReference`, consider calling
    /// [`Self::compare_and_update_with_shared_reference`] which can re-use the [`SharedReference`]
    /// object.
    pub fn compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + VcValueType,
    {
        self.conditional_update(|old_value| {
            if let Some(old_value) = old_value
                && old_value == &new_value
            {
                return None;
            }
            Some((new_value, None, None))
        });
    }

    /// Replace the current cell's content with `new_shared_reference` if the current content is not
    /// equal by value with the existing content.
    ///
    /// If you already have a `SharedReference`, this is a faster version of
    /// [`CurrentCellRef::compare_and_update`].
    ///
    /// The value should be stored in [`SharedReference`] using the type `T`.
    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
    where
        T: VcValueType + PartialEq,
    {
        self.conditional_update_with_shared_reference(|old_sr| {
            if let Some(old_sr) = old_sr {
                let old_value = extract_sr_value::<T>(old_sr);
                let new_value = extract_sr_value::<T>(&new_shared_reference);
                if old_value == new_value {
                    return None;
                }
            }
            Some((new_shared_reference, None, None))
        });
    }

    /// Replace the current cell's content if the new value is different.
    ///
    /// Like [`Self::compare_and_update`], but also computes and stores a hash of the value.
    /// When the cell's transient data is evicted, the stored hash enables the backend to detect
    /// whether the value actually changed without re-comparing values—avoiding unnecessary
    /// downstream invalidation.
    ///
    /// Requires `T: DeterministicHash` in addition to `T: PartialEq`.
    pub fn hashed_compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + DeterministicHash + VcValueType,
    {
        self.conditional_update(|old_value| {
            if let Some(old_value) = old_value
                && old_value == &new_value
            {
                return None;
            }
            // Only hash when we know the value will actually be written.
            let content_hash = hash_xxh3_hash128(&new_value);
            Some((new_value, None, Some(content_hash)))
        });
    }

    /// Replace the current cell's content if the new value (from a pre-existing
    /// [`SharedReference`]) is different.
    ///
    /// Like [`Self::compare_and_update_with_shared_reference`], but also passes a hash
    /// for hash-based change detection when transient data has been evicted.
    pub fn hashed_compare_and_update_with_shared_reference<T>(
        &self,
        new_shared_reference: SharedReference,
    ) where
        T: VcValueType + PartialEq + DeterministicHash,
    {
        self.conditional_update_with_shared_reference(move |old_sr| {
            if let Some(old_sr) = old_sr {
                let old_value = extract_sr_value::<T>(old_sr);
                let new_value = extract_sr_value::<T>(&new_shared_reference);
                if old_value == new_value {
                    return None;
                }
            }
            let content_hash = hash_xxh3_hash128(extract_sr_value::<T>(&new_shared_reference));
            Some((new_shared_reference, None, Some(content_hash)))
        });
    }

    /// See [`Self::compare_and_update`], but selectively update individual keys.
    pub fn keyed_compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + VcValueType,
        VcReadTarget<T>: KeyedEq,
        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
    {
        self.conditional_update(|old_value| {
            // An empty cell is an unconditional full write (no per-key diff possible).
            let Some(old_value) = old_value else {
                return Some((new_value, None, None));
            };
            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
            let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
            let updated_keys = old_value.different_keys(new_value_ref);
            if updated_keys.is_empty() {
                return None;
            }
            // Duplicates are very unlikely, but ok since the backend is deduplicating them
            let updated_key_hashes = updated_keys
                .into_iter()
                .map(|key| FxBuildHasher.hash_one(key))
                .collect();
            Some((new_value, Some(updated_key_hashes), None))
        });
    }

    /// See [`Self::compare_and_update_with_shared_reference`], but selectively update individual
    /// keys.
    pub fn keyed_compare_and_update_with_shared_reference<T>(
        &self,
        new_shared_reference: SharedReference,
    ) where
        T: VcValueType + PartialEq,
        VcReadTarget<T>: KeyedEq,
        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
    {
        self.conditional_update_with_shared_reference(|old_sr| {
            // An empty cell is an unconditional full write (no per-key diff possible).
            let Some(old_sr) = old_sr else {
                return Some((new_shared_reference, None, None));
            };
            let old_value = extract_sr_value::<T>(old_sr);
            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
            let new_value = extract_sr_value::<T>(&new_shared_reference);
            let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
            let updated_keys = old_value.different_keys(new_value);
            if updated_keys.is_empty() {
                return None;
            }
            // Duplicates are very unlikely, but ok since the backend is deduplicating them
            let updated_key_hashes = updated_keys
                .into_iter()
                .map(|key| FxBuildHasher.hash_one(key))
                .collect();
            Some((new_shared_reference, Some(updated_key_hashes), None))
        });
    }

    /// Unconditionally updates the content of the cell.
    pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
    where
        T: VcValueType,
    {
        let tt = turbo_tasks();
        tt.update_own_task_cell(
            self.current_task,
            self.index,
            CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
            None,
            None,
            verification_mode,
        )
    }

    /// A faster version of [`Self::update`] if you already have a
    /// [`SharedReference`].
    ///
    /// If the passed-in [`SharedReference`] is the same as the existing cell's
    /// by identity, no update is performed.
    ///
    /// The value should be stored in [`SharedReference`] using the type `T`.
    pub fn update_with_shared_reference(
        &self,
        shared_ref: SharedReference,
        verification_mode: VerificationMode,
    ) {
        let tt = turbo_tasks();
        let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
            let content = tt.read_own_task_cell(self.current_task, self.index).ok();
            if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
                // pointer equality (not value equality)
                shared_ref_exp != shared_ref
            } else {
                // empty/unreadable cell: always write
                true
            }
        } else {
            true
        };
        if update {
            tt.update_own_task_cell(
                self.current_task,
                self.index,
                CellContent(Some(shared_ref)),
                None,
                None,
                verification_mode,
            )
        }
    }
}
2304
2305impl From<CurrentCellRef> for RawVc {
2306    fn from(cell: CurrentCellRef) -> Self {
2307        RawVc::TaskCell(cell.current_task, cell.index)
2308    }
2309}
2310
/// Downcasts the value stored in `sr` to `T`.
/// Panics when the stored value has a different type — callers must pass matching `T`.
fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
    sr.0.downcast_ref::<T>()
        .expect("cannot update SharedReference of different type")
}
2315
/// Typed convenience wrapper for [`find_cell_by_id`], using `T`'s value type id.
pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
    find_cell_by_id(T::get_value_type_id())
}
2319
2320pub fn find_cell_by_id(ty: ValueTypeId) -> CurrentCellRef {
2321    CURRENT_TASK_STATE.with(|ts| {
2322        let current_task = current_task("celling turbo_tasks values");
2323        let mut ts = ts.write().unwrap();
2324        let map = ts.cell_counters.as_mut().unwrap();
2325        let current_index = map.entry(ty).or_default();
2326        let index = *current_index;
2327        *current_index += 1;
2328        CurrentCellRef {
2329            current_task,
2330            index: CellId { type_id: ty, index },
2331        }
2332    })
2333}
2334
2335pub(crate) async fn read_local_output(
2336    this: &dyn TurboTasksApi,
2337    execution_id: ExecutionId,
2338    local_task_id: LocalTaskId,
2339) -> Result<RawVc> {
2340    loop {
2341        match this.try_read_local_output(execution_id, local_task_id)? {
2342            Ok(raw_vc) => return Ok(raw_vc),
2343            Err(event_listener) => event_listener.await,
2344        }
2345    }
2346}