Skip to main content

turbo_tasks/
manager.rs

1use std::{
2    cmp::Reverse,
3    fmt::{Debug, Display},
4    future::Future,
5    hash::{BuildHasher, BuildHasherDefault},
6    mem::take,
7    panic::AssertUnwindSafe,
8    pin::Pin,
9    process::abort,
10    sync::{
11        Arc, Mutex, RwLock, Weak,
12        atomic::{AtomicBool, AtomicUsize, Ordering},
13    },
14    time::{Duration, Instant},
15};
16
17use anyhow::{Result, anyhow};
18use auto_hash_map::AutoMap;
19use bincode::{Decode, Encode};
20use either::Either;
21use futures::FutureExt;
22use rustc_hash::{FxBuildHasher, FxHasher};
23use serde::{Deserialize, Serialize};
24use smallvec::SmallVec;
25use tokio::{select, sync::mpsc::Receiver, task_local};
26use tracing::{Instrument, Span, instrument};
27use turbo_tasks_hash::{DeterministicHash, hash_xxh3_hash128};
28
29use crate::{
30    Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
31    ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
32    VcValueTrait, VcValueType,
33    backend::{
34        Backend, CellContent, CellHash, TaskCollectiblesMap, TaskExecutionSpec, TransientTaskType,
35        TurboTasksExecutionError, TypedCellContent, VerificationMode,
36    },
37    capture_future::CaptureFuture,
38    dyn_task_inputs::StackDynTaskInputs,
39    event::{Event, EventListener},
40    id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
41    id_factory::IdFactoryWithReuse,
42    keyed::KeyedEq,
43    local_task_tracker::LocalTaskTracker,
44    macro_helpers::NativeFunction,
45    message_queue::{CompilationEvent, CompilationEventQueue},
46    priority_runner::{Executor, PriorityRunner},
47    raw_vc::{CellId, RawVc},
48    registry,
49    serialization_invalidation::SerializationInvalidator,
50    task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
51    task_statistics::TaskStatisticsApi,
52    trace::TraceRawVcs,
53    util::{IdFactory, StaticOrArc},
54};
55
/// Common base trait for [`TurboTasksApi`] and [`TurboTasksBackendApi`]. Provides APIs for creating
/// tasks from function calls.
///
/// Every call method hands back a [`RawVc`] handle for the call's result rather than the result
/// itself. The `Sync + Send` supertraits mean implementors can be shared between threads.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls a native function with arguments. Resolves arguments when needed
    /// with a wrapper task.
    ///
    /// `this` is the optional receiver, `arg` carries the call arguments and `persistence`
    /// states whether the created task may be persisted across sessions.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Call a native function with arguments.
    /// All inputs must be resolved.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls a trait method with arguments. First input is the `self` object.
    /// Uses a wrapper task to resolve the inputs when that is needed.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs a boxed future and reports failures as [`TurboTasksExecutionError`].
    ///
    /// NOTE(review): presumably the type-erased counterpart of [`TurboTasks::run`]; the trait
    /// impl is not part of this excerpt — confirm.
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
    /// Runs a boxed future and returns a future for its `Result`.
    ///
    /// NOTE(review): presumably the type-erased counterpart of [`TurboTasks::run_once`] —
    /// confirm against the trait impl.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Same shape as [`run_once`][Self::run_once], with an additional invalidation `reason`.
    ///
    /// NOTE(review): how the reason is recorded is not visible here — confirm.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Starts a boxed future without returning anything to wait on.
    ///
    /// NOTE(review): presumably forwards to [`TurboTasks::start_once_process`] — confirm.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Sends a compilation event to subscribers.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Returns a human-readable name for the given task.
    fn get_task_name(&self, task: TaskId) -> String;
}
108
/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
/// context. Returned by the [`turbo_tasks`] helper function.
///
/// This trait is needed because thread locals cannot contain an unresolved [`Backend`] type
/// parameter.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Invalidates `task`.
    fn invalidate(&self, task: TaskId);
    /// Invalidates `task`, passing along a `reason` for the invalidation.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// NOTE(review): presumably marks the serialized form of `task` as outdated — confirm
    /// against the backend.
    fn invalidate_serialization(&self, task: TaskId);

    /// Tries to read the output of `task`.
    ///
    /// The outer `Result` is the error channel. The inner `Err` carries an [`EventListener`]
    /// instead of a value.
    ///
    /// NOTE(review): presumably the listener is meant to be awaited before retrying — confirm.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Tries to read the cell `index` of `task`.
    ///
    /// Uses the same nested-`Result` shape as
    /// [`try_read_task_output`][Self::try_read_task_output].
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Reads a [`RawVc::LocalOutput`]. If the task has completed, returns the [`RawVc`] the local
    /// task points to.
    ///
    /// The returned [`RawVc`] may also be a [`RawVc::LocalOutput`], so this may need to be called
    /// recursively or in a loop.
    ///
    /// This does not accept a consistency argument, as you cannot control consistency of a read of
    /// an operation owned by your own task. Strongly consistent reads are only allowed on
    /// [`OperationVc`]s, which should never be local tasks.
    ///
    /// No dependency tracking will happen as a result of this function call, as it's a no-op for a
    /// task to depend on itself.
    ///
    /// [`OperationVc`]: crate::OperationVc
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Returns the collectibles of trait `trait_id` for `task` as a [`TaskCollectiblesMap`].
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits `collectible` for the trait `trait_type`.
    ///
    /// NOTE(review): presumably attributed to the currently executing task — confirm.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Counterpart of [`emit_collectible`][Self::emit_collectible], taking a `count`.
    ///
    /// NOTE(review): the exact meaning of `count` is not visible here — confirm.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Bulk counterpart of [`unemit_collectible`][Self::unemit_collectible] for a whole map.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
    ) -> Result<TypedCellContent>;

    /// Reads the cell `index` of `task`.
    ///
    /// NOTE(review): how this differs from
    /// [`try_read_own_task_cell`][Self::try_read_own_task_cell] is not visible here — confirm.
    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent>;
    /// Writes `content` into the cell `index` of `task`.
    ///
    /// NOTE(review): `updated_key_hashes`, `content_hash` and `verification_mode` are passed
    /// through to the implementation; their semantics are defined by the backend — confirm
    /// there.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
    );
    /// Marks `task` as finished.
    ///
    /// NOTE(review): presumably only valid for the caller's own task, as the name says —
    /// confirm.
    fn mark_own_task_as_finished(&self, task: TaskId);

    /// NOTE(review): presumably connects `task` to the currently executing task — confirm
    /// against the backend.
    fn connect_task(&self, task: TaskId);

    /// Wraps the given future in the current task.
    ///
    /// Beware: this method is not safe to use in production code. It is only intended for use in
    /// tests and for debugging purposes.
    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Returns a reference to the [`TaskStatisticsApi`] of this instance.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Returns a future to await.
    ///
    /// NOTE(review): presumably it resolves once the instance has stopped — confirm.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events and returns the receiving end of the channel.
    ///
    /// NOTE(review): `event_types` presumably filters by event type name, with `None` meaning
    /// all events — confirm against `CompilationEventQueue`.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;

    /// Returns true if TurboTasks is configured to track dependencies.
    fn is_tracking_dependencies(&self) -> bool;
}
199
/// Marker wrapper for a value that has not been used yet.
///
/// The wrapper performs no checks; the guarantee is carried purely by the safety contracts of
/// its `unsafe` constructor and accessor.
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    /// Wraps `inner` as an unused value.
    ///
    /// # Safety
    ///
    /// The wrapped value must not be used.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Unused { inner }
    }

    /// Borrows the wrapped value and leaves the wrapper intact.
    ///
    /// # Safety
    ///
    /// The caller needs to make sure that the value stays unused.
    pub unsafe fn get_unchecked(&self) -> &T {
        let Unused { inner } = self;
        inner
    }

    /// Consumes the wrapper and returns the wrapped value.
    pub fn into(self) -> T {
        let Unused { inner } = self;
        inner
    }
}
229
/// A subset of the [`TurboTasks`] API that's exposed to [`Backend`] implementations.
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns an owning, reference-counted handle to this API object.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Returns a persistent task id wrapped in [`Unused`].
    ///
    /// NOTE(review): presumably drawn from the persistent id factory of [`TurboTasks`] —
    /// confirm against the trait impl.
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Returns a transient task id wrapped in [`Unused`].
    ///
    /// NOTE(review): presumably drawn from the transient id factory of [`TurboTasks`] —
    /// confirm against the trait impl.
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// Gives a persistent task id back for reuse.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the task id is not used anymore.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// Gives a transient task id back for reuse.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the task id is not used anymore.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedule a task for execution.
    fn schedule(&self, task: TaskId, priority: TaskPriority);

    /// Returns the priority of the current task.
    fn get_current_task_priority(&self) -> TaskPriority;

    /// Schedule a foreground backend job for execution.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Schedule a background backend job for execution.
    ///
    /// Background jobs are not counted towards activeness of the system. The system is considered
    /// idle even with active background jobs.
    fn schedule_backend_background_job(&self, job: B::BackendJob);

    /// Returns the duration from the start of the program to the given instant.
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Returns true if the system is idle.
    fn is_idle(&self) -> bool;

    /// Returns a reference to the backend.
    fn backend(&self) -> &B;
}
269
/// Summary of an update: a duration, a task count and the set of invalidation reasons.
///
/// The private placeholder field keeps the struct from being built with a struct literal outside
/// this module, so fields can be added later without breaking callers. That is the same effect
/// `#[non_exhaustive]` would give, hence the clippy allow.
///
/// NOTE(review): presumably produced from the `aggregated_update` state of [`TurboTasks`];
/// the producing code is outside this excerpt — confirm.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// NOTE(review): presumably how long the update took — confirm against the producer.
    pub duration: Duration,
    /// NOTE(review): presumably the number of tasks involved in the update — confirm.
    pub tasks: usize,
    /// The invalidation reasons associated with the update.
    pub reasons: InvalidationReasonSet,
    // Never read; exists only to make the struct non-exhaustive (see above).
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
278
/// Whether a task may be persisted across sessions or only lives in memory for this session.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// Tasks that may be persisted across sessions using serialization.
    Persistent,

    /// Tasks that will be persisted in memory for the life of this session, but won't persist
    /// between sessions.
    ///
    /// This is used for [root tasks][TurboTasks::spawn_root_task] and tasks with an argument of
    /// type [`TransientValue`][crate::value::TransientValue] or
    /// [`TransientInstance`][crate::value::TransientInstance].
    Transient,
}
292
293impl Display for TaskPersistence {
294    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
295        match self {
296            TaskPersistence::Persistent => write!(f, "persistent"),
297            TaskPersistence::Transient => write!(f, "transient"),
298        }
299    }
300}
301
/// Consistency level requested for a read. Defaults to [`ReadConsistency::Eventual`].
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// The default behavior for most APIs. Reads are faster, but may return stale values, which
    /// may later trigger re-computation.
    #[default]
    Eventual,
    /// Ensures all dependencies are fully resolved before returning the cell or output data, at
    /// the cost of slower reads.
    ///
    /// Top-level code that returns data to the user should use strongly consistent reads.
    Strong,
}
314
/// How a cell read is recorded as a dependency of the current task.
///
/// The default is `Tracked { key: None }`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadCellTracking {
    /// Reads are tracked as dependencies of the current task.
    Tracked {
        /// The key used for the dependency
        key: Option<u64>,
    },
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}

