// turbo_tasks/manager.rs

1use std::{
2    cmp::Reverse,
3    fmt::{Debug, Display},
4    future::Future,
5    hash::{BuildHasher, BuildHasherDefault},
6    mem::take,
7    panic::AssertUnwindSafe,
8    pin::Pin,
9    process::abort,
10    sync::{
11        Arc, Mutex, RwLock, Weak,
12        atomic::{AtomicBool, AtomicUsize, Ordering},
13    },
14    time::{Duration, Instant},
15};
16
17use anyhow::{Result, anyhow};
18use auto_hash_map::AutoMap;
19use bincode::{Decode, Encode};
20use either::Either;
21use futures::FutureExt;
22use rustc_hash::{FxBuildHasher, FxHasher};
23use serde::{Deserialize, Serialize};
24use smallvec::SmallVec;
25use tokio::{select, sync::mpsc::Receiver, task_local};
26use tracing::{Instrument, Span, instrument};
27use turbo_tasks_hash::{DeterministicHash, hash_xxh3_hash128};
28
29use crate::{
30    Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
31    ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
32    VcValueTrait, VcValueType,
33    backend::{
34        Backend, CellContent, CellHash, TaskCollectiblesMap, TaskExecutionSpec, TransientTaskType,
35        TurboTasksExecutionError, TypedCellContent, VerificationMode,
36    },
37    capture_future::CaptureFuture,
38    dyn_task_inputs::StackDynTaskInputs,
39    event::{Event, EventListener},
40    id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
41    id_factory::IdFactoryWithReuse,
42    keyed::KeyedEq,
43    local_task_tracker::LocalTaskTracker,
44    macro_helpers::NativeFunction,
45    message_queue::{CompilationEvent, CompilationEventQueue},
46    priority_runner::{Executor, PriorityRunner},
47    raw_vc::{CellId, RawVc},
48    registry,
49    serialization_invalidation::SerializationInvalidator,
50    task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
51    task_statistics::TaskStatisticsApi,
52    trace::TraceRawVcs,
53    util::{IdFactory, StaticOrArc},
54};
55
/// Common base trait for [`TurboTasksApi`] and [`TurboTasksBackendApi`]. Provides APIs for creating
/// tasks from function calls.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls a native function with arguments. Resolves arguments when needed
    /// with a wrapper task.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Call a native function with arguments.
    /// All inputs must be resolved.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls a trait method with arguments. First input is the `self` object.
    /// Uses a wrapper task to resolve
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs the given future inside the turbo-tasks context; execution failures
    /// (including panics) are surfaced as [`TurboTasksExecutionError`].
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Runs the given future as a one-off task and resolves once it completes.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Like [`Self::run_once`], but attaches `reason` as the
    /// [`InvalidationReason`] associated with this run.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Spawns a detached one-off future; no result is returned to the caller.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Sends a compilation event to subscribers.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Returns a human-readable name for the given task.
    fn get_task_name(&self, task: TaskId) -> String;
}
108
/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
/// context. Returned by the [`turbo_tasks`] helper function.
///
/// This trait is needed because thread locals cannot contain an unresolved [`Backend`] type
/// parameter.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Marks the given task as invalidated so it will be re-executed.
    fn invalidate(&self, task: TaskId);
    /// Like [`Self::invalidate`], but records `reason` for reporting purposes.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Invalidates the persisted/serialized form of the task (see
    /// `SerializationInvalidator`).
    fn invalidate_serialization(&self, task: TaskId);

    /// Tries to read a task's output. Returns `Ok(Err(listener))` when the
    /// output is not yet available; the listener resolves when it may be
    /// retried.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Tries to read a specific cell of a task. Same retry semantics as
    /// [`Self::try_read_task_output`].
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Reads a [`RawVc::LocalOutput`]. If the task has completed, returns the [`RawVc`] the local
    /// task points to.
    ///
    /// The returned [`RawVc`] may also be a [`RawVc::LocalOutput`], so this may need to be called
    /// recursively or in a loop.
    ///
    /// This does not accept a consistency argument, as you cannot control consistency of a read of
    /// an operation owned by your own task. Strongly consistent reads are only allowed on
    /// [`OperationVc`]s, which should never be local tasks.
    ///
    /// No dependency tracking will happen as a result of this function call, as it's a no-op for a
    /// task to depend on itself.
    ///
    /// [`OperationVc`]: crate::OperationVc
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Reads the collectibles of the given trait type emitted by `task` (and,
    /// presumably, its subtree — confirm against the backend implementation).
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits a collectible value for the current task.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Removes `count` previous emissions of the given collectible.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Removes a whole map of collectibles at once.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
    ) -> Result<TypedCellContent>;

    /// Reads a cell of the *current* task (no dependency tracking needed — a
    /// task cannot depend on itself).
    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent>;
    /// Writes `content` into a cell of the current task.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
    );
    /// Marks the current task's execution as finished.
    fn mark_own_task_as_finished(&self, task: TaskId);
    /// Marks the current task as depending on session state (so it won't be
    /// reused across sessions).
    fn mark_own_task_as_session_dependent(&self, task: TaskId);

    /// Connects the given task as a child of the current task.
    fn connect_task(&self, task: TaskId);

    /// Wraps the given future in the current task.
    ///
    /// Beware: this method is not safe to use in production code. It is only intended for use in
    /// tests and for debugging purposes.
    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Access to per-task statistics collection.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Stops the system and resolves once all outstanding work has drained.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events, optionally filtered by event type
    /// names.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;

    // Returns true if TurboTasks is configured to track dependencies.
    fn is_tracking_dependencies(&self) -> bool;
}
200
/// A wrapper around a value that is unused.
///
/// The invariant (upheld by the caller via the `unsafe` constructors) is that
/// the wrapped value is never observed until it is extracted with
/// [`Unused::into`]. Used for freshly allocated [`TaskId`]s that have not been
/// handed to the backend yet.
pub struct Unused<T> {
    inner: T,
}
205
impl<T> Unused<T> {
    /// Creates a new unused value.
    ///
    /// # Safety
    ///
    /// The wrapped value must not be used.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Self { inner }
    }

    /// Get the inner value, without consuming the `Unused` wrapper.
    ///
    /// # Safety
    ///
    /// The caller must make sure that the value stays unused.
    pub unsafe fn get_unchecked(&self) -> &T {
        &self.inner
    }

    /// Unwraps the value, consuming the `Unused` wrapper.
    pub fn into(self) -> T {
        self.inner
    }
}
230
/// A subset of the [`TurboTasks`] API that's exposed to [`Backend`] implementations.
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns a strong reference to this API object.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Allocates a fresh persistent task id (not yet used anywhere).
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Allocates a fresh transient task id (not yet used anywhere).
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// # Safety
    ///
    /// The caller must ensure that the task id is not used anymore.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// # Safety
    ///
    /// The caller must ensure that the task id is not used anymore.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedule a task for execution.
    fn schedule(&self, task: TaskId, priority: TaskPriority);

    /// Returns the priority of the current task.
    fn get_current_task_priority(&self) -> TaskPriority;

    /// Schedule a foreground backend job for execution.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Schedule a background backend job for execution.
    ///
    /// Background jobs are not counted towards activeness of the system. The system is considered
    /// idle even with active background jobs.
    fn schedule_backend_background_job(&self, job: B::BackendJob);

    /// Returns the duration from the start of the program to the given instant.
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Returns true if the system is idle.
    fn is_idle(&self) -> bool;

    /// Returns a reference to the backend.
    fn backend(&self) -> &B;
}
270
/// Aggregated information about a completed update cycle.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    // Wall-clock duration of the aggregated update (see `aggregated_update`).
    pub duration: Duration,
    // Number of tasks executed as part of the update.
    pub tasks: usize,
    // The set of invalidation reasons that triggered this update.
    pub reasons: InvalidationReasonSet,
    // Private field keeps construction internal so fields can be added without
    // breaking downstream code (manual non_exhaustive).
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
279
/// Whether a task's results may be persisted across sessions.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// Tasks that may be persisted across sessions using serialization.
    Persistent,

    /// Tasks that will be persisted in memory for the life of this session, but won't persist
    /// between sessions.
    ///
    /// This is used for [root tasks][TurboTasks::spawn_root_task] and tasks with an argument of
    /// type [`TransientValue`][crate::value::TransientValue] or
    /// [`TransientInstance`][crate::value::TransientInstance].
    Transient,
}
293
294impl Display for TaskPersistence {
295    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296        match self {
297            TaskPersistence::Persistent => write!(f, "persistent"),
298            TaskPersistence::Transient => write!(f, "transient"),
299        }
300    }
301}
302
/// Controls whether a read may observe stale values or must wait for all
/// dependencies to settle first.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// The default behavior for most APIs. Reads are faster, but may return stale values, which
    /// may later trigger re-computation.
    #[default]
    Eventual,
    /// Ensures all dependencies are fully resolved before returning the cell or output data, at
    /// the cost of slower reads.
    ///
    /// Top-level code that returns data to the user should use strongly consistent reads.
    Strong,
}
315
/// Controls whether a cell read is recorded as a dependency of the current
/// task (with an optional dependency key).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadCellTracking {
    /// Reads are tracked as dependencies of the current task.
    Tracked {
        /// The key used for the dependency
        key: Option<u64>,
    },
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}
334
335impl ReadCellTracking {
336    pub fn should_track(&self, is_err: bool) -> bool {
337        match self {
338            ReadCellTracking::Tracked { .. } => true,
339            ReadCellTracking::TrackOnlyError => is_err,
340            ReadCellTracking::Untracked => false,
341        }
342    }
343
344    pub fn key(&self) -> Option<u64> {
345        match self {
346            ReadCellTracking::Tracked { key } => *key,
347            ReadCellTracking::TrackOnlyError => None,
348            ReadCellTracking::Untracked => None,
349        }
350    }
351}
352
353impl Default for ReadCellTracking {
354    fn default() -> Self {
355        ReadCellTracking::Tracked { key: None }
356    }
357}
358
359impl Display for ReadCellTracking {
360    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
361        match self {
362            ReadCellTracking::Tracked { key: None } => write!(f, "tracked"),
363            ReadCellTracking::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
364            ReadCellTracking::TrackOnlyError => write!(f, "track only error"),
365            ReadCellTracking::Untracked => write!(f, "untracked"),
366        }
367    }
368}
369
/// Controls whether a read is recorded as a dependency of the current task.
/// Like [`ReadCellTracking`], but without a dependency key.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// Reads are tracked as dependencies of the current task.
    #[default]
    Tracked,
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}
386
387impl ReadTracking {
388    pub fn should_track(&self, is_err: bool) -> bool {
389        match self {
390            ReadTracking::Tracked => true,
391            ReadTracking::TrackOnlyError => is_err,
392            ReadTracking::Untracked => false,
393        }
394    }
395}
396
397impl Display for ReadTracking {
398    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
399        match self {
400            ReadTracking::Tracked => write!(f, "tracked"),
401            ReadTracking::TrackOnlyError => write!(f, "track only error"),
402            ReadTracking::Untracked => write!(f, "untracked"),
403        }
404    }
405}
406
/// Scheduling priority of a task.
///
/// `Ord` is derived, so `Initial` compares less than any `Invalidation`.
/// Within `Invalidation`, the [`Reverse`] wrapper inverts the inner `u32`
/// ordering (a larger level compares as *less*).
// NOTE(review): whether "compares less" means "runs first" depends on the
// `PriorityRunner`'s queue ordering, which is not visible here — confirm.
#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum TaskPriority {
    /// Priority for a task scheduled for the first time.
    #[default]
    Initial,
    /// Priority for a task re-scheduled due to an invalidation.
    Invalidation {
        // Invalidation level; wrapped in `Reverse` to invert the derived order.
        priority: Reverse<u32>,
    },
}
415
416impl TaskPriority {
417    pub fn invalidation(priority: u32) -> Self {
418        Self::Invalidation {
419            priority: Reverse(priority),
420        }
421    }
422
423    pub fn initial() -> Self {
424        Self::Initial
425    }
426
427    pub fn leaf() -> Self {
428        Self::Invalidation {
429            priority: Reverse(0),
430        }
431    }
432
433    pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
434        match self {
435            TaskPriority::Initial => parent_priority,
436            TaskPriority::Invalidation { priority } => {
437                if let TaskPriority::Invalidation {
438                    priority: parent_priority,
439                } = parent_priority
440                    && priority.0 < parent_priority.0
441                {
442                    Self::Invalidation {
443                        priority: Reverse(parent_priority.0.saturating_add(1)),
444                    }
445                } else {
446                    *self
447                }
448            }
449        }
450    }
451}
452
453impl Display for TaskPriority {
454    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
455        match self {
456            TaskPriority::Initial => write!(f, "initial"),
457            TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
458        }
459    }
460}
461
/// A unit of work queued on the [`PriorityRunner`].
enum ScheduledTask {
    /// A regular task known to the backend, identified by its [`TaskId`].
    Task {
        task_id: TaskId,
        // Tracing span captured at scheduling time so execution can be
        // attributed to the scheduling call site.
        span: Span,
    },
    /// A local task that executes within the state of its parent (global) task.
    LocalTask {
        ty: LocalTaskSpec,
        persistence: TaskPersistence,
        local_task_id: LocalTaskId,
        // Shared state of the parent task that spawned this local task.
        global_task_state: Arc<RwLock<CurrentTaskState>>,
        span: Span,
    },
}
475
/// The core turbo-tasks engine: owns the backend, the id factories, and the
/// scheduling/idle-tracking state shared by all tasks.
pub struct TurboTasks<B: Backend + 'static> {
    // Weak self-reference; upgraded by `TurboTasks::pin`.
    this: Weak<Self>,
    backend: B,
    // Allocates ids for persistent tasks (range below `TRANSIENT_TASK_BIT`,
    // see `TurboTasks::new`).
    task_id_factory: IdFactoryWithReuse<TaskId>,
    // Allocates ids for transient tasks (range with `TRANSIENT_TASK_BIT` set).
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    // Allocates per-execution ids; these may wrap (they're only used for
    // assertions, see `run`).
    execution_id_factory: IdFactory<ExecutionId>,
    stopped: AtomicBool,
    // Number of currently scheduled foreground jobs; zero means the engine is
    // idle (see `begin_foreground_job`/`finish_foreground_job`).
    currently_scheduled_foreground_jobs: AtomicUsize,
    currently_scheduled_background_jobs: AtomicUsize,
    // Running count of scheduled tasks (incremented in `schedule`).
    scheduled_tasks: AtomicUsize,
    priority_runner:
        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
    // Start time of the current busy period; set when foreground work begins.
    start: Mutex<Option<Instant>>,
    // Accumulated (duration, task count) plus invalidation reasons for the
    // in-progress update.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Event that is triggered when currently_scheduled_foreground_jobs becomes non-zero
    event_foreground_start: Event,
    /// Event that is triggered when all foreground jobs are done
    /// (currently_scheduled_foreground_jobs becomes zero)
    event_foreground_done: Event,
    /// Event that is triggered when all background jobs are done
    event_background_done: Event,
    // Timestamp of engine construction; basis for `program_duration_until`.
    program_start: Instant,
    compilation_events: CompilationEventQueue,
}
500
/// Information about a non-local task. A non-local task can contain multiple "local" tasks, which
/// all share the same non-local task state.
///
/// A non-local task is one that:
///
/// - Has a unique task id.
/// - Is potentially cached.
/// - The backend is aware of.
struct CurrentTaskState {
    // `None` for temporary states created by `new_temporary` (top-level
    // `run`/`run_once` futures that have no backend task id).
    task_id: Option<TaskId>,
    // Identifies this particular execution; local tasks assert they are used
    // within the same execution (see `assert_execution_id`).
    execution_id: ExecutionId,
    // Scheduling priority inherited by local tasks spawned from this task.
    priority: TaskPriority,