impl ReadCellTracking {
    /// Decides whether a read should be tracked, given whether it produced an error.
    ///
    /// `Tracked` always tracks, `Untracked` never does, and `TrackOnlyError` tracks exactly when
    /// `is_err` is true.
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Untracked => false,
            Self::TrackOnlyError => is_err,
            Self::Tracked { .. } => true,
        }
    }

    /// Returns the dependency key of a `Tracked` read; `None` for every other mode.
    pub fn key(&self) -> Option<u64> {
        if let Self::Tracked { key } = *self {
            key
        } else {
            None
        }
    }
}

impl Default for ReadCellTracking {
    /// Tracked reads without a key are the default.
    fn default() -> Self {
        Self::Tracked { key: None }
    }
}

impl Display for ReadCellTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
            Self::Tracked { key: None } => f.write_str("tracked"),
            Self::TrackOnlyError => f.write_str("track only error"),
            Self::Untracked => f.write_str("untracked"),
        }
    }
}
368
/// How a read is recorded as a dependency of the current task. Defaults to
/// [`ReadTracking::Tracked`].
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// Reads are tracked as dependencies of the current task.
    #[default]
    Tracked,
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}

impl ReadTracking {
    /// Decides whether a read should be tracked, given whether it produced an error.
    ///
    /// `Tracked` always tracks, `Untracked` never does, and `TrackOnlyError` tracks exactly when
    /// `is_err` is true.
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Untracked => false,
            Self::TrackOnlyError => is_err,
            Self::Tracked => true,
        }
    }
}

impl Display for ReadTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Tracked => "tracked",
            Self::TrackOnlyError => "track only error",
            Self::Untracked => "untracked",
        };
        f.write_str(label)
    }
}
405
/// Scheduling priority of a task.
///
/// `Ord` is derived, so values compare by variant in declaration order (`Initial` <
/// `Invalidation { .. }` < `Recomputation`). Two `Invalidation` values compare through
/// [`Reverse`], i.e. the one with the numerically larger `priority` compares as the smaller
/// value. Do not reorder the variants without revisiting every comparison.
///
/// NOTE(review): whether the priority runner executes the smaller or the larger value first is
/// not visible in this file excerpt — confirm against `PriorityRunner`.
#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum TaskPriority {
    /// The default priority; also what [`TaskPriority::initial`] returns.
    #[default]
    Initial,
    /// A priority carrying a numeric level, wrapped in [`Reverse`] to invert its ordering.
    Invalidation {
        priority: Reverse<u32>,
    },
    /// NOTE(review): presumably used for tasks that are re-executed to recompute a result —
    /// confirm with the scheduling code.
    Recomputation,
}
415
416impl TaskPriority {
417    pub fn invalidation(priority: u32) -> Self {
418        Self::Invalidation {
419            priority: Reverse(priority),
420        }
421    }
422
423    pub fn initial() -> Self {
424        Self::Initial
425    }
426
427    pub fn leaf() -> Self {
428        Self::Invalidation {
429            priority: Reverse(0),
430        }
431    }
432
433    pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
434        match self {
435            TaskPriority::Initial => parent_priority,
436            TaskPriority::Invalidation { priority } => {
437                if let TaskPriority::Invalidation {
438                    priority: parent_priority,
439                } = parent_priority
440                    && priority.0 < parent_priority.0
441                {
442                    Self::Invalidation {
443                        priority: Reverse(parent_priority.0.saturating_add(1)),
444                    }
445                } else {
446                    *self
447                }
448            }
449            TaskPriority::Recomputation => TaskPriority::Recomputation,
450        }
451    }
452}
453
454impl Display for TaskPriority {
455    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
456        match self {
457            TaskPriority::Initial => write!(f, "initial"),
458            TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
459            TaskPriority::Recomputation => write!(f, "recomputation"),
460        }
461    }
462}
463
/// A unit of work handed to the priority runner by `TurboTasks::schedule` (variant `Task`) or
/// `TurboTasks::schedule_local_task` (variant `LocalTask`).
enum ScheduledTask {
    /// A task identified by its [`TaskId`].
    Task {
        task_id: TaskId,
        /// The tracing span that was current when the task was scheduled.
        span: Span,
    },
    /// A local task created during the execution of another task.
    LocalTask {
        /// What the local task does, including its receiver and arguments.
        ty: LocalTaskSpec,
        /// Persistence passed along by the scheduling call.
        persistence: TaskPersistence,
        /// Id obtained from the scheduling task's local task tracker.
        local_task_id: LocalTaskId,
        /// Shared state of the task that scheduled this local task.
        global_task_state: Arc<RwLock<CurrentTaskState>>,
        /// The tracing span that was current when the local task was scheduled.
        span: Span,
    },
}
477
/// The turbo-tasks manager: owns the backend, the id factories, the priority runner and the
/// bookkeeping for scheduled foreground/background work.
///
/// Instances are created through [`TurboTasks::new`], which returns an `Arc<Self>`.
pub struct TurboTasks<B: Backend + 'static> {
    /// Weak self-reference filled in by `Arc::new_cyclic` in `new`; upgraded by `pin`.
    this: Weak<Self>,
    backend: B,
    /// Created in `new` with the bounds `TaskId::MIN` and `TRANSIENT_TASK_BIT - 1`.
    task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Created in `new` with the bounds `TRANSIENT_TASK_BIT` and `TaskId::MAX`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Source of execution ids; `run` draws from it with `wrapping_get`.
    execution_id_factory: IdFactory<ExecutionId>,
    /// Starts out `false`.
    ///
    /// NOTE(review): presumably set when the instance is stopped — confirm.
    stopped: AtomicBool,
    /// Incremented by `begin_foreground_job`, decremented by `finish_foreground_job`.
    currently_scheduled_foreground_jobs: AtomicUsize,
    currently_scheduled_background_jobs: AtomicUsize,
    /// Incremented by `schedule` for every scheduled task.
    scheduled_tasks: AtomicUsize,
    priority_runner:
        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
    /// Set to the current instant when the foreground job counter goes from zero to one.
    start: Mutex<Option<Instant>>,
    /// NOTE(review): presumably accumulates (duration, task count) and invalidation reasons
    /// until they are reported as an [`UpdateInfo`] — confirm with the consuming code.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Event that is triggered when currently_scheduled_foreground_jobs becomes non-zero
    event_foreground_start: Event,
    /// Event that is triggered when all foreground jobs are done
    /// (currently_scheduled_foreground_jobs becomes zero)
    event_foreground_done: Event,
    /// Event that is triggered when all background jobs are done
    event_background_done: Event,
    /// The instant at which `new` constructed this instance.
    program_start: Instant,
    compilation_events: CompilationEventQueue,
}
502
/// Information about a non-local task. A non-local task can contain multiple "local" tasks, which
/// all share the same non-local task state.
///
/// A non-local task is one that:
///
/// - Has a unique task id.
/// - Is potentially cached.
/// - The backend is aware of.
struct CurrentTaskState {
    /// `None` for the temporary state built by `new_temporary` (used by `TurboTasks::run`).
    task_id: Option<TaskId>,
    /// Identifies this particular execution; checked by `assert_execution_id`.
    execution_id: ExecutionId,
    /// Priority of this task; local tasks scheduled from it are scheduled with this priority.
    priority: TaskPriority,

    /// True if the current task has state in cells (interior mutability).
    /// Only tracked when verify_determinism feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the current task uses an external invalidator
    has_invalidator: bool,

    /// True if we're in a top-level task (e.g. `.run_once(...)` or `.run(...)`).
    /// Eventually consistent reads are not allowed in top-level tasks.
    in_top_level_task: bool,

    /// Tracks how many cells of each type have been allocated so far during this task execution.
    /// When a task is re-executed, the cell count may not match the existing cell vec length.
    ///
    /// This is taken (and becomes `None`) during teardown of a task.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Tracks execution of Local tasks (and detached test futures) created during this global
    /// task's execution.
    local_tasks: LocalTaskTracker,
}
538
539impl CurrentTaskState {
540    fn new(
541        task_id: TaskId,
542        execution_id: ExecutionId,
543        priority: TaskPriority,
544        in_top_level_task: bool,
545    ) -> Self {
546        Self {
547            task_id: Some(task_id),
548            execution_id,
549            priority,
550            #[cfg(feature = "verify_determinism")]
551            stateful: false,
552            has_invalidator: false,
553            in_top_level_task,
554            cell_counters: Some(AutoMap::default()),
555            local_tasks: LocalTaskTracker::new(),
556        }
557    }
558
559    fn new_temporary(
560        execution_id: ExecutionId,
561        priority: TaskPriority,
562        in_top_level_task: bool,
563    ) -> Self {
564        Self {
565            task_id: None,
566            execution_id,
567            priority,
568            #[cfg(feature = "verify_determinism")]
569            stateful: false,
570            has_invalidator: false,
571            in_top_level_task,
572            cell_counters: None,
573            local_tasks: LocalTaskTracker::new(),
574        }
575    }
576
577    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
578        if self.execution_id != expected_execution_id {
579            panic!(
580                "Local tasks can only be scheduled/awaited within the same execution of the \
581                 parent task that created them"
582            );
583        }
584    }
585}
586
// TODO implement our own thread pool and make these thread locals instead
//
// Task-local (tokio `task_local!`) context of the currently executing turbo task. The values
// are only available inside a `.scope(...)` call, see `TurboTasks::run`.
task_local! {
    /// The current TurboTasks instance
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    // State of the task that is currently executing. The `Arc` is cloned into local tasks
    // scheduled from that task (see `schedule_local_task`), so they all share one state.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;

    /// Temporarily suppresses the eventual consistency check in top-level tasks.
    /// This is used by strongly consistent reads to allow them to succeed in top-level tasks.
    /// This is NOT shared across local tasks (unlike CURRENT_TASK_STATE), so it's safe
    /// to set/unset without race conditions.
    pub(crate) static SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK: bool;
}
600
601impl<B: Backend + 'static> TurboTasks<B> {
602    // TODO better lifetime management for turbo tasks
603    // consider using unsafe for the task_local turbo tasks
604    // that should be safe as long tasks can't outlife turbo task
605    // so we probably want to make sure that all tasks are joined
606    // when trying to drop turbo tasks
607    pub fn new(backend: B) -> Arc<Self> {
608        let task_id_factory = IdFactoryWithReuse::new(
609            TaskId::MIN,
610            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
611        );
612        let transient_task_id_factory =
613            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
614        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
615        let this = Arc::new_cyclic(|this| Self {
616            this: this.clone(),
617            backend,
618            task_id_factory,
619            transient_task_id_factory,
620            execution_id_factory,
621            stopped: AtomicBool::new(false),
622            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
623            currently_scheduled_background_jobs: AtomicUsize::new(0),
624            scheduled_tasks: AtomicUsize::new(0),
625            priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
626            start: Default::default(),
627            aggregated_update: Default::default(),
628            event_foreground_done: Event::new(|| {
629                || "TurboTasks::event_foreground_done".to_string()
630            }),
631            event_foreground_start: Event::new(|| {
632                || "TurboTasks::event_foreground_start".to_string()
633            }),
634            event_background_done: Event::new(|| {
635                || "TurboTasks::event_background_done".to_string()
636            }),
637            program_start: Instant::now(),
638            compilation_events: CompilationEventQueue::default(),
639        });
640        this.backend.startup(&*this);
641        this
642    }
643
644    pub fn pin(&self) -> Arc<Self> {
645        self.this.upgrade().unwrap()
646    }
647
648    /// Creates a new root task
649    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
650    where
651        T: ?Sized,
652        F: Fn() -> Fut + Send + Sync + Clone + 'static,
653        Fut: Future<Output = Result<Vc<T>>> + Send,
654    {
655        let id = self.backend.create_transient_task(
656            TransientTaskType::Root(Box::new(move || {
657                let functor = functor.clone();
658                Box::pin(async move {
659                    mark_top_level_task();
660                    let raw_vc = functor().await?.node;
661                    raw_vc.to_non_local().await
662                })
663            })),
664            self,
665        );
666        self.schedule(id, TaskPriority::initial());
667        id
668    }
669
670    pub fn dispose_root_task(&self, task_id: TaskId) {
671        self.backend.dispose_root_task(task_id, self);
672    }
673
674    // TODO make sure that all dependencies settle before reading them
675    /// Creates a new root task, that is only executed once.
676    /// Dependencies will not invalidate the task.
677    #[track_caller]
678    fn spawn_once_task<T, Fut>(&self, future: Fut)
679    where
680        T: ?Sized,
681        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
682    {
683        let id = self.backend.create_transient_task(
684            TransientTaskType::Once(Box::pin(async move {
685                mark_top_level_task();
686                let raw_vc = future.await?.node;
687                raw_vc.to_non_local().await
688            })),
689            self,
690        );
691        self.schedule(id, TaskPriority::initial());
692    }
693
694    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
695        &self,
696        future: impl Future<Output = Result<T>> + Send + 'static,
697    ) -> Result<T> {
698        let (tx, rx) = tokio::sync::oneshot::channel();
699        self.spawn_once_task(async move {
700            mark_top_level_task();
701            let result = future.await;
702            tx.send(result)
703                .map_err(|_| anyhow!("unable to send result"))?;
704            Ok(Completion::new())
705        });
706
707        rx.await?
708    }
709
710    #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
711    pub async fn run<T: TraceRawVcs + Send + 'static>(
712        &self,
713        future: impl Future<Output = Result<T>> + Send + 'static,
714    ) -> Result<T, TurboTasksExecutionError> {
715        self.begin_foreground_job();
716        // it's okay for execution ids to overflow and wrap, they're just used for an assert
717        let execution_id = self.execution_id_factory.wrapping_get();
718        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
719            execution_id,
720            TaskPriority::initial(),
721            true, // in_top_level_task
722        )));
723
724        let result = TURBO_TASKS
725            .scope(
726                self.pin(),
727                CURRENT_TASK_STATE.scope(current_task_state, async {
728                    let result = CaptureFuture::new(future).await;
729
730                    // wait for all spawned local tasks using `local` to finish
731                    wait_for_local_tasks().await;
732
733                    match result {
734                        Ok(Ok(value)) => Ok(value),
735                        Ok(Err(err)) => Err(err.into()),
736                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
737                    }
738                }),
739            )
740            .await;
741        self.finish_foreground_job();
742        result
743    }
744
    /// Spawns `future` on the tokio runtime, running it through
    /// [`run_once`][Self::run_once]. Nothing is returned to wait on.
    ///
    /// Inside the once-task, `finish_foreground_job` is called before `future` is awaited and
    /// `begin_foreground_job` after it has finished.
    ///
    /// NOTE(review): presumably this keeps a long-running process from counting as a foreground
    /// job while it runs (scheduling the once-task incremented the counter) — confirm.
    ///
    /// # Panics
    ///
    /// The spawned tokio task panics if `run_once` returns an error.
    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
        let this = self.pin();
        tokio::spawn(async move {
            // `pin()` yields a second handle so that `this` itself can be moved into the
            // inner future below.
            this.pin()
                .run_once(async move {
                    this.finish_foreground_job();
                    future.await;
                    this.begin_foreground_job();
                    Ok(())
                })
                .await
                .unwrap()
        });
    }