    /// True if the current task has state in cells (interior mutability).
    /// Only tracked when verify_determinism feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the current task uses an external invalidator
    has_invalidator: bool,

    /// True if we're in a top-level task (e.g. `.run_once(...)` or `.run(...)`).
    /// Eventually consistent reads are not allowed in top-level tasks.
    in_top_level_task: bool,

    /// Tracks how many cells of each type has been allocated so far during this task execution.
    /// When a task is re-executed, the cell count may not match the existing cell vec length.
    ///
    /// This is taken (and becomes `None`) during teardown of a task.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Tracks execution of Local tasks (and detached test futures) created during this global
    /// task's execution.
    local_tasks: LocalTaskTracker,
}
536
impl CurrentTaskState {
    /// State for a regular task execution with a backend-assigned `task_id`.
    /// Cell counters start empty and are populated as cells are allocated.
    fn new(
        task_id: TaskId,
        execution_id: ExecutionId,
        priority: TaskPriority,
        in_top_level_task: bool,
    ) -> Self {
        Self {
            task_id: Some(task_id),
            execution_id,
            priority,
            #[cfg(feature = "verify_determinism")]
            stateful: false,
            has_invalidator: false,
            in_top_level_task,
            cell_counters: Some(AutoMap::default()),
            local_tasks: LocalTaskTracker::new(),
        }
    }

    /// State for a temporary execution that has no backend task id (used by
    /// top-level `run`). Note that `cell_counters` is `None` here, so cell
    /// allocation is not available in temporary state.
    fn new_temporary(
        execution_id: ExecutionId,
        priority: TaskPriority,
        in_top_level_task: bool,
    ) -> Self {
        Self {
            task_id: None,
            execution_id,
            priority,
            #[cfg(feature = "verify_determinism")]
            stateful: false,
            has_invalidator: false,
            in_top_level_task,
            cell_counters: None,
            local_tasks: LocalTaskTracker::new(),
        }
    }