759
760    pub(crate) fn native_call(
761        &self,
762        native_fn: &'static NativeFunction,
763        this: Option<RawVc>,
764        arg: &mut dyn StackDynTaskInputs,
765        persistence: TaskPersistence,
766    ) -> RawVc {
767        RawVc::TaskOutput(self.backend.get_or_create_task(
768            native_fn,
769            this,
770            arg,
771            current_task_if_available("turbo_function calls"),
772            persistence,
773            self,
774        ))
775    }
776
777    pub fn dynamic_call(
778        &self,
779        native_fn: &'static NativeFunction,
780        this: Option<RawVc>,
781        arg: &mut dyn StackDynTaskInputs,
782        persistence: TaskPersistence,
783    ) -> RawVc {
784        if this.is_none_or(|this| this.is_resolved())
785            && native_fn.arg_meta.is_resolved(arg.as_ref())
786        {
787            return self.native_call(native_fn, this, arg, persistence);
788        }
789        // Need async resolution — must move the arg to the heap now
790        let arg = arg.take_box();
791        let task_type = LocalTaskSpec {
792            task_type: LocalTaskType::ResolveNative { native_fn },
793            this,
794            arg,
795        };
796        self.schedule_local_task(task_type, persistence)
797    }
798
799    pub fn trait_call(
800        &self,
801        trait_method: &'static TraitMethod,
802        this: RawVc,
803        arg: &mut dyn StackDynTaskInputs,
804        persistence: TaskPersistence,
805    ) -> RawVc {
806        // avoid creating a wrapper task if self is already resolved
807        // for resolved cells we already know the value type so we can lookup the
808        // function
809        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
810            match registry::get_value_type(type_id).get_trait_method(trait_method) {
811                Some(native_fn) => {
812                    if let Some(filter) = native_fn.arg_meta.filter_owned {
813                        let mut arg = (filter)(arg);
814                        return self.dynamic_call(native_fn, Some(this), &mut arg, persistence);
815                    } else {
816                        return self.dynamic_call(native_fn, Some(this), arg, persistence);
817                    }
818                }
819                None => {
820                    // We are destined to fail at this point, but we just retry resolution in the
821                    // local task since we cannot report an error from here.
822                    // TODO: A panic seems appropriate since the immediate caller is to blame
823                }
824            }
825        }
826
827        // create a wrapper task to resolve all inputs
828        let task_type = LocalTaskSpec {
829            task_type: LocalTaskType::ResolveTrait { trait_method },
830            this: Some(this),
831            arg: arg.take_box(),
832        };
833
834        self.schedule_local_task(task_type, persistence)
835    }
836
837    #[track_caller]
838    pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
839        self.begin_foreground_job();
840        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
841
842        self.priority_runner.schedule(
843            &self.pin(),
844            ScheduledTask::Task {
845                task_id,
846                span: Span::current(),
847            },
848            priority,
849        );
850    }
851
852    fn schedule_local_task(
853        &self,
854        ty: LocalTaskSpec,
855        // if this is a `LocalTaskType::Resolve*`, we may spawn another task with this persistence,
856        persistence: TaskPersistence,
857    ) -> RawVc {
858        let task_type = ty.task_type;
859        let (global_task_state, execution_id, priority, local_task_id) =
860            CURRENT_TASK_STATE.with(|gts| {
861                let mut gts_write = gts.write().unwrap();
862                let local_task_id = gts_write.local_tasks.create(task_type);
863                (
864                    Arc::clone(gts),
865                    gts_write.execution_id,
866                    gts_write.priority,
867                    local_task_id,
868                )
869            });
870
871        self.priority_runner.schedule(
872            &self.pin(),
873            ScheduledTask::LocalTask {
874                ty,
875                persistence,
876                local_task_id,
877                global_task_state,
878                span: Span::current(),
879            },
880            priority,
881        );
882
883        RawVc::LocalOutput(execution_id, local_task_id, persistence)
884    }
885
886    fn begin_foreground_job(&self) {
887        if self
888            .currently_scheduled_foreground_jobs
889            .fetch_add(1, Ordering::AcqRel)
890            == 0
891        {
892            *self.start.lock().unwrap() = Some(Instant::now());
893            self.event_foreground_start.notify(usize::MAX);
894            self.backend.idle_end(self);
895        }
896    }
897
898    fn finish_foreground_job(&self) {
899        if self
900            .currently_scheduled_foreground_jobs
901            .fetch_sub(1, Ordering::AcqRel)
902            == 1
903        {
904            self.backend.idle_start(self);
905            // That's not super race-condition-safe, but it's only for
906            // statistical reasons
907            let total = self.scheduled_tasks.load(Ordering::Acquire);
908            self.scheduled_tasks.store(0, Ordering::Release);
909            if let Some(start) = *self.start.lock().unwrap() {
910                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
911                if let Some(update) = update.as_mut() {
912                    update.0 += start.elapsed();
913                    update.1 += total;
914                } else {
915                    *update = Some((start.elapsed(), total));
916                }
917            }
918            self.event_foreground_done.notify(usize::MAX);
919        }
920    }
921
922    fn begin_background_job(&self) {
923        self.currently_scheduled_background_jobs
924            .fetch_add(1, Ordering::Relaxed);
925    }
926
927    fn finish_background_job(&self) {
928        if self
929            .currently_scheduled_background_jobs
930            .fetch_sub(1, Ordering::Relaxed)
931            == 1
932        {
933            self.event_background_done.notify(usize::MAX);
934        }
935    }
936
937    pub fn get_in_progress_count(&self) -> usize {
938        self.currently_scheduled_foreground_jobs
939            .load(Ordering::Acquire)
940    }
941
942    /// Waits for the given task to finish executing. This works by performing an untracked read,
943    /// and discarding the value of the task output.
944    ///
945    /// [`ReadConsistency::Eventual`] means that this will return after the task executes, but
946    /// before all dependencies have completely settled.
947    ///
948    /// [`ReadConsistency::Strong`] means that this will also wait for the task and all dependencies
949    /// to fully settle before returning.
950    ///
951    /// As this function is typically called in top-level code that waits for results to be ready
952    /// for the user to access, most callers should use [`ReadConsistency::Strong`].
953    pub async fn wait_task_completion(
954        &self,
955        id: TaskId,
956        consistency: ReadConsistency,
957    ) -> Result<()> {
958        read_task_output(
959            self,
960            id,
961            ReadOutputOptions {
962                // INVALIDATION: This doesn't return a value, only waits for it to be ready.
963                tracking: ReadTracking::Untracked,
964                consistency,
965            },
966        )
967        .await?;
968        Ok(())
969    }
970
971    /// Returns [UpdateInfo] with all updates aggregated over a given duration
972    /// (`aggregation`). Will wait until an update happens.
973    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
974        self.aggregated_update_info(aggregation, Duration::MAX)
975            .await
976            .unwrap()
977    }
978
979    /// Returns [UpdateInfo] with all updates aggregated over a given duration
980    /// (`aggregation`). Will only return None when the timeout is reached while
981    /// waiting for the first update.
982    pub async fn aggregated_update_info(
983        &self,
984        aggregation: Duration,
985        timeout: Duration,
986    ) -> Option<UpdateInfo> {
987        let listener = self
988            .event_foreground_done
989            .listen_with_note(|| || "wait for update info".to_string());
990        let wait_for_finish = {
991            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
992            if aggregation.is_zero() {
993                if let Some((duration, tasks)) = update.take() {
994                    return Some(UpdateInfo {
995                        duration,
996                        tasks,
997                        reasons: take(reason_set),
998                        placeholder_for_future_fields: (),
999                    });
1000                } else {
1001                    true
1002                }
1003            } else {
1004                update.is_none()
1005            }
1006        };
1007        if wait_for_finish {
1008            if timeout == Duration::MAX {
1009                // wait for finish
1010                listener.await;
1011            } else {
1012                // wait for start, then wait for finish or timeout
1013                let start_listener = self
1014                    .event_foreground_start
1015                    .listen_with_note(|| || "wait for update info".to_string());
1016                if self
1017                    .currently_scheduled_foreground_jobs
1018                    .load(Ordering::Acquire)
1019                    == 0
1020                {
1021                    start_listener.await;
1022                } else {
1023                    drop(start_listener);
1024                }
1025                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1026                    // Timeout
1027                    return None;
1028                }
1029            }
1030        }
1031        if !aggregation.is_zero() {
1032            loop {
1033                select! {
1034                    () = tokio::time::sleep(aggregation) => {
1035                        break;
1036                    }
1037                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1038                        // Resets the sleep
1039                    }
1040                }
1041            }
1042        }
1043        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1044        if let Some((duration, tasks)) = update.take() {
1045            Some(UpdateInfo {
1046                duration,
1047                tasks,
1048                reasons: take(reason_set),
1049                placeholder_for_future_fields: (),
1050            })
1051        } else {
1052            panic!("aggregated_update_info must not called concurrently")
1053        }
1054    }
1055
1056    pub async fn wait_background_done(&self) {
1057        let listener = self.event_background_done.listen();
1058        if self
1059            .currently_scheduled_background_jobs
1060            .load(Ordering::Acquire)
1061            != 0
1062        {
1063            listener.await;
1064        }
1065    }
1066
1067    pub async fn stop_and_wait(&self) {
1068        turbo_tasks_future_scope(self.pin(), async move {
1069            self.backend.stopping(self);
1070            self.stopped.store(true, Ordering::Release);
1071            {
1072                let listener = self
1073                    .event_foreground_done
1074                    .listen_with_note(|| || "wait for stop".to_string());
1075                if self
1076                    .currently_scheduled_foreground_jobs
1077                    .load(Ordering::Acquire)
1078                    != 0
1079                {
1080                    listener.await;
1081                }
1082            }
1083            {
1084                let listener = self.event_background_done.listen();
1085                if self
1086                    .currently_scheduled_background_jobs
1087                    .load(Ordering::Acquire)
1088                    != 0
1089                {
1090                    listener.await;
1091                }
1092            }
1093            self.backend.stop(self);
1094        })
1095        .await;
1096    }
1097
1098    #[track_caller]
1099    pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1100    where
1101        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1102        T::CallOnceFuture: Send,
1103    {
1104        let mut this = self.pin();
1105        this.begin_foreground_job();
1106        tokio::spawn(
1107            TURBO_TASKS
1108                .scope(this.clone(), async move {
1109                    if !this.stopped.load(Ordering::Acquire) {
1110                        this = func(this.clone()).await;
1111                    }
1112                    this.finish_foreground_job();
1113                })
1114                .in_current_span(),
1115        );
1116    }
1117
1118    #[track_caller]
1119    pub(crate) fn schedule_background_job<T>(&self, func: T)
1120    where
1121        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1122        T::CallOnceFuture: Send,
1123    {
1124        let mut this = self.pin();
1125        self.begin_background_job();
1126        tokio::spawn(
1127            TURBO_TASKS
1128                .scope(this.clone(), async move {
1129                    if !this.stopped.load(Ordering::Acquire) {
1130                        this = func(this).await;
1131                    }
1132                    this.finish_background_job();
1133                })
1134                .in_current_span(),
1135        );
1136    }
1137
1138    fn finish_current_task_state(&self) -> FinishedTaskState {
1139        CURRENT_TASK_STATE.with(|cell| {
1140            let current_task_state = &*cell.write().unwrap();
1141            FinishedTaskState {
1142                #[cfg(feature = "verify_determinism")]
1143                stateful: current_task_state.stateful,
1144                has_invalidator: current_task_state.has_invalidator,
1145            }
1146        })
1147    }
1148
1149    pub fn backend(&self) -> &B {
1150        &self.backend
1151    }
1152}
1153
/// Stateless executor type that implements [`Executor`] for [`TurboTasks`], turning each
/// [`ScheduledTask`] into the future that runs it.
struct TurboTasksExecutor;
1155
1156/// Run a future and abort the process if a panic is reported
1157///
1158/// Turbtasks catches panics from user code and propagates throught the task tree, but if it happens
1159/// as part of state management we have to abort
1160async fn abort_on_panic<F: Future>(f: F) -> F::Output {
1161    match AssertUnwindSafe(f).catch_unwind().await {
1162        Ok(r) => r,
1163        Err(_) => {
1164            eprintln!(
1165                "\nturbo-tasks: an internal panic occurred outside the per-task panic \
1166                 boundary. This is a bug in turbo-tasks/Turbopack — please report it at \
1167                 https://github.com/vercel/next.js/discussions and include the panic message \
1168                 and stack trace above.\n\nAborting."
1169            );
1170            abort();
1171        }
1172    }
1173}
1174
/// Builds the future that the priority runner polls for each [`ScheduledTask`].
///
/// Both variants run with `TURBO_TASKS` and `CURRENT_TASK_STATE` set and inside
/// [`abort_on_panic`], so a panic in the surrounding bookkeeping aborts the process.
impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
    // Opaque future type; the two match arms below are unified through `Either`.
    type Future = impl Future<Output = ()> + Send + 'static;

    /// Creates the future for `scheduled_task`.
    ///
    /// * `ScheduledTask::Task`: executes the task through the backend, waits for its local
    ///   tasks, reports the result, re-schedules the task if the backend returns a priority,
    ///   and finally finishes the foreground job started by `schedule`.
    /// * `ScheduledTask::LocalTask`: runs the resolve step and stores its output in the
    ///   parent task's `local_tasks`.
    fn execute(
        &self,
        this: &Arc<TurboTasks<B>>,
        scheduled_task: ScheduledTask,
        priority: TaskPriority,
    ) -> Self::Future {
        match scheduled_task {
            ScheduledTask::Task { task_id, span } => {
                // `this2` becomes the `TURBO_TASKS` scope value, `this` is moved into the future.
                let this2 = this.clone();
                let this = this.clone();
                let future = async move {
                    abort_on_panic(async {
                        // it's okay for execution ids to overflow and wrap, they're just used
                        // for an assert
                        let execution_id = this.execution_id_factory.wrapping_get();
                        // Fresh per-execution state, installed as `CURRENT_TASK_STATE` below.
                        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
                            task_id,
                            execution_id,
                            priority,
                            false, // in_top_level_task
                        )));
                        // Evaluates to `Some(priority)` when the task has to be re-scheduled and
                        // to `None` otherwise (including cancellation and the backend declining
                        // to start the execution).
                        let single_execution_future = async {
                            if this.stopped.load(Ordering::Acquire) {
                                this.backend.task_execution_canceled(task_id, &*this);
                                return None;
                            }

                            // `?` returns `None` from this async block when the backend does not
                            // provide an execution spec.
                            let TaskExecutionSpec { future, span } = this
                                .backend
                                .try_start_task_execution(task_id, priority, &*this)?;

                            async {
                                let result = CaptureFuture::new(future).await;

                                // wait for all spawned local tasks using `local` to finish
                                wait_for_local_tasks().await;

                                // Normalize success, task error and the `Err` produced by
                                // `CaptureFuture` into a single result.
                                let result = match result {
                                    Ok(Ok(raw_vc)) => {
                                        // This is safe because we waited for all local tasks to
                                        // complete above
                                        raw_vc
                                            .to_non_local_unchecked_sync(&*this)
                                            .map_err(|err| err.into())
                                    }
                                    Ok(Err(err)) => Err(err.into()),
                                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                                };

                                let finished_state = this.finish_current_task_state();
                                // Takes the cell counters out of the task state; they must still
                                // be present at this point (`unwrap`).
                                let cell_counters = CURRENT_TASK_STATE
                                    .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
                                this.backend.task_execution_completed(
                                    task_id,
                                    result,
                                    &cell_counters,
                                    #[cfg(feature = "verify_determinism")]
                                    finished_state.stateful,
                                    finished_state.has_invalidator,
                                    &*this,
                                )
                            }
                            .instrument(span)
                            .await
                        };
                        if let Some(stale_priority) = CURRENT_TASK_STATE
                            .scope(current_task_state, single_execution_future)
                            .await
                        {
                            // Task was stale; re-schedule at the correct invalidation priority so
                            // other tasks can run in the right priority order.
                            this.schedule(task_id, stale_priority);
                        }
                        // Balances the `begin_foreground_job` call made by `schedule`.
                        this.finish_foreground_job();
                    })
                    .await
                };

                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
            }
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state,
                span,
            } => {
                let this2 = this.clone();
                let this = this.clone();
                // Copied out up front; used below to add context to an error output.
                let task_type = ty.task_type;
                let future = async move {
                    // Span describing the resolve step, derived from the function or trait
                    // method being resolved.
                    let span = match &ty.task_type {
                        LocalTaskType::ResolveNative { native_fn } => {
                            native_fn.resolve_span(priority)
                        }
                        LocalTaskType::ResolveTrait { trait_method } => {
                            trait_method.resolve_span(priority)
                        }
                    };
                    abort_on_panic(
                        async move {
                            let result = match ty.task_type {
                                LocalTaskType::ResolveNative { native_fn } => {
                                    LocalTaskType::run_resolve_native(
                                        native_fn,
                                        ty.this,
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                                LocalTaskType::ResolveTrait { trait_method } => {
                                    LocalTaskType::run_resolve_trait(
                                        trait_method,
                                        // `ResolveTrait` specs are expected to carry `this`
                                        // (`trait_call` always sets it).
                                        ty.this.unwrap(),
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                            };

                            // Success links the local output to the resolved `RawVc`; failure is
                            // stored as an error annotated with the local task type.
                            let output = match result {
                                Ok(raw_vc) => OutputContent::Link(raw_vc),
                                Err(err) => OutputContent::Error(
                                    TurboTasksExecutionError::from(err)
                                        .with_local_task_context(task_type.to_string()),
                                ),
                            };

                            // Publish the output in the parent task's local task table.
                            CURRENT_TASK_STATE.with(move |gts| {
                                gts.write()
                                    .unwrap()
                                    .local_tasks
                                    .complete(local_task_id, output);
                            });
                        }
                        .instrument(span),
                    )
                    .await
                };
                // Local tasks share the state of the task that spawned them.
                let future = CURRENT_TASK_STATE.scope(global_task_state, future);

                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
            }
        }
    }
}
1328
/// Flags copied out of the current task's state by `finish_current_task_state` once an
/// execution has finished; they are forwarded to `Backend::task_execution_completed`.
struct FinishedTaskState {
    /// True if the task has state in cells (interior mutability).
    /// Only tracked when the `verify_determinism` feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the task uses an external invalidator
    has_invalidator: bool,
}
1338
1339impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
1340    fn dynamic_call(
1341        &self,
1342        native_fn: &'static NativeFunction,
1343        this: Option<RawVc>,
1344        arg: &mut dyn StackDynTaskInputs,
1345        persistence: TaskPersistence,
1346    ) -> RawVc {
1347        self.dynamic_call(native_fn, this, arg, persistence)
1348    }
1349    fn native_call(
1350        &self,
1351        native_fn: &'static NativeFunction,
1352        this: Option<RawVc>,
1353        arg: &mut dyn StackDynTaskInputs,
1354        persistence: TaskPersistence,
1355    ) -> RawVc {
1356        self.native_call(native_fn, this, arg, persistence)
1357    }
1358    fn trait_call(
1359        &self,
1360        trait_method: &'static TraitMethod,
1361        this: RawVc,
1362        arg: &mut dyn StackDynTaskInputs,
1363        persistence: TaskPersistence,
1364    ) -> RawVc {
1365        self.trait_call(trait_method, this, arg, persistence)
1366    }
1367
1368    #[track_caller]
1369    fn run(
1370        &self,
1371        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1372    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
1373        let this = self.pin();
1374        Box::pin(async move { this.run(future).await })
1375    }
1376
1377    #[track_caller]
1378    fn run_once(
1379        &self,
1380        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1381    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1382        let this = self.pin();
1383        Box::pin(async move { this.run_once(future).await })
1384    }
1385
1386    #[track_caller]
1387    fn run_once_with_reason(
1388        &self,
1389        reason: StaticOrArc<dyn InvalidationReason>,
1390        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1391    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1392        {
1393            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1394            reason_set.insert(reason);
1395        }
1396        let this = self.pin();
1397        Box::pin(async move { this.run_once(future).await })
1398    }
1399
1400    #[track_caller]
1401    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1402        self.start_once_process(future)
1403    }
1404
1405    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1406        if let Err(e) = self.compilation_events.send(event) {
1407            tracing::warn!("Failed to send compilation event: {e}");
1408        }
1409    }
1410
1411    fn get_task_name(&self, task: TaskId) -> String {
1412        self.backend.get_task_name(task, self)
1413    }
1414}
1415
1416impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1417    #[instrument(level = "info", skip_all, name = "invalidate")]
1418    fn invalidate(&self, task: TaskId) {
1419        self.backend.invalidate_task(task, self);
1420    }
1421
1422    #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1423    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1424        {
1425            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1426            reason_set.insert(reason);
1427        }
1428        self.backend.invalidate_task(task, self);
1429    }
1430
1431    fn invalidate_serialization(&self, task: TaskId) {
1432        self.backend.invalidate_serialization(task, self);
1433    }
1434
1435    #[track_caller]
1436    fn try_read_task_output(
1437        &self,
1438        task: TaskId,
1439        options: ReadOutputOptions,
1440    ) -> Result<Result<RawVc, EventListener>> {
1441        if options.consistency == ReadConsistency::Eventual {
1442            debug_assert_not_in_top_level_task("read_task_output");
1443        }
1444        self.backend.try_read_task_output(
1445            task,
1446            current_task_if_available("reading Vcs"),
1447            options,
1448            self,
1449        )
1450    }
1451
1452    #[track_caller]
1453    fn try_read_task_cell(
1454        &self,
1455        task: TaskId,
1456        index: CellId,
1457        options: ReadCellOptions,
1458    ) -> Result<Result<TypedCellContent, EventListener>> {
1459        let reader = current_task_if_available("reading Vcs");
1460        self.backend
1461            .try_read_task_cell(task, index, reader, options, self)
1462    }
1463
1464    fn try_read_own_task_cell(
1465        &self,
1466        current_task: TaskId,
1467        index: CellId,
1468    ) -> Result<TypedCellContent> {
1469        self.backend
1470            .try_read_own_task_cell(current_task, index, self)
1471    }
1472
1473    #[track_caller]
1474    fn try_read_local_output(
1475        &self,
1476        execution_id: ExecutionId,
1477        local_task_id: LocalTaskId,
1478    ) -> Result<Result<RawVc, EventListener>> {
1479        debug_assert_not_in_top_level_task("read_local_output");
1480        CURRENT_TASK_STATE.with(|gts| {
1481            let gts_read = gts.read().unwrap();
1482
1483            // Local Vcs are local to their parent task's current execution, and do not exist
1484            // outside of it. This is weakly enforced at compile time using the `NonLocalValue`
1485            // marker trait. This assertion exists to handle any potential escapes that the
1486            // compile-time checks cannot capture.
1487            gts_read.assert_execution_id(execution_id);
1488
1489            match gts_read.local_tasks.get(local_task_id) {
1490                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1491                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1492            }
1493        })
1494    }
1495
1496    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1497        // TODO: Add assert_not_in_top_level_task("read_task_collectibles") check here.
1498        // Collectible reads are eventually consistent.
1499        self.backend.read_task_collectibles(
1500            task,
1501            trait_id,
1502            current_task_if_available("reading collectibles"),
1503            self,
1504        )
1505    }
1506
1507    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1508        self.backend.emit_collectible(
1509            trait_type,
1510            collectible,
1511            current_task("emitting collectible"),
1512            self,
1513        );
1514    }
1515
1516    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1517        self.backend.unemit_collectible(
1518            trait_type,
1519            collectible,
1520            count,
1521            current_task("emitting collectible"),
1522            self,
1523        );
1524    }
1525
1526    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1527        for (&collectible, &count) in collectibles {
1528            if count > 0 {
1529                self.backend.unemit_collectible(
1530                    trait_type,
1531                    collectible,
1532                    count as u32,
1533                    current_task("emitting collectible"),
1534                    self,
1535                );
1536            }
1537        }
1538    }
1539
1540    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent> {
1541        self.try_read_own_task_cell(task, index)
1542    }
1543
1544    fn update_own_task_cell(
1545        &self,
1546        task: TaskId,
1547        index: CellId,
1548        content: CellContent,
1549        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
1550        content_hash: Option<CellHash>,
1551        verification_mode: VerificationMode,
1552    ) {
1553        self.backend.update_task_cell(
1554            task,
1555            index,
1556            content,
1557            updated_key_hashes,
1558            content_hash,
1559            verification_mode,
1560            self,
1561        );
1562    }
1563
1564    fn connect_task(&self, task: TaskId) {
1565        self.backend
1566            .connect_task(task, current_task_if_available("connecting task"), self);
1567    }
1568
1569    fn mark_own_task_as_finished(&self, task: TaskId) {
1570        self.backend.mark_own_task_as_finished(task, self);
1571    }
1572
1573    /// Creates a future that inherits the current task id and task state. The current global task
1574    /// will wait for this future to be dropped before exiting.
1575    fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1576        // this is similar to what happens for a local task, except that we keep the local task's
1577        // state as well.
1578        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1579        global_task_state
1580            .write()
1581            .unwrap()
1582            .local_tasks
1583            .register_detached();
1584        let wrapped = async move {
1585            // use a drop guard for panic safety
1586            struct DropGuard;
1587            impl Drop for DropGuard {
1588                fn drop(&mut self) {
1589                    CURRENT_TASK_STATE
1590                        .with(|ts| ts.write().unwrap().local_tasks.decrement_in_flight());
1591                }
1592            }
1593            let _guard = DropGuard;
1594            fut.await;
1595        };
1596        tokio::spawn(TURBO_TASKS.scope(
1597            turbo_tasks(),
1598            CURRENT_TASK_STATE.scope(global_task_state, wrapped),
1599        ));
1600    }
1601
1602    fn task_statistics(&self) -> &TaskStatisticsApi {
1603        self.backend.task_statistics()
1604    }
1605
1606    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1607        let this = self.pin();
1608        Box::pin(async move {
1609            this.stop_and_wait().await;
1610        })
1611    }
1612
1613    fn subscribe_to_compilation_events(
1614        &self,
1615        event_types: Option<Vec<String>>,
1616    ) -> Receiver<Arc<dyn CompilationEvent>> {
1617        self.compilation_events.subscribe(event_types)
1618    }
1619
1620    fn is_tracking_dependencies(&self) -> bool {
1621        self.backend.is_tracking_dependencies()
1622    }
1623}
1624
1625impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1626    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1627        self.pin()
1628    }
1629    fn backend(&self) -> &B {
1630        &self.backend
1631    }
1632
1633    #[track_caller]
1634    fn schedule_backend_background_job(&self, job: B::BackendJob) {
1635        self.schedule_background_job(async move |this| {
1636            this.backend.run_backend_job(job, &*this).await;
1637            this
1638        })
1639    }
1640
1641    #[track_caller]
1642    fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1643        self.schedule_foreground_job(async move |this| {
1644            this.backend.run_backend_job(job, &*this).await;
1645            this
1646        })
1647    }
1648
1649    #[track_caller]
1650    fn schedule(&self, task: TaskId, priority: TaskPriority) {
1651        self.schedule(task, priority)
1652    }
1653
1654    fn get_current_task_priority(&self) -> TaskPriority {
1655        CURRENT_TASK_STATE
1656            .try_with(|task_state| task_state.read().unwrap().priority)
1657            .unwrap_or(TaskPriority::initial())
1658    }
1659
1660    fn program_duration_until(&self, instant: Instant) -> Duration {
1661        instant - self.program_start
1662    }
1663
1664    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1665        // SAFETY: This is a fresh id from the factory
1666        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1667    }
1668
1669    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1670        // SAFETY: This is a fresh id from the factory
1671        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1672    }
1673
1674    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1675        unsafe { self.task_id_factory.reuse(id.into()) }
1676    }
1677
1678    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1679        unsafe { self.transient_task_id_factory.reuse(id.into()) }
1680    }
1681
1682    fn is_idle(&self) -> bool {
1683        self.currently_scheduled_foreground_jobs
1684            .load(Ordering::Acquire)
1685            == 0
1686    }
1687}
1688
1689async fn wait_for_local_tasks() {
1690    let listener =
1691        CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_tasks.listen_for_in_flight());
1692    let Some(listener) = listener else {
1693        return;
1694    };
1695    listener.await;
1696}
1697
1698pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1699    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1700        Ok(id) => id,
1701        Err(_) => panic!(
1702            "{from} can only be used in the context of a turbo_tasks task execution or \
1703             turbo_tasks run"
1704        ),
1705    }
1706}
1707
1708pub(crate) fn current_task(from: &str) -> TaskId {
1709    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1710        Ok(Some(id)) => id,
1711        Ok(None) | Err(_) => {
1712            panic!("{from} can only be used in the context of a turbo_tasks task execution")
1713        }
1714    }
1715}
1716
1717/// Panics if we're not in a top-level task (e.g. [`run_once`]). Some function calls should only
1718/// happen in a top-level task (e.g. [`Effects::apply`][crate::Effects::apply]).
1719#[track_caller]
1720pub(crate) fn debug_assert_in_top_level_task(message: &str) {
1721    if !cfg!(debug_assertions) {
1722        return;
1723    }
1724
1725    let in_top_level = CURRENT_TASK_STATE
1726        .try_with(|ts| ts.read().unwrap().in_top_level_task)
1727        .unwrap_or(true);
1728    if !in_top_level {
1729        panic!("{message}");
1730    }
1731}
1732
1733#[track_caller]
1734pub(crate) fn debug_assert_not_in_top_level_task(operation: &str) {
1735    if !cfg!(debug_assertions) {
1736        return;
1737    }
1738
1739    // HACK: We set this inside of `ReadRawVcFuture` to suppress warnings about an internal
1740    // consistency bug
1741    let suppressed = SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK
1742        .try_with(|&suppressed| suppressed)
1743        .unwrap_or(false);
1744    if suppressed {
1745        return;
1746    }
1747
1748    let in_top_level = CURRENT_TASK_STATE
1749        .try_with(|ts| ts.read().unwrap().in_top_level_task)
1750        .unwrap_or(false);
1751    if in_top_level {
1752        panic!(
1753            "Eventually consistent read ({operation}) cannot be performed from a top-level task. \
1754             Top-level tasks (e.g. code inside `.run_once(...)`) must use strongly consistent \
1755             reads to avoid leaking inconsistent return values."
1756        );
1757    }
1758}
1759
1760pub async fn run<T: Send + 'static>(
1761    tt: Arc<dyn TurboTasksApi>,
1762    future: impl Future<Output = Result<T>> + Send + 'static,
1763) -> Result<T> {
1764    let (tx, rx) = tokio::sync::oneshot::channel();
1765
1766    tt.run(Box::pin(async move {
1767        let result = future.await?;
1768        tx.send(result)
1769            .map_err(|_| anyhow!("unable to send result"))?;
1770        Ok(())
1771    }))
1772    .await?;
1773
1774    Ok(rx.await?)
1775}
1776
1777pub async fn run_once<T: Send + 'static>(
1778    tt: Arc<dyn TurboTasksApi>,
1779    future: impl Future<Output = Result<T>> + Send + 'static,
1780) -> Result<T> {
1781    let (tx, rx) = tokio::sync::oneshot::channel();
1782
1783    tt.run_once(Box::pin(async move {
1784        let result = future.await?;
1785        tx.send(result)
1786            .map_err(|_| anyhow!("unable to send result"))?;
1787        Ok(())
1788    }))
1789    .await?;
1790
1791    Ok(rx.await?)
1792}
1793
1794pub async fn run_once_with_reason<T: Send + 'static>(
1795    tt: Arc<dyn TurboTasksApi>,
1796    reason: impl InvalidationReason,
1797    future: impl Future<Output = Result<T>> + Send + 'static,
1798) -> Result<T> {
1799    let (tx, rx) = tokio::sync::oneshot::channel();
1800
1801    tt.run_once_with_reason(
1802        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1803        Box::pin(async move {
1804            let result = future.await?;
1805            tx.send(result)
1806                .map_err(|_| anyhow!("unable to send result"))?;
1807            Ok(())
1808        }),
1809    )
1810    .await?;
1811
1812    Ok(rx.await?)
1813}
1814
1815/// Calls [`TurboTasks::dynamic_call`] for the current turbo tasks instance.
1816pub fn dynamic_call(
1817    func: &'static NativeFunction,
1818    this: Option<RawVc>,
1819    arg: &mut dyn StackDynTaskInputs,
1820    persistence: TaskPersistence,
1821) -> RawVc {
1822    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1823}
1824
1825/// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance.
1826pub fn trait_call(
1827    trait_method: &'static TraitMethod,
1828    this: RawVc,
1829    arg: &mut dyn StackDynTaskInputs,
1830    persistence: TaskPersistence,
1831) -> RawVc {
1832    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1833}
1834
1835pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1836    TURBO_TASKS.with(|arc| arc.clone())
1837}
1838
1839pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
1840    TURBO_TASKS.with(Arc::downgrade)
1841}
1842
1843pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1844    TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1845}
1846
1847pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1848    TURBO_TASKS.with(|arc| func(arc))
1849}
1850
1851pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1852    TURBO_TASKS.sync_scope(tt, f)
1853}
1854
1855pub fn turbo_tasks_future_scope<T>(
1856    tt: Arc<dyn TurboTasksApi>,
1857    f: impl Future<Output = T>,
1858) -> impl Future<Output = T> {
1859    TURBO_TASKS.scope(tt, f)
1860}
1861
1862pub fn with_turbo_tasks_for_testing<T>(
1863    tt: Arc<dyn TurboTasksApi>,
1864    current_task: TaskId,
1865    execution_id: ExecutionId,
1866    f: impl Future<Output = T>,
1867) -> impl Future<Output = T> {
1868    TURBO_TASKS.scope(
1869        tt,
1870        CURRENT_TASK_STATE.scope(
1871            Arc::new(RwLock::new(CurrentTaskState::new(
1872                current_task,
1873                execution_id,
1874                TaskPriority::initial(),
1875                false, // in_top_level_task
1876            ))),
1877            f,
1878        ),
1879    )
1880}
1881
1882/// Spawns the given future within the context of the current task.
1883///
1884/// Beware: this method is not safe to use in production code. It is only
1885/// intended for use in tests and for debugging purposes.
1886pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
1887    turbo_tasks().spawn_detached_for_testing(Box::pin(f));
1888}
1889
1890pub fn current_task_for_testing() -> Option<TaskId> {
1891    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1892}
1893
1894/// Marks the current task as finished. This excludes it from waiting for
1895/// strongly consistency.
1896pub fn mark_finished() {
1897    with_turbo_tasks(|tt| {
1898        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1899    });
1900}
1901
1902/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
1903/// serialization of the current task cells.
1904///
1905/// Also marks the current task as stateful when the `verify_determinism` feature is enabled,
1906/// since State allocation implies interior mutability.
1907pub fn get_serialization_invalidator() -> SerializationInvalidator {
1908    CURRENT_TASK_STATE.with(|cell| {
1909        let CurrentTaskState {
1910            task_id,
1911            #[cfg(feature = "verify_determinism")]
1912            stateful,
1913            ..
1914        } = &mut *cell.write().unwrap();
1915        #[cfg(feature = "verify_determinism")]
1916        {
1917            *stateful = true;
1918        }
1919        let Some(task_id) = *task_id else {
1920            panic!(
1921                "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
1922                 task execution"
1923            );
1924        };
1925        SerializationInvalidator::new(task_id)
1926    })
1927}
1928
1929pub fn mark_invalidator() {
1930    CURRENT_TASK_STATE.with(|cell| {
1931        let CurrentTaskState {
1932            has_invalidator, ..
1933        } = &mut *cell.write().unwrap();
1934        *has_invalidator = true;
1935    })
1936}
1937
/// Flags the current task as stateful, i.e. as having interior mutability (for example via
/// [`State`][crate::State]), which means it may produce different outputs for the same inputs.
///
/// This is a no-op unless the `verify_determinism` feature is enabled.
pub fn mark_stateful() {
    #[cfg(feature = "verify_determinism")]
    {
        CURRENT_TASK_STATE.with(|state| {
            let mut guard = state.write().unwrap();
            guard.stateful = true;
        })
    }
    // Without `verify_determinism` there is nothing to record.
}
1953
1954/// Marks the current task context as being in a top-level task. When in a top-level task,
1955/// eventually consistent reads will panic. It is almost always a mistake to perform an eventually
1956/// consistent read at the top-level of the application.
1957pub fn mark_top_level_task() {
1958    if cfg!(debug_assertions) {
1959        CURRENT_TASK_STATE.with(|cell| {
1960            cell.write().unwrap().in_top_level_task = true;
1961        })
1962    }
1963}
1964
1965/// Unmarks the current task context as being in a top-level task. The opposite of
1966/// [`mark_top_level_task`].
1967///
1968/// This utility can be okay in unit tests, where we're observing the internal behavior of
1969/// turbo-tasks, but otherwise, it is probably a mistake to call this function.
1970///
1971/// Calling this will allow eventually-consistent reads at the top-level, potentially exposing
1972/// incomplete computations and internal errors caused by eventual consistency that would've been
1973/// caught when the function was re-run. A strongly-consistent read re-runs parts of a task until
1974/// all of the dependencies have settled.
1975pub fn unmark_top_level_task_may_leak_eventually_consistent_state() {
1976    if cfg!(debug_assertions) {
1977        CURRENT_TASK_STATE.with(|cell| {
1978            cell.write().unwrap().in_top_level_task = false;
1979        })
1980    }
1981}
1982
/// Placeholder that currently does nothing.
pub fn prevent_gc() {
    // TODO implement garbage collection
}
1986
1987pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1988    with_turbo_tasks(|tt| {
1989        let raw_vc = collectible.node.node;
1990        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1991    })
1992}
1993
1994pub(crate) async fn read_task_output(
1995    this: &dyn TurboTasksApi,
1996    id: TaskId,
1997    options: ReadOutputOptions,
1998) -> Result<RawVc> {
1999    loop {
2000        match this.try_read_task_output(id, options)? {
2001            Ok(result) => return Ok(result),
2002            Err(listener) => listener.await,
2003        }
2004    }
2005}
2006
2007/// A reference to a task's cell with methods that allow updating the contents
2008/// of the cell.
2009///
2010/// Mutations should not outside of the task that that owns this cell. Doing so
2011/// is a logic error, and may lead to incorrect caching behavior.
2012#[derive(Clone, Copy)]
2013pub struct CurrentCellRef {
2014    current_task: TaskId,
2015    index: CellId,
2016}
2017
2018type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
2019
2020impl CurrentCellRef {
2021    /// Updates the cell if the given `functor` returns a value.
2022    fn conditional_update<T>(
2023        &self,
2024        functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>, Option<CellHash>)>,
2025    ) where
2026        T: VcValueType,
2027    {
2028        self.conditional_update_with_shared_reference(|old_shared_reference| {
2029            let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
2030            let (new_value, updated_key_hashes, content_hash) = functor(old_ref)?;
2031            Some((
2032                SharedReference::new(triomphe::Arc::new(new_value)),
2033                updated_key_hashes,
2034                content_hash,
2035            ))
2036        })
2037    }
2038
2039    /// Updates the cell if the given `functor` returns a `SharedReference`.
2040    fn conditional_update_with_shared_reference(
2041        &self,
2042        functor: impl FnOnce(
2043            Option<&SharedReference>,
2044        ) -> Option<(
2045            SharedReference,
2046            Option<SmallVec<[u64; 2]>>,
2047            Option<CellHash>,
2048        )>,
2049    ) {
2050        let tt = turbo_tasks();
2051        let cell_content = tt.read_own_task_cell(self.current_task, self.index).ok();
2052        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
2053        if let Some((update, updated_key_hashes, content_hash)) = update {
2054            tt.update_own_task_cell(
2055                self.current_task,
2056                self.index,
2057                CellContent(Some(update)),
2058                updated_key_hashes,
2059                content_hash,
2060                VerificationMode::EqualityCheck,
2061            )
2062        }
2063    }
2064
2065    /// Replace the current cell's content with `new_value` if the current content is not equal by
2066    /// value with the existing content.
2067    ///
2068    /// The comparison happens using the value itself, not the [`VcRead::Target`] of that value.
2069    ///
2070    /// Take this example of a custom equality implementation on a transparent wrapper type:
2071    ///
2072    /// ```
2073    /// #[turbo_tasks::value(transparent, eq = "manual")]
2074    /// struct Wrapper(Vec<u32>);
2075    ///
2076    /// impl PartialEq for Wrapper {
2077    ///     fn eq(&self, other: Wrapper) {
2078    ///         // Example: order doesn't matter for equality
2079    ///         let (mut this, mut other) = (self.clone(), other.clone());
2080    ///         this.sort_unstable();
2081    ///         other.sort_unstable();
2082    ///         this == other
2083    ///     }
2084    /// }
2085    ///
2086    /// impl Eq for Wrapper {}
2087    /// ```
2088    ///
2089    /// Comparisons of [`Vc<Wrapper>`] used when updating the cell will use `Wrapper`'s custom
2090    /// equality implementation, rather than the one provided by the target ([`Vec<u32>`]) type.
2091    ///
2092    /// However, in most cases, the default derived implementation of [`PartialEq`] is used which
2093    /// just forwards to the inner value's [`PartialEq`].
2094    ///
2095    /// If you already have a `SharedReference`, consider calling
2096    /// [`Self::compare_and_update_with_shared_reference`] which can re-use the [`SharedReference`]
2097    /// object.
2098    pub fn compare_and_update<T>(&self, new_value: T)
2099    where
2100        T: PartialEq + VcValueType,
2101    {
2102        self.conditional_update(|old_value| {
2103            if let Some(old_value) = old_value
2104                && old_value == &new_value
2105            {
2106                return None;
2107            }
2108            Some((new_value, None, None))
2109        });
2110    }
2111
2112    /// Replace the current cell's content with `new_shared_reference` if the current content is not
2113    /// equal by value with the existing content.
2114    ///
2115    /// If you already have a `SharedReference`, this is a faster version of
2116    /// [`CurrentCellRef::compare_and_update`].
2117    ///
2118    /// The value should be stored in [`SharedReference`] using the type `T`.
2119    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
2120    where
2121        T: VcValueType + PartialEq,
2122    {
2123        self.conditional_update_with_shared_reference(|old_sr| {
2124            if let Some(old_sr) = old_sr {
2125                let old_value = extract_sr_value::<T>(old_sr);
2126                let new_value = extract_sr_value::<T>(&new_shared_reference);
2127                if old_value == new_value {
2128                    return None;
2129                }
2130            }
2131            Some((new_shared_reference, None, None))
2132        });
2133    }
2134
2135    /// Replace the current cell's content if the new value is different.
2136    ///
2137    /// Like [`Self::compare_and_update`], but also computes and stores a hash of the value.
2138    /// When the cell's transient data is evicted, the stored hash enables the backend to detect
2139    /// whether the value actually changed without re-comparing values—avoiding unnecessary
2140    /// downstream invalidation.
2141    ///
2142    /// Requires `T: DeterministicHash` in addition to `T: PartialEq`.
2143    pub fn hashed_compare_and_update<T>(&self, new_value: T)
2144    where
2145        T: PartialEq + DeterministicHash + VcValueType,
2146    {
2147        self.conditional_update(|old_value| {
2148            if let Some(old_value) = old_value
2149                && old_value == &new_value
2150            {
2151                return None;
2152            }
2153            let content_hash = hash_xxh3_hash128(&new_value);
2154            Some((new_value, None, Some(content_hash)))
2155        });
2156    }
2157
2158    /// Replace the current cell's content if the new value (from a pre-existing
2159    /// [`SharedReference`]) is different.
2160    ///
2161    /// Like [`Self::compare_and_update_with_shared_reference`], but also passes a hash
2162    /// for hash-based change detection when transient data has been evicted.
2163    pub fn hashed_compare_and_update_with_shared_reference<T>(
2164        &self,
2165        new_shared_reference: SharedReference,
2166    ) where
2167        T: VcValueType + PartialEq + DeterministicHash,
2168    {
2169        self.conditional_update_with_shared_reference(move |old_sr| {
2170            if let Some(old_sr) = old_sr {
2171                let old_value = extract_sr_value::<T>(old_sr);
2172                let new_value = extract_sr_value::<T>(&new_shared_reference);
2173                if old_value == new_value {
2174                    return None;
2175                }
2176            }
2177            let content_hash = hash_xxh3_hash128(extract_sr_value::<T>(&new_shared_reference));
2178            Some((new_shared_reference, None, Some(content_hash)))
2179        });
2180    }
2181
2182    /// See [`Self::compare_and_update`], but selectively update individual keys.
2183    pub fn keyed_compare_and_update<T>(&self, new_value: T)
2184    where
2185        T: PartialEq + VcValueType,
2186        VcReadTarget<T>: KeyedEq,
2187        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2188    {
2189        self.conditional_update(|old_value| {
2190            let Some(old_value) = old_value else {
2191                return Some((new_value, None, None));
2192            };
2193            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2194            let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
2195            let updated_keys = old_value.different_keys(new_value_ref);
2196            if updated_keys.is_empty() {
2197                return None;
2198            }
2199            // Duplicates are very unlikely, but ok since the backend is deduplicating them
2200            let updated_key_hashes = updated_keys
2201                .into_iter()
2202                .map(|key| FxBuildHasher.hash_one(key))
2203                .collect();
2204            Some((new_value, Some(updated_key_hashes), None))
2205        });
2206    }
2207
2208    /// See [`Self::compare_and_update_with_shared_reference`], but selectively update individual
2209    /// keys.
2210    pub fn keyed_compare_and_update_with_shared_reference<T>(
2211        &self,
2212        new_shared_reference: SharedReference,
2213    ) where
2214        T: VcValueType + PartialEq,
2215        VcReadTarget<T>: KeyedEq,
2216        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2217    {
2218        self.conditional_update_with_shared_reference(|old_sr| {
2219            let Some(old_sr) = old_sr else {
2220                return Some((new_shared_reference, None, None));
2221            };
2222            let old_value = extract_sr_value::<T>(old_sr);
2223            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2224            let new_value = extract_sr_value::<T>(&new_shared_reference);
2225            let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
2226            let updated_keys = old_value.different_keys(new_value);
2227            if updated_keys.is_empty() {
2228                return None;
2229            }
2230            // Duplicates are very unlikely, but ok since the backend is deduplicating them
2231            let updated_key_hashes = updated_keys
2232                .into_iter()
2233                .map(|key| FxBuildHasher.hash_one(key))
2234                .collect();
2235            Some((new_shared_reference, Some(updated_key_hashes), None))
2236        });
2237    }
2238
2239    /// Unconditionally updates the content of the cell.
2240    pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
2241    where
2242        T: VcValueType,
2243    {
2244        let tt = turbo_tasks();
2245        tt.update_own_task_cell(
2246            self.current_task,
2247            self.index,
2248            CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
2249            None,
2250            None,
2251            verification_mode,
2252        )
2253    }
2254
2255    /// A faster version of [`Self::update`] if you already have a
2256    /// [`SharedReference`].
2257    ///
2258    /// If the passed-in [`SharedReference`] is the same as the existing cell's
2259    /// by identity, no update is performed.
2260    ///
2261    /// The value should be stored in [`SharedReference`] using the type `T`.
2262    pub fn update_with_shared_reference(
2263        &self,
2264        shared_ref: SharedReference,
2265        verification_mode: VerificationMode,
2266    ) {
2267        let tt = turbo_tasks();
2268        let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
2269            let content = tt.read_own_task_cell(self.current_task, self.index).ok();
2270            if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
2271                // pointer equality (not value equality)
2272                shared_ref_exp != shared_ref
2273            } else {
2274                true
2275            }
2276        } else {
2277            true
2278        };
2279        if update {
2280            tt.update_own_task_cell(
2281                self.current_task,
2282                self.index,
2283                CellContent(Some(shared_ref)),
2284                None,
2285                None,
2286                verification_mode,
2287            )
2288        }
2289    }
2290}
2291
2292impl From<CurrentCellRef> for RawVc {
2293    fn from(cell: CurrentCellRef) -> Self {
2294        RawVc::TaskCell(cell.current_task, cell.index)
2295    }
2296}
2297
2298fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2299    sr.0.downcast_ref::<T>()
2300        .expect("cannot update SharedReference of different type")
2301}
2302
2303pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
2304    find_cell_by_id(T::get_value_type_id())
2305}
2306
2307pub fn find_cell_by_id(ty: ValueTypeId) -> CurrentCellRef {
2308    CURRENT_TASK_STATE.with(|ts| {
2309        let current_task = current_task("celling turbo_tasks values");
2310        let mut ts = ts.write().unwrap();
2311        let map = ts.cell_counters.as_mut().unwrap();
2312        let current_index = map.entry(ty).or_default();
2313        let index = *current_index;
2314        *current_index += 1;
2315        CurrentCellRef {
2316            current_task,
2317            index: CellId { type_id: ty, index },
2318        }
2319    })
2320}
2321
2322pub(crate) async fn read_local_output(
2323    this: &dyn TurboTasksApi,
2324    execution_id: ExecutionId,
2325    local_task_id: LocalTaskId,
2326) -> Result<RawVc> {
2327    loop {
2328        match this.try_read_local_output(execution_id, local_task_id)? {
2329            Ok(raw_vc) => return Ok(raw_vc),
2330            Err(event_listener) => event_listener.await,
2331        }
2332    }
2333}