    /// Panics unless `expected_execution_id` matches this state's execution id.
    /// Guards against local-task handles escaping the execution that created
    /// them (execution ids can wrap, so this is a best-effort assertion).
    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
        if self.execution_id != expected_execution_id {
            panic!(
                "Local tasks can only be scheduled/awaited within the same execution of the \
                 parent task that created them"
            );
        }
    }
}
584
// TODO implement our own thread pool and make these thread locals instead
task_local! {
    /// The current TurboTasks instance
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    /// State of the task currently executing on this tokio task (shared with
    /// any local tasks it spawns).
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;

    /// Temporarily suppresses the eventual consistency check in top-level tasks.
    /// This is used by strongly consistent reads to allow them to succeed in top-level tasks.
    /// This is NOT shared across local tasks (unlike CURRENT_TASK_STATE), so it's safe
    /// to set/unset without race conditions.
    pub(crate) static SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK: bool;
}
598
599impl<B: Backend + 'static> TurboTasks<B> {
600    // TODO better lifetime management for turbo tasks
601    // consider using unsafe for the task_local turbo tasks
602    // that should be safe as long tasks can't outlife turbo task
603    // so we probably want to make sure that all tasks are joined
604    // when trying to drop turbo tasks
605    pub fn new(backend: B) -> Arc<Self> {
606        let task_id_factory = IdFactoryWithReuse::new(
607            TaskId::MIN,
608            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
609        );
610        let transient_task_id_factory =
611            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
612        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
613        let this = Arc::new_cyclic(|this| Self {
614            this: this.clone(),
615            backend,
616            task_id_factory,
617            transient_task_id_factory,
618            execution_id_factory,
619            stopped: AtomicBool::new(false),
620            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
621            currently_scheduled_background_jobs: AtomicUsize::new(0),
622            scheduled_tasks: AtomicUsize::new(0),
623            priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
624            start: Default::default(),
625            aggregated_update: Default::default(),
626            event_foreground_done: Event::new(|| {
627                || "TurboTasks::event_foreground_done".to_string()
628            }),
629            event_foreground_start: Event::new(|| {
630                || "TurboTasks::event_foreground_start".to_string()
631            }),
632            event_background_done: Event::new(|| {
633                || "TurboTasks::event_background_done".to_string()
634            }),
635            program_start: Instant::now(),
636            compilation_events: CompilationEventQueue::default(),
637        });
638        this.backend.startup(&*this);
639        this
640    }
641
642    pub fn pin(&self) -> Arc<Self> {
643        self.this.upgrade().unwrap()
644    }
645
646    /// Creates a new root task
647    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
648    where
649        T: ?Sized,
650        F: Fn() -> Fut + Send + Sync + Clone + 'static,
651        Fut: Future<Output = Result<Vc<T>>> + Send,
652    {
653        let id = self.backend.create_transient_task(
654            TransientTaskType::Root(Box::new(move || {
655                let functor = functor.clone();
656                Box::pin(async move {
657                    mark_top_level_task();
658                    let raw_vc = functor().await?.node;
659                    raw_vc.to_non_local().await
660                })
661            })),
662            self,
663        );
664        self.schedule(id, TaskPriority::initial());
665        id
666    }
667
    /// Disposes a root task previously created with
    /// [`Self::spawn_root_task`], delegating to the backend.
    pub fn dispose_root_task(&self, task_id: TaskId) {
        self.backend.dispose_root_task(task_id, self);
    }
671
672    // TODO make sure that all dependencies settle before reading them
673    /// Creates a new root task, that is only executed once.
674    /// Dependencies will not invalidate the task.
675    #[track_caller]
676    fn spawn_once_task<T, Fut>(&self, future: Fut)
677    where
678        T: ?Sized,
679        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
680    {
681        let id = self.backend.create_transient_task(
682            TransientTaskType::Once(Box::pin(async move {
683                mark_top_level_task();
684                let raw_vc = future.await?.node;
685                raw_vc.to_non_local().await
686            })),
687            self,
688        );
689        self.schedule(id, TaskPriority::initial());
690    }
691
692    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
693        &self,
694        future: impl Future<Output = Result<T>> + Send + 'static,
695    ) -> Result<T> {
696        let (tx, rx) = tokio::sync::oneshot::channel();
697        self.spawn_once_task(async move {
698            mark_top_level_task();
699            let result = future.await;
700            tx.send(result)
701                .map_err(|_| anyhow!("unable to send result"))?;
702            Ok(Completion::new())
703        });
704
705        rx.await?
706    }
707
    /// Runs `future` inside the turbo-tasks context as a top-level task and
    /// waits for it (and all local tasks it spawned) to finish.
    ///
    /// Panics inside the future are captured via `CaptureFuture` and returned
    /// as [`TurboTasksExecutionError::Panic`] instead of unwinding.
    #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
    pub async fn run<T: TraceRawVcs + Send + 'static>(
        &self,
        future: impl Future<Output = Result<T>> + Send + 'static,
    ) -> Result<T, TurboTasksExecutionError> {
        // Count this run as foreground work so the engine isn't considered idle.
        self.begin_foreground_job();
        // it's okay for execution ids to overflow and wrap, they're just used for an assert
        let execution_id = self.execution_id_factory.wrapping_get();
        // Temporary state: no backend task id, initial priority, and marked as
        // top-level (eventually-consistent reads are disallowed inside).
        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
            execution_id,
            TaskPriority::initial(),
            true, // in_top_level_task
        )));

        let result = TURBO_TASKS
            .scope(
                self.pin(),
                CURRENT_TASK_STATE.scope(current_task_state, async {
                    let result = CaptureFuture::new(future).await;

                    // wait for all spawned local tasks using `local` to finish
                    wait_for_local_tasks().await;

                    match result {
                        Ok(Ok(value)) => Ok(value),
                        Ok(Err(err)) => Err(err.into()),
                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                    }
                }),
            )
            .await;
        self.finish_foreground_job();
        result
    }
742
    /// Spawns `future` as a detached tokio task wrapped in `run_once`; the
    /// result (and any error) is discarded via `unwrap`.
    ///
    /// NOTE(review): while the future itself runs, the foreground-job counter
    /// is decremented and only re-incremented afterwards — presumably so a
    /// long-lived detached process does not keep the engine from being
    /// considered idle. Confirm against callers before relying on this.
    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
        let this = self.pin();
        tokio::spawn(async move {
            this.pin()
                .run_once(async move {
                    this.finish_foreground_job();
                    future.await;
                    this.begin_foreground_job();
                    Ok(())
                })
                .await
                .unwrap()
        });
    }
757
758    pub(crate) fn native_call(
759        &self,
760        native_fn: &'static NativeFunction,
761        this: Option<RawVc>,
762        arg: &mut dyn StackDynTaskInputs,
763        persistence: TaskPersistence,
764    ) -> RawVc {
765        RawVc::TaskOutput(self.backend.get_or_create_task(
766            native_fn,
767            this,
768            arg,
769            current_task_if_available("turbo_function calls"),
770            persistence,
771            self,
772        ))
773    }
774
775    pub fn dynamic_call(
776        &self,
777        native_fn: &'static NativeFunction,
778        this: Option<RawVc>,
779        arg: &mut dyn StackDynTaskInputs,
780        persistence: TaskPersistence,
781    ) -> RawVc {
782        if this.is_none_or(|this| this.is_resolved())
783            && native_fn.arg_meta.is_resolved(arg.as_ref())
784        {
785            return self.native_call(native_fn, this, arg, persistence);
786        }
787        // Need async resolution — must move the arg to the heap now
788        let arg = arg.take_box();
789        let task_type = LocalTaskSpec {
790            task_type: LocalTaskType::ResolveNative { native_fn },
791            this,
792            arg,
793        };
794        self.schedule_local_task(task_type, persistence)
795    }
796
    /// Calls a trait method on `this`. If `this` is already a resolved cell,
    /// the concrete native function is looked up from the cell's value type and
    /// dispatched directly; otherwise a local wrapper task resolves the inputs
    /// first.
    pub fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        // avoid creating a wrapper task if self is already resolved
        // for resolved cells we already know the value type so we can lookup the
        // function
        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
            match registry::get_value_type(type_id).get_trait_method(trait_method) {
                Some(native_fn) => {
                    // NOTE(review): `filter_owned` appears to transform the args
                    // into an owned/filtered form before dispatch — confirm with
                    // its definition in `NativeFunction`.
                    if let Some(filter) = native_fn.arg_meta.filter_owned {
                        let mut arg = (filter)(arg);
                        return self.dynamic_call(native_fn, Some(this), &mut arg, persistence);
                    } else {
                        return self.dynamic_call(native_fn, Some(this), arg, persistence);
                    }
                }
                None => {
                    // We are destined to fail at this point, but we just retry resolution in the
                    // local task since we cannot report an error from here.
                    // TODO: A panic seems appropriate since the immediate caller is to blame
                }
            }
        }

        // create a wrapper task to resolve all inputs
        let task_type = LocalTaskSpec {
            task_type: LocalTaskType::ResolveTrait { trait_method },
            this: Some(this),
            arg: arg.take_box(),
        };

        self.schedule_local_task(task_type, persistence)
    }
834
835    #[track_caller]
836    pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
837        self.begin_foreground_job();
838        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
839
840        self.priority_runner.schedule(
841            &self.pin(),
842            ScheduledTask::Task {
843                task_id,
844                span: Span::current(),
845            },
846            priority,
847        );
848    }
849
850    fn schedule_local_task(
851        &self,
852        ty: LocalTaskSpec,
853        // if this is a `LocalTaskType::Resolve*`, we may spawn another task with this persistence,
854        persistence: TaskPersistence,
855    ) -> RawVc {
856        let task_type = ty.task_type;
857        let (global_task_state, execution_id, priority, local_task_id) =
858            CURRENT_TASK_STATE.with(|gts| {
859                let mut gts_write = gts.write().unwrap();
860                let local_task_id = gts_write.local_tasks.create(task_type);
861                (
862                    Arc::clone(gts),
863                    gts_write.execution_id,
864                    gts_write.priority,
865                    local_task_id,
866                )
867            });
868
869        self.priority_runner.schedule(
870            &self.pin(),
871            ScheduledTask::LocalTask {
872                ty,
873                persistence,
874                local_task_id,
875                global_task_state,
876                span: Span::current(),
877            },
878            priority,
879        );
880
881        RawVc::LocalOutput(execution_id, local_task_id, persistence)
882    }
883
884    fn begin_foreground_job(&self) {
885        if self
886            .currently_scheduled_foreground_jobs
887            .fetch_add(1, Ordering::AcqRel)
888            == 0
889        {
890            *self.start.lock().unwrap() = Some(Instant::now());
891            self.event_foreground_start.notify(usize::MAX);
892            self.backend.idle_end(self);
893        }
894    }
895
896    fn finish_foreground_job(&self) {
897        if self
898            .currently_scheduled_foreground_jobs
899            .fetch_sub(1, Ordering::AcqRel)
900            == 1
901        {
902            self.backend.idle_start(self);
903            // That's not super race-condition-safe, but it's only for
904            // statistical reasons
905            let total = self.scheduled_tasks.load(Ordering::Acquire);
906            self.scheduled_tasks.store(0, Ordering::Release);
907            if let Some(start) = *self.start.lock().unwrap() {
908                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
909                if let Some(update) = update.as_mut() {
910                    update.0 += start.elapsed();
911                    update.1 += total;
912                } else {
913                    *update = Some((start.elapsed(), total));
914                }
915            }
916            self.event_foreground_done.notify(usize::MAX);
917        }
918    }
919
920    fn begin_background_job(&self) {
921        self.currently_scheduled_background_jobs
922            .fetch_add(1, Ordering::Relaxed);
923    }
924
925    fn finish_background_job(&self) {
926        if self
927            .currently_scheduled_background_jobs
928            .fetch_sub(1, Ordering::Relaxed)
929            == 1
930        {
931            self.event_background_done.notify(usize::MAX);
932        }
933    }
934
935    pub fn get_in_progress_count(&self) -> usize {
936        self.currently_scheduled_foreground_jobs
937            .load(Ordering::Acquire)
938    }
939
940    /// Waits for the given task to finish executing. This works by performing an untracked read,
941    /// and discarding the value of the task output.
942    ///
943    /// [`ReadConsistency::Eventual`] means that this will return after the task executes, but
944    /// before all dependencies have completely settled.
945    ///
946    /// [`ReadConsistency::Strong`] means that this will also wait for the task and all dependencies
947    /// to fully settle before returning.
948    ///
949    /// As this function is typically called in top-level code that waits for results to be ready
950    /// for the user to access, most callers should use [`ReadConsistency::Strong`].
951    pub async fn wait_task_completion(
952        &self,
953        id: TaskId,
954        consistency: ReadConsistency,
955    ) -> Result<()> {
956        read_task_output(
957            self,
958            id,
959            ReadOutputOptions {
960                // INVALIDATION: This doesn't return a value, only waits for it to be ready.
961                tracking: ReadTracking::Untracked,
962                consistency,
963            },
964        )
965        .await?;
966        Ok(())
967    }
968
969    /// Returns [UpdateInfo] with all updates aggregated over a given duration
970    /// (`aggregation`). Will wait until an update happens.
971    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
972        self.aggregated_update_info(aggregation, Duration::MAX)
973            .await
974            .unwrap()
975    }
976
977    /// Returns [UpdateInfo] with all updates aggregated over a given duration
978    /// (`aggregation`). Will only return None when the timeout is reached while
979    /// waiting for the first update.
980    pub async fn aggregated_update_info(
981        &self,
982        aggregation: Duration,
983        timeout: Duration,
984    ) -> Option<UpdateInfo> {
985        let listener = self
986            .event_foreground_done
987            .listen_with_note(|| || "wait for update info".to_string());
988        let wait_for_finish = {
989            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
990            if aggregation.is_zero() {
991                if let Some((duration, tasks)) = update.take() {
992                    return Some(UpdateInfo {
993                        duration,
994                        tasks,
995                        reasons: take(reason_set),
996                        placeholder_for_future_fields: (),
997                    });
998                } else {
999                    true
1000                }
1001            } else {
1002                update.is_none()
1003            }
1004        };
1005        if wait_for_finish {
1006            if timeout == Duration::MAX {
1007                // wait for finish
1008                listener.await;
1009            } else {
1010                // wait for start, then wait for finish or timeout
1011                let start_listener = self
1012                    .event_foreground_start
1013                    .listen_with_note(|| || "wait for update info".to_string());
1014                if self
1015                    .currently_scheduled_foreground_jobs
1016                    .load(Ordering::Acquire)
1017                    == 0
1018                {
1019                    start_listener.await;
1020                } else {
1021                    drop(start_listener);
1022                }
1023                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1024                    // Timeout
1025                    return None;
1026                }
1027            }
1028        }
1029        if !aggregation.is_zero() {
1030            loop {
1031                select! {
1032                    () = tokio::time::sleep(aggregation) => {
1033                        break;
1034                    }
1035                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1036                        // Resets the sleep
1037                    }
1038                }
1039            }
1040        }
1041        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1042        if let Some((duration, tasks)) = update.take() {
1043            Some(UpdateInfo {
1044                duration,
1045                tasks,
1046                reasons: take(reason_set),
1047                placeholder_for_future_fields: (),
1048            })
1049        } else {
1050            panic!("aggregated_update_info must not called concurrently")
1051        }
1052    }
1053
1054    pub async fn wait_background_done(&self) {
1055        let listener = self.event_background_done.listen();
1056        if self
1057            .currently_scheduled_background_jobs
1058            .load(Ordering::Acquire)
1059            != 0
1060        {
1061            listener.await;
1062        }
1063    }
1064
    /// Gracefully shuts down: notifies the backend, flags the instance as stopped so
    /// newly scheduled work becomes a no-op, waits for all foreground and then all
    /// background jobs to finish, and finally stops the backend.
    pub async fn stop_and_wait(&self) {
        turbo_tasks_future_scope(self.pin(), async move {
            self.backend.stopping(self);
            // Scheduled work checks this flag before executing.
            self.stopped.store(true, Ordering::Release);
            {
                // Register the listener before checking the counter so the final
                // notification cannot be missed (lost-wakeup avoidance).
                let listener = self
                    .event_foreground_done
                    .listen_with_note(|| || "wait for stop".to_string());
                if self
                    .currently_scheduled_foreground_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            {
                // Same listener-before-check pattern for background jobs.
                let listener = self.event_background_done.listen();
                if self
                    .currently_scheduled_background_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            self.backend.stop(self);
        })
        .await;
    }
1095
1096    #[track_caller]
1097    pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1098    where
1099        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1100        T::CallOnceFuture: Send,
1101    {
1102        let mut this = self.pin();
1103        this.begin_foreground_job();
1104        tokio::spawn(
1105            TURBO_TASKS
1106                .scope(this.clone(), async move {
1107                    if !this.stopped.load(Ordering::Acquire) {
1108                        this = func(this.clone()).await;
1109                    }
1110                    this.finish_foreground_job();
1111                })
1112                .in_current_span(),
1113        );
1114    }
1115
1116    #[track_caller]
1117    pub(crate) fn schedule_background_job<T>(&self, func: T)
1118    where
1119        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1120        T::CallOnceFuture: Send,
1121    {
1122        let mut this = self.pin();
1123        self.begin_background_job();
1124        tokio::spawn(
1125            TURBO_TASKS
1126                .scope(this.clone(), async move {
1127                    if !this.stopped.load(Ordering::Acquire) {
1128                        this = func(this).await;
1129                    }
1130                    this.finish_background_job();
1131                })
1132                .in_current_span(),
1133        );
1134    }
1135
1136    fn finish_current_task_state(&self) -> FinishedTaskState {
1137        CURRENT_TASK_STATE.with(|cell| {
1138            let current_task_state = &*cell.write().unwrap();
1139            FinishedTaskState {
1140                #[cfg(feature = "verify_determinism")]
1141                stateful: current_task_state.stateful,
1142                has_invalidator: current_task_state.has_invalidator,
1143            }
1144        })
1145    }
1146
    /// Returns a reference to the backend that stores task state for this instance.
    pub fn backend(&self) -> &B {
        &self.backend
    }
1150}
1151
1152struct TurboTasksExecutor;
1153
/// Run a future and abort the process if a panic is reported
///
/// Turbo-tasks catches panics from user code and propagates them through the task tree,
/// but if a panic happens as part of internal state management we have to abort the
/// process, since the scheduler's invariants can no longer be trusted.
async fn abort_on_panic<F: Future>(f: F) -> F::Output {
    match AssertUnwindSafe(f).catch_unwind().await {
        Ok(r) => r,
        Err(_) => {
            // The message refers the user to the panic output printed before
            // `catch_unwind` observed the unwind; the payload itself is dropped here.
            eprintln!(
                "\nturbo-tasks: an internal panic occurred outside the per-task panic \
                 boundary. This is a bug in turbo-tasks/Turbopack — please report it at \
                 https://github.com/vercel/next.js/discussions and include the panic message \
                 and stack trace above.\n\nAborting."
            );
            abort();
        }
    }
}
1172
impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
    type Future = impl Future<Output = ()> + Send + 'static;

    /// Turns a [`ScheduledTask`] into the future that executes it.
    ///
    /// - `ScheduledTask::Task`: runs a backend-managed task, re-executing it in a loop
    ///   for as long as the execution-completion callback requests another run, then
    ///   balances the foreground-job counter taken in `schedule`.
    /// - `ScheduledTask::LocalTask`: runs a resolve task inside its parent task's
    ///   state scope and stores the result in the parent's local-task table.
    fn execute(
        &self,
        this: &Arc<TurboTasks<B>>,
        scheduled_task: ScheduledTask,
        priority: TaskPriority,
    ) -> Self::Future {
        match scheduled_task {
            ScheduledTask::Task { task_id, span } => {
                // One clone for the TURBO_TASKS task-local scope, one for the body.
                let this2 = this.clone();
                let this = this.clone();
                let future = async move {
                    // Panics escaping the per-task capture below are scheduler bugs;
                    // abort rather than continuing with corrupted state.
                    abort_on_panic(async {
                        let mut schedule_again = true;
                        while schedule_again {
                            // it's okay for execution ids to overflow and wrap, they're just used
                            // for an assert
                            let execution_id = this.execution_id_factory.wrapping_get();
                            // Fresh per-execution state (local tasks, cell counters, flags).
                            let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
                                task_id,
                                execution_id,
                                priority,
                                false, // in_top_level_task
                            )));
                            // Returns whether the task should be executed again.
                            let single_execution_future = async {
                                if this.stopped.load(Ordering::Acquire) {
                                    this.backend.task_execution_canceled(task_id, &*this);
                                    return false;
                                }

                                // The backend may decline to start (e.g. nothing to do).
                                let Some(TaskExecutionSpec { future, span }) = this
                                    .backend
                                    .try_start_task_execution(task_id, priority, &*this)
                                else {
                                    return false;
                                };

                                async {
                                    // CaptureFuture collects panics from the task body.
                                    let result = CaptureFuture::new(future).await;

                                    // wait for all spawned local tasks using `local` to finish
                                    wait_for_local_tasks().await;

                                    let result = match result {
                                        Ok(Ok(raw_vc)) => {
                                            // This is safe because we waited for all local tasks to
                                            // complete above
                                            raw_vc
                                                .to_non_local_unchecked_sync(&*this)
                                                .map_err(|err| err.into())
                                        }
                                        Ok(Err(err)) => Err(err.into()),
                                        Err(err) => {
                                            Err(TurboTasksExecutionError::Panic(Arc::new(err)))
                                        }
                                    };

                                    // Snapshot per-execution flags and cell usage for the
                                    // backend's completion callback.
                                    let finished_state = this.finish_current_task_state();
                                    let cell_counters = CURRENT_TASK_STATE.with(|ts| {
                                        ts.write().unwrap().cell_counters.take().unwrap()
                                    });
                                    this.backend.task_execution_completed(
                                        task_id,
                                        result,
                                        &cell_counters,
                                        #[cfg(feature = "verify_determinism")]
                                        finished_state.stateful,
                                        finished_state.has_invalidator,
                                        &*this,
                                    )
                                }
                                .instrument(span)
                                .await
                            };
                            // Run the execution with its own task state in scope; the
                            // backend decides whether to re-run immediately.
                            schedule_again = CURRENT_TASK_STATE
                                .scope(current_task_state, single_execution_future)
                                .await;
                        }
                        // Balances begin_foreground_job() performed in `schedule`.
                        this.finish_foreground_job();
                    })
                    .await
                };

                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
            }
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state,
                span,
            } => {
                let this2 = this.clone();
                let this = this.clone();
                let task_type = ty.task_type;
                let future = async move {
                    // A more specific span for the resolve work, derived from the
                    // function or trait method being resolved.
                    let span = match &ty.task_type {
                        LocalTaskType::ResolveNative { native_fn } => {
                            native_fn.resolve_span(priority)
                        }
                        LocalTaskType::ResolveTrait { trait_method } => {
                            trait_method.resolve_span(priority)
                        }
                    };
                    abort_on_panic(
                        async move {
                            let result = match ty.task_type {
                                LocalTaskType::ResolveNative { native_fn } => {
                                    LocalTaskType::run_resolve_native(
                                        native_fn,
                                        ty.this,
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                                LocalTaskType::ResolveTrait { trait_method } => {
                                    // `this` is always present for trait resolution
                                    // (set in schedule_local_task's caller).
                                    LocalTaskType::run_resolve_trait(
                                        trait_method,
                                        ty.this.unwrap(),
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                            };

                            let output = match result {
                                Ok(raw_vc) => OutputContent::Link(raw_vc),
                                Err(err) => OutputContent::Error(
                                    TurboTasksExecutionError::from(err)
                                        .with_local_task_context(task_type.to_string()),
                                ),
                            };

                            // Publish the result into the parent task's local-task
                            // table; this also wakes readers blocked on the done event.
                            CURRENT_TASK_STATE.with(move |gts| {
                                gts.write()
                                    .unwrap()
                                    .local_tasks
                                    .complete(local_task_id, output);
                            });
                        }
                        .instrument(span),
                    )
                    .await
                };
                // Local tasks run inside the *parent* task's state scope.
                let future = CURRENT_TASK_STATE.scope(global_task_state, future);

                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
            }
        }
    }
}
1330
/// Per-task flags captured at the end of a task's execution and forwarded to the
/// backend via `task_execution_completed`.
struct FinishedTaskState {
    /// True if the task has state in cells (interior mutability).
    /// Only tracked when verify_determinism feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the task uses an external invalidator
    has_invalidator: bool,
}
1340
impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
    // The call methods below forward to the inherent methods of the same name on
    // `TurboTasks<B>`; the trait exists so callers can work with a type-erased handle.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.dynamic_call(native_fn, this, arg, persistence)
    }
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.native_call(native_fn, this, arg, persistence)
    }
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.trait_call(trait_method, this, arg, persistence)
    }

    #[track_caller]
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run(future).await })
    }

    #[track_caller]
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    /// Like `run_once`, but records `reason` so it is reported in the next aggregated
    /// [`UpdateInfo`].
    #[track_caller]
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        {
            // Scoped so the lock is released before the future is constructed.
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    #[track_caller]
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        self.start_once_process(future)
    }

    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
        // Best-effort: a failed send is logged, not propagated.
        if let Err(e) = self.compilation_events.send(event) {
            tracing::warn!("Failed to send compilation event: {e}");
        }
    }

    fn get_task_name(&self, task: TaskId) -> String {
        self.backend.get_task_name(task, self)
    }
}
1417
1418impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1419    #[instrument(level = "info", skip_all, name = "invalidate")]
1420    fn invalidate(&self, task: TaskId) {
1421        self.backend.invalidate_task(task, self);
1422    }
1423
1424    #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1425    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1426        {
1427            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1428            reason_set.insert(reason);
1429        }
1430        self.backend.invalidate_task(task, self);
1431    }
1432
1433    fn invalidate_serialization(&self, task: TaskId) {
1434        self.backend.invalidate_serialization(task, self);
1435    }
1436
1437    #[track_caller]
1438    fn try_read_task_output(
1439        &self,
1440        task: TaskId,
1441        options: ReadOutputOptions,
1442    ) -> Result<Result<RawVc, EventListener>> {
1443        if options.consistency == ReadConsistency::Eventual {
1444            debug_assert_not_in_top_level_task("read_task_output");
1445        }
1446        self.backend.try_read_task_output(
1447            task,
1448            current_task_if_available("reading Vcs"),
1449            options,
1450            self,
1451        )
1452    }
1453
1454    #[track_caller]
1455    fn try_read_task_cell(
1456        &self,
1457        task: TaskId,
1458        index: CellId,
1459        options: ReadCellOptions,
1460    ) -> Result<Result<TypedCellContent, EventListener>> {
1461        let reader = current_task_if_available("reading Vcs");
1462        self.backend
1463            .try_read_task_cell(task, index, reader, options, self)
1464    }
1465
1466    fn try_read_own_task_cell(
1467        &self,
1468        current_task: TaskId,
1469        index: CellId,
1470    ) -> Result<TypedCellContent> {
1471        self.backend
1472            .try_read_own_task_cell(current_task, index, self)
1473    }
1474
1475    #[track_caller]
1476    fn try_read_local_output(
1477        &self,
1478        execution_id: ExecutionId,
1479        local_task_id: LocalTaskId,
1480    ) -> Result<Result<RawVc, EventListener>> {
1481        debug_assert_not_in_top_level_task("read_local_output");
1482        CURRENT_TASK_STATE.with(|gts| {
1483            let gts_read = gts.read().unwrap();
1484
1485            // Local Vcs are local to their parent task's current execution, and do not exist
1486            // outside of it. This is weakly enforced at compile time using the `NonLocalValue`
1487            // marker trait. This assertion exists to handle any potential escapes that the
1488            // compile-time checks cannot capture.
1489            gts_read.assert_execution_id(execution_id);
1490
1491            match gts_read.local_tasks.get(local_task_id) {
1492                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1493                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1494            }
1495        })
1496    }
1497
1498    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1499        // TODO: Add assert_not_in_top_level_task("read_task_collectibles") check here.
1500        // Collectible reads are eventually consistent.
1501        self.backend.read_task_collectibles(
1502            task,
1503            trait_id,
1504            current_task_if_available("reading collectibles"),
1505            self,
1506        )
1507    }
1508
1509    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1510        self.backend.emit_collectible(
1511            trait_type,
1512            collectible,
1513            current_task("emitting collectible"),
1514            self,
1515        );
1516    }
1517
1518    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1519        self.backend.unemit_collectible(
1520            trait_type,
1521            collectible,
1522            count,
1523            current_task("emitting collectible"),
1524            self,
1525        );
1526    }
1527
1528    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1529        for (&collectible, &count) in collectibles {
1530            if count > 0 {
1531                self.backend.unemit_collectible(
1532                    trait_type,
1533                    collectible,
1534                    count as u32,
1535                    current_task("emitting collectible"),
1536                    self,
1537                );
1538            }
1539        }
1540    }
1541
1542    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent> {
1543        self.try_read_own_task_cell(task, index)
1544    }
1545
1546    fn update_own_task_cell(
1547        &self,
1548        task: TaskId,
1549        index: CellId,
1550        content: CellContent,
1551        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
1552        content_hash: Option<CellHash>,
1553        verification_mode: VerificationMode,
1554    ) {
1555        self.backend.update_task_cell(
1556            task,
1557            index,
1558            content,
1559            updated_key_hashes,
1560            content_hash,
1561            verification_mode,
1562            self,
1563        );
1564    }
1565
1566    fn connect_task(&self, task: TaskId) {
1567        self.backend
1568            .connect_task(task, current_task_if_available("connecting task"), self);
1569    }
1570
1571    fn mark_own_task_as_finished(&self, task: TaskId) {
1572        self.backend.mark_own_task_as_finished(task, self);
1573    }
1574
1575    fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1576        self.backend.mark_own_task_as_session_dependent(task, self);
1577    }
1578
1579    /// Creates a future that inherits the current task id and task state. The current global task
1580    /// will wait for this future to be dropped before exiting.
1581    fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1582        // this is similar to what happens for a local task, except that we keep the local task's
1583        // state as well.
1584        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1585        global_task_state
1586            .write()
1587            .unwrap()
1588            .local_tasks
1589            .register_detached();
1590        let wrapped = async move {
1591            // use a drop guard for panic safety
1592            struct DropGuard;
1593            impl Drop for DropGuard {
1594                fn drop(&mut self) {
1595                    CURRENT_TASK_STATE
1596                        .with(|ts| ts.write().unwrap().local_tasks.decrement_in_flight());
1597                }
1598            }
1599            let _guard = DropGuard;
1600            fut.await;
1601        };
1602        tokio::spawn(TURBO_TASKS.scope(
1603            turbo_tasks(),
1604            CURRENT_TASK_STATE.scope(global_task_state, wrapped),
1605        ));
1606    }
1607
1608    fn task_statistics(&self) -> &TaskStatisticsApi {
1609        self.backend.task_statistics()
1610    }
1611
1612    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1613        let this = self.pin();
1614        Box::pin(async move {
1615            this.stop_and_wait().await;
1616        })
1617    }
1618
1619    fn subscribe_to_compilation_events(
1620        &self,
1621        event_types: Option<Vec<String>>,
1622    ) -> Receiver<Arc<dyn CompilationEvent>> {
1623        self.compilation_events.subscribe(event_types)
1624    }
1625
1626    fn is_tracking_dependencies(&self) -> bool {
1627        self.backend.is_tracking_dependencies()
1628    }
1629}
1630
impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
        self.pin()
    }
    fn backend(&self) -> &B {
        &self.backend
    }

    /// Runs `job` on the backend as a background job (does not affect idle detection).
    #[track_caller]
    fn schedule_backend_background_job(&self, job: B::BackendJob) {
        self.schedule_background_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            this
        })
    }

    /// Runs `job` on the backend as a foreground job (counted for idle detection).
    #[track_caller]
    fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
        self.schedule_foreground_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            this
        })
    }

    #[track_caller]
    fn schedule(&self, task: TaskId, priority: TaskPriority) {
        self.schedule(task, priority)
    }

    /// Priority of the task currently executing, or the initial priority when called
    /// outside of a task context.
    fn get_current_task_priority(&self) -> TaskPriority {
        CURRENT_TASK_STATE
            .try_with(|task_state| task_state.read().unwrap().priority)
            .unwrap_or(TaskPriority::initial())
    }

    // NOTE(review): `Instant` subtraction delegates to `duration_since`, which saturates
    // to zero when `instant` predates `program_start` — confirm callers rely on that.
    fn program_duration_until(&self, instant: Instant) -> Duration {
        instant - self.program_start
    }

    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
        // SAFETY: This is a fresh id from the factory
        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
    }

    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
        // SAFETY: This is a fresh id from the factory
        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
    }

    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
        unsafe { self.task_id_factory.reuse(id.into()) }
    }

    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
        unsafe { self.transient_task_id_factory.reuse(id.into()) }
    }

    /// True when no foreground jobs are scheduled or running (background jobs are not
    /// considered).
    fn is_idle(&self) -> bool {
        self.currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
            == 0
    }
}
1694
1695async fn wait_for_local_tasks() {
1696    let listener =
1697        CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_tasks.listen_for_in_flight());
1698    let Some(listener) = listener else {
1699        return;
1700    };
1701    listener.await;
1702}
1703
1704pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1705    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1706        Ok(id) => id,
1707        Err(_) => panic!(
1708            "{from} can only be used in the context of a turbo_tasks task execution or \
1709             turbo_tasks run"
1710        ),
1711    }
1712}
1713
1714pub(crate) fn current_task(from: &str) -> TaskId {
1715    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1716        Ok(Some(id)) => id,
1717        Ok(None) | Err(_) => {
1718            panic!("{from} can only be used in the context of a turbo_tasks task execution")
1719        }
1720    }
1721}
1722
1723/// Panics if we're not in a top-level task (e.g. [`run_once`]). Some function calls should only
1724/// happen in a top-level task (e.g. [`Effects::apply`][crate::Effects::apply]).
1725#[track_caller]
1726pub(crate) fn debug_assert_in_top_level_task(message: &str) {
1727    if !cfg!(debug_assertions) {
1728        return;
1729    }
1730
1731    let in_top_level = CURRENT_TASK_STATE
1732        .try_with(|ts| ts.read().unwrap().in_top_level_task)
1733        .unwrap_or(true);
1734    if !in_top_level {
1735        panic!("{message}");
1736    }
1737}
1738
1739#[track_caller]
1740pub(crate) fn debug_assert_not_in_top_level_task(operation: &str) {
1741    if !cfg!(debug_assertions) {
1742        return;
1743    }
1744
1745    // HACK: We set this inside of `ReadRawVcFuture` to suppress warnings about an internal
1746    // consistency bug
1747    let suppressed = SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK
1748        .try_with(|&suppressed| suppressed)
1749        .unwrap_or(false);
1750    if suppressed {
1751        return;
1752    }
1753
1754    let in_top_level = CURRENT_TASK_STATE
1755        .try_with(|ts| ts.read().unwrap().in_top_level_task)
1756        .unwrap_or(false);
1757    if in_top_level {
1758        panic!(
1759            "Eventually consistent read ({operation}) cannot be performed from a top-level task. \
1760             Top-level tasks (e.g. code inside `.run_once(...)`) must use strongly consistent \
1761             reads to avoid leaking inconsistent return values."
1762        );
1763    }
1764}
1765
1766pub async fn run<T: Send + 'static>(
1767    tt: Arc<dyn TurboTasksApi>,
1768    future: impl Future<Output = Result<T>> + Send + 'static,
1769) -> Result<T> {
1770    let (tx, rx) = tokio::sync::oneshot::channel();
1771
1772    tt.run(Box::pin(async move {
1773        let result = future.await?;
1774        tx.send(result)
1775            .map_err(|_| anyhow!("unable to send result"))?;
1776        Ok(())
1777    }))
1778    .await?;
1779
1780    Ok(rx.await?)
1781}
1782
1783pub async fn run_once<T: Send + 'static>(
1784    tt: Arc<dyn TurboTasksApi>,
1785    future: impl Future<Output = Result<T>> + Send + 'static,
1786) -> Result<T> {
1787    let (tx, rx) = tokio::sync::oneshot::channel();
1788
1789    tt.run_once(Box::pin(async move {
1790        let result = future.await?;
1791        tx.send(result)
1792            .map_err(|_| anyhow!("unable to send result"))?;
1793        Ok(())
1794    }))
1795    .await?;
1796
1797    Ok(rx.await?)
1798}
1799
1800pub async fn run_once_with_reason<T: Send + 'static>(
1801    tt: Arc<dyn TurboTasksApi>,
1802    reason: impl InvalidationReason,
1803    future: impl Future<Output = Result<T>> + Send + 'static,
1804) -> Result<T> {
1805    let (tx, rx) = tokio::sync::oneshot::channel();
1806
1807    tt.run_once_with_reason(
1808        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1809        Box::pin(async move {
1810            let result = future.await?;
1811            tx.send(result)
1812                .map_err(|_| anyhow!("unable to send result"))?;
1813            Ok(())
1814        }),
1815    )
1816    .await?;
1817
1818    Ok(rx.await?)
1819}
1820
1821/// Calls [`TurboTasks::dynamic_call`] for the current turbo tasks instance.
1822pub fn dynamic_call(
1823    func: &'static NativeFunction,
1824    this: Option<RawVc>,
1825    arg: &mut dyn StackDynTaskInputs,
1826    persistence: TaskPersistence,
1827) -> RawVc {
1828    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1829}
1830
1831/// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance.
1832pub fn trait_call(
1833    trait_method: &'static TraitMethod,
1834    this: RawVc,
1835    arg: &mut dyn StackDynTaskInputs,
1836    persistence: TaskPersistence,
1837) -> RawVc {
1838    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1839}
1840
1841pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1842    TURBO_TASKS.with(|arc| arc.clone())
1843}
1844
1845pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
1846    TURBO_TASKS.with(Arc::downgrade)
1847}
1848
1849pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1850    TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1851}
1852
1853pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1854    TURBO_TASKS.with(|arc| func(arc))
1855}
1856
/// Runs `f` synchronously with `tt` installed as the current turbo tasks instance
/// (see [`turbo_tasks`]).
pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
    TURBO_TASKS.sync_scope(tt, f)
}
1860
/// Wraps `f` so that it runs with `tt` installed as the current turbo tasks instance
/// (see [`turbo_tasks`]).
pub fn turbo_tasks_future_scope<T>(
    tt: Arc<dyn TurboTasksApi>,
    f: impl Future<Output = T>,
) -> impl Future<Output = T> {
    TURBO_TASKS.scope(tt, f)
}
1867
1868pub fn with_turbo_tasks_for_testing<T>(
1869    tt: Arc<dyn TurboTasksApi>,
1870    current_task: TaskId,
1871    execution_id: ExecutionId,
1872    f: impl Future<Output = T>,
1873) -> impl Future<Output = T> {
1874    TURBO_TASKS.scope(
1875        tt,
1876        CURRENT_TASK_STATE.scope(
1877            Arc::new(RwLock::new(CurrentTaskState::new(
1878                current_task,
1879                execution_id,
1880                TaskPriority::initial(),
1881                false, // in_top_level_task
1882            ))),
1883            f,
1884        ),
1885    )
1886}
1887
1888/// Spawns the given future within the context of the current task.
1889///
1890/// Beware: this method is not safe to use in production code. It is only
1891/// intended for use in tests and for debugging purposes.
1892pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
1893    turbo_tasks().spawn_detached_for_testing(Box::pin(f));
1894}
1895
1896pub fn current_task_for_testing() -> Option<TaskId> {
1897    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1898}
1899
1900/// Marks the current task as dirty when restored from filesystem cache.
1901pub fn mark_session_dependent() {
1902    with_turbo_tasks(|tt| {
1903        tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1904    });
1905}
1906
1907/// Marks the current task as finished. This excludes it from waiting for
1908/// strongly consistency.
1909pub fn mark_finished() {
1910    with_turbo_tasks(|tt| {
1911        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1912    });
1913}
1914
/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
/// serialization of the current task cells.
///
/// Also marks the current task as stateful when the `verify_determinism` feature is enabled,
/// since State allocation implies interior mutability.
///
/// # Panics
///
/// Panics when called outside of a turbo_tasks task execution (no current task id).
pub fn get_serialization_invalidator() -> SerializationInvalidator {
    CURRENT_TASK_STATE.with(|cell| {
        // Destructure under the write lock; the `stateful` field is only touched when the
        // `verify_determinism` feature is enabled.
        let CurrentTaskState {
            task_id,
            #[cfg(feature = "verify_determinism")]
            stateful,
            ..
        } = &mut *cell.write().unwrap();
        #[cfg(feature = "verify_determinism")]
        {
            *stateful = true;
        }
        let Some(task_id) = *task_id else {
            panic!(
                "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
                 task execution"
            );
        };
        SerializationInvalidator::new(task_id)
    })
}
1941
1942pub fn mark_invalidator() {
1943    CURRENT_TASK_STATE.with(|cell| {
1944        let CurrentTaskState {
1945            has_invalidator, ..
1946        } = &mut *cell.write().unwrap();
1947        *has_invalidator = true;
1948    })
1949}
1950
/// Marks the current task as stateful. This is used to indicate that the task
/// has interior mutability (e.g., via [`State`][crate::State]), which means
/// the task may produce different outputs even with the same inputs.
///
/// Only has an effect when the `verify_determinism` feature is enabled.
pub fn mark_stateful() {
    // Without `verify_determinism` there is no `stateful` flag to set, so this is a no-op.
    #[cfg(feature = "verify_determinism")]
    CURRENT_TASK_STATE.with(|cell| {
        cell.write().unwrap().stateful = true;
    });
}
1966
1967/// Marks the current task context as being in a top-level task. When in a top-level task,
1968/// eventually consistent reads will panic. It is almost always a mistake to perform an eventually
1969/// consistent read at the top-level of the application.
1970pub fn mark_top_level_task() {
1971    if cfg!(debug_assertions) {
1972        CURRENT_TASK_STATE.with(|cell| {
1973            cell.write().unwrap().in_top_level_task = true;
1974        })
1975    }
1976}
1977
1978/// Unmarks the current task context as being in a top-level task. The opposite of
1979/// [`mark_top_level_task`].
1980///
1981/// This utility can be okay in unit tests, where we're observing the internal behavior of
1982/// turbo-tasks, but otherwise, it is probably a mistake to call this function.
1983///
1984/// Calling this will allow eventually-consistent reads at the top-level, potentially exposing
1985/// incomplete computations and internal errors caused by eventual consistency that would've been
1986/// caught when the function was re-run. A strongly-consistent read re-runs parts of a task until
1987/// all of the dependencies have settled.
1988pub fn unmark_top_level_task_may_leak_eventually_consistent_state() {
1989    if cfg!(debug_assertions) {
1990        CURRENT_TASK_STATE.with(|cell| {
1991            cell.write().unwrap().in_top_level_task = false;
1992        })
1993    }
1994}
1995
/// Placeholder: garbage collection is not implemented yet, so this is currently a no-op.
pub fn prevent_gc() {
    // TODO implement garbage collection
}
1999
2000pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
2001    with_turbo_tasks(|tt| {
2002        let raw_vc = collectible.node.node;
2003        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
2004    })
2005}
2006
2007pub(crate) async fn read_task_output(
2008    this: &dyn TurboTasksApi,
2009    id: TaskId,
2010    options: ReadOutputOptions,
2011) -> Result<RawVc> {
2012    loop {
2013        match this.try_read_task_output(id, options)? {
2014            Ok(result) => return Ok(result),
2015            Err(listener) => listener.await,
2016        }
2017    }
2018}
2019
/// A reference to a task's cell with methods that allow updating the contents
/// of the cell.
///
/// Mutations should not be performed outside of the task that owns this cell.
/// Doing so is a logic error, and may lead to incorrect caching behavior.
#[derive(Clone, Copy)]
pub struct CurrentCellRef {
    // The task that owns this cell; used for all read/update calls below.
    current_task: TaskId,
    // Cell id (value type + per-type index) within the owning task.
    index: CellId,
}
2030
/// Shorthand for the target type exposed by a value type's [`VcRead`] implementation.
type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
2032
impl CurrentCellRef {
    /// Updates the cell if the given `functor` returns a value.
    ///
    /// Typed wrapper around [`Self::conditional_update_with_shared_reference`]: the existing
    /// content is downcast to `T` (`None` when absent or of a different type) and a produced
    /// value is wrapped in a fresh [`SharedReference`].
    fn conditional_update<T>(
        &self,
        functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>, Option<CellHash>)>,
    ) where
        T: VcValueType,
    {
        self.conditional_update_with_shared_reference(|old_shared_reference| {
            let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
            let (new_value, updated_key_hashes, content_hash) = functor(old_ref)?;
            Some((
                SharedReference::new(triomphe::Arc::new(new_value)),
                updated_key_hashes,
                content_hash,
            ))
        })
    }

    /// Updates the cell if the given `functor` returns a `SharedReference`.
    ///
    /// The functor receives the current content (if any) and may return a tuple of
    /// (new content, optional hashes of changed keys, optional content hash).
    fn conditional_update_with_shared_reference(
        &self,
        functor: impl FnOnce(
            Option<&SharedReference>,
        ) -> Option<(
            SharedReference,
            Option<SmallVec<[u64; 2]>>,
            Option<CellHash>,
        )>,
    ) {
        let tt = turbo_tasks();
        let cell_content = tt.read_own_task_cell(self.current_task, self.index).ok();
        // `cc.1.0` reaches through `TypedCellContent` -> `CellContent` -> `Option<SharedReference>`.
        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
        if let Some((update, updated_key_hashes, content_hash)) = update {
            tt.update_own_task_cell(
                self.current_task,
                self.index,
                CellContent(Some(update)),
                updated_key_hashes,
                content_hash,
                VerificationMode::EqualityCheck,
            )
        }
    }

    /// Replace the current cell's content with `new_value` if the current content is not equal by
    /// value with the existing content.
    ///
    /// The comparison happens using the value itself, not the [`VcRead::Target`] of that value.
    ///
    /// Take this example of a custom equality implementation on a transparent wrapper type:
    ///
    /// ```
    /// #[turbo_tasks::value(transparent, eq = "manual")]
    /// struct Wrapper(Vec<u32>);
    ///
    /// impl PartialEq for Wrapper {
    ///     fn eq(&self, other: Wrapper) {
    ///         // Example: order doesn't matter for equality
    ///         let (mut this, mut other) = (self.clone(), other.clone());
    ///         this.sort_unstable();
    ///         other.sort_unstable();
    ///         this == other
    ///     }
    /// }
    ///
    /// impl Eq for Wrapper {}
    /// ```
    ///
    /// Comparisons of [`Vc<Wrapper>`] used when updating the cell will use `Wrapper`'s custom
    /// equality implementation, rather than the one provided by the target ([`Vec<u32>`]) type.
    ///
    /// However, in most cases, the default derived implementation of [`PartialEq`] is used which
    /// just forwards to the inner value's [`PartialEq`].
    ///
    /// If you already have a `SharedReference`, consider calling
    /// [`Self::compare_and_update_with_shared_reference`] which can re-use the [`SharedReference`]
    /// object.
    pub fn compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + VcValueType,
    {
        self.conditional_update(|old_value| {
            if let Some(old_value) = old_value
                && old_value == &new_value
            {
                return None;
            }
            Some((new_value, None, None))
        });
    }

    /// Replace the current cell's content with `new_shared_reference` if the current content is not
    /// equal by value with the existing content.
    ///
    /// If you already have a `SharedReference`, this is a faster version of
    /// [`CurrentCellRef::compare_and_update`].
    ///
    /// The value should be stored in [`SharedReference`] using the type `T`.
    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
    where
        T: VcValueType + PartialEq,
    {
        self.conditional_update_with_shared_reference(|old_sr| {
            if let Some(old_sr) = old_sr {
                let old_value = extract_sr_value::<T>(old_sr);
                let new_value = extract_sr_value::<T>(&new_shared_reference);
                if old_value == new_value {
                    return None;
                }
            }
            Some((new_shared_reference, None, None))
        });
    }

    /// Replace the current cell's content if the new value is different.
    ///
    /// Like [`Self::compare_and_update`], but also computes and stores a hash of the value.
    /// When the cell's transient data is evicted, the stored hash enables the backend to detect
    /// whether the value actually changed without re-comparing values—avoiding unnecessary
    /// downstream invalidation.
    ///
    /// Requires `T: DeterministicHash` in addition to `T: PartialEq`.
    pub fn hashed_compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + DeterministicHash + VcValueType,
    {
        self.conditional_update(|old_value| {
            if let Some(old_value) = old_value
                && old_value == &new_value
            {
                return None;
            }
            // Only hash when we actually update; unchanged values return early above.
            let content_hash = hash_xxh3_hash128(&new_value);
            Some((new_value, None, Some(content_hash)))
        });
    }

    /// Replace the current cell's content if the new value (from a pre-existing
    /// [`SharedReference`]) is different.
    ///
    /// Like [`Self::compare_and_update_with_shared_reference`], but also passes a hash
    /// for hash-based change detection when transient data has been evicted.
    pub fn hashed_compare_and_update_with_shared_reference<T>(
        &self,
        new_shared_reference: SharedReference,
    ) where
        T: VcValueType + PartialEq + DeterministicHash,
    {
        self.conditional_update_with_shared_reference(move |old_sr| {
            if let Some(old_sr) = old_sr {
                let old_value = extract_sr_value::<T>(old_sr);
                let new_value = extract_sr_value::<T>(&new_shared_reference);
                if old_value == new_value {
                    return None;
                }
            }
            let content_hash = hash_xxh3_hash128(extract_sr_value::<T>(&new_shared_reference));
            Some((new_shared_reference, None, Some(content_hash)))
        });
    }

    /// See [`Self::compare_and_update`], but selectively update individual keys.
    pub fn keyed_compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + VcValueType,
        VcReadTarget<T>: KeyedEq,
        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
    {
        self.conditional_update(|old_value| {
            let Some(old_value) = old_value else {
                // No previous content: store unconditionally, no key diff possible.
                return Some((new_value, None, None));
            };
            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
            let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
            let updated_keys = old_value.different_keys(new_value_ref);
            if updated_keys.is_empty() {
                return None;
            }
            // Duplicates are very unlikely, but ok since the backend is deduplicating them
            let updated_key_hashes = updated_keys
                .into_iter()
                .map(|key| FxBuildHasher.hash_one(key))
                .collect();
            Some((new_value, Some(updated_key_hashes), None))
        });
    }

    /// See [`Self::compare_and_update_with_shared_reference`], but selectively update individual
    /// keys.
    pub fn keyed_compare_and_update_with_shared_reference<T>(
        &self,
        new_shared_reference: SharedReference,
    ) where
        T: VcValueType + PartialEq,
        VcReadTarget<T>: KeyedEq,
        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
    {
        self.conditional_update_with_shared_reference(|old_sr| {
            let Some(old_sr) = old_sr else {
                // No previous content: store unconditionally, no key diff possible.
                return Some((new_shared_reference, None, None));
            };
            let old_value = extract_sr_value::<T>(old_sr);
            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
            let new_value = extract_sr_value::<T>(&new_shared_reference);
            let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
            let updated_keys = old_value.different_keys(new_value);
            if updated_keys.is_empty() {
                return None;
            }
            // Duplicates are very unlikely, but ok since the backend is deduplicating them
            let updated_key_hashes = updated_keys
                .into_iter()
                .map(|key| FxBuildHasher.hash_one(key))
                .collect();
            Some((new_shared_reference, Some(updated_key_hashes), None))
        });
    }

    /// Unconditionally updates the content of the cell.
    pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
    where
        T: VcValueType,
    {
        let tt = turbo_tasks();
        tt.update_own_task_cell(
            self.current_task,
            self.index,
            CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
            None,
            None,
            verification_mode,
        )
    }

    /// A faster version of [`Self::update`] if you already have a
    /// [`SharedReference`].
    ///
    /// If the passed-in [`SharedReference`] is the same as the existing cell's
    /// by identity, no update is performed.
    ///
    /// The value should be stored in [`SharedReference`] using the type `T`.
    pub fn update_with_shared_reference(
        &self,
        shared_ref: SharedReference,
        verification_mode: VerificationMode,
    ) {
        let tt = turbo_tasks();
        // Only the cheap identity check is gated on EqualityCheck; other modes always update.
        let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
            let content = tt.read_own_task_cell(self.current_task, self.index).ok();
            if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
                // pointer equality (not value equality)
                shared_ref_exp != shared_ref
            } else {
                true
            }
        } else {
            true
        };
        if update {
            tt.update_own_task_cell(
                self.current_task,
                self.index,
                CellContent(Some(shared_ref)),
                None,
                None,
                verification_mode,
            )
        }
    }
}
2304
2305impl From<CurrentCellRef> for RawVc {
2306    fn from(cell: CurrentCellRef) -> Self {
2307        RawVc::TaskCell(cell.current_task, cell.index)
2308    }
2309}
2310
2311fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2312    sr.0.downcast_ref::<T>()
2313        .expect("cannot update SharedReference of different type")
2314}
2315
2316pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
2317    find_cell_by_id(T::get_value_type_id())
2318}
2319
2320pub fn find_cell_by_id(ty: ValueTypeId) -> CurrentCellRef {
2321    CURRENT_TASK_STATE.with(|ts| {
2322        let current_task = current_task("celling turbo_tasks values");
2323        let mut ts = ts.write().unwrap();
2324        let map = ts.cell_counters.as_mut().unwrap();
2325        let current_index = map.entry(ty).or_default();
2326        let index = *current_index;
2327        *current_index += 1;
2328        CurrentCellRef {
2329            current_task,
2330            index: CellId { type_id: ty, index },
2331        }
2332    })
2333}
2334
2335pub(crate) async fn read_local_output(
2336    this: &dyn TurboTasksApi,
2337    execution_id: ExecutionId,
2338    local_task_id: LocalTaskId,
2339) -> Result<RawVc> {
2340    loop {
2341        match this.try_read_local_output(execution_id, local_task_id)? {
2342            Ok(raw_vc) => return Ok(raw_vc),
2343            Err(event_listener) => event_listener.await,
2344        }
2345    }
2346}