turbo_tasks/manager.rs

1use std::{
2    cmp::Reverse,
3    fmt::{Debug, Display},
4    future::Future,
5    hash::{BuildHasher, BuildHasherDefault},
6    mem::take,
7    panic::AssertUnwindSafe,
8    pin::Pin,
9    process::abort,
10    sync::{
11        Arc, Mutex, RwLock, Weak,
12        atomic::{AtomicBool, AtomicUsize, Ordering},
13    },
14    time::{Duration, Instant},
15};
16
17use anyhow::{Result, anyhow};
18use auto_hash_map::AutoMap;
19use bincode::{Decode, Encode};
20use either::Either;
21use futures::FutureExt;
22use rustc_hash::{FxBuildHasher, FxHasher};
23use serde::{Deserialize, Serialize};
24use smallvec::SmallVec;
25use tokio::{select, sync::mpsc::Receiver, task_local};
26use tracing::{Instrument, Span, instrument};
27use turbo_tasks_hash::{DeterministicHash, hash_xxh3_hash128};
28
29use crate::{
30    Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
31    ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
32    VcValueTrait, VcValueType,
33    backend::{
34        Backend, CellContent, CellHash, TaskCollectiblesMap, TaskExecutionSpec, TransientTaskType,
35        TurboTasksExecutionError, TypedCellContent, VerificationMode,
36    },
37    capture_future::CaptureFuture,
38    dyn_task_inputs::StackDynTaskInputs,
39    event::{Event, EventListener},
40    id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
41    id_factory::IdFactoryWithReuse,
42    keyed::KeyedEq,
43    local_task_tracker::LocalTaskTracker,
44    macro_helpers::NativeFunction,
45    message_queue::{CompilationEvent, CompilationEventQueue},
46    priority_runner::{Executor, PriorityRunner},
47    raw_vc::{CellId, RawVc},
48    registry,
49    serialization_invalidation::SerializationInvalidator,
50    task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
51    task_statistics::TaskStatisticsApi,
52    trace::TraceRawVcs,
53    util::{IdFactory, StaticOrArc},
54};
55
/// Common base trait for [`TurboTasksApi`] and [`TurboTasksBackendApi`]. Provides APIs for creating
/// tasks from function calls.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls a native function with arguments. Resolves arguments when needed
    /// with a wrapper task.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Call a native function with arguments.
    /// All inputs must be resolved.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls a trait method with arguments. First input is the `self` object.
    /// Uses a wrapper task to resolve
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs `future` to completion in a turbo-tasks context, surfacing panics and
    /// execution errors as [`TurboTasksExecutionError`].
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
    /// Runs `future` as a one-shot root task. Dependencies do not re-trigger it.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Like `run_once`, but records `reason` so invalidation statistics can
    /// attribute the work (see [`InvalidationReason`]).
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Spawns `future` as a detached one-shot process (no result is returned to
    /// the caller).
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Sends a compilation event to subscribers.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Returns a human-readable name for the given task.
    fn get_task_name(&self, task: TaskId) -> String;
}
108
/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
/// context. Returned by the [`turbo_tasks`] helper function.
///
/// This trait is needed because thread locals cannot contain an unresolved [`Backend`] type
/// parameter.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Marks `task` as invalidated so it will be re-executed.
    fn invalidate(&self, task: TaskId);
    /// Like [`Self::invalidate`], but records `reason` for diagnostics/statistics.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Invalidates only the persisted (serialized) state of `task`, not its result.
    fn invalidate_serialization(&self, task: TaskId);

    /// Reads the output of `task`. Returns `Ok(Err(listener))` when the output is
    /// not yet available; the caller can await the listener and retry.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Reads a single cell of `task`. Same retry protocol as
    /// [`Self::try_read_task_output`].
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Reads a [`RawVc::LocalOutput`]. If the task has completed, returns the [`RawVc`] the local
    /// task points to.
    ///
    /// The returned [`RawVc`] may also be a [`RawVc::LocalOutput`], so this may need to be called
    /// recursively or in a loop.
    ///
    /// This does not accept a consistency argument, as you cannot control consistency of a read of
    /// an operation owned by your own task. Strongly consistent reads are only allowed on
    /// [`OperationVc`]s, which should never be local tasks.
    ///
    /// No dependency tracking will happen as a result of this function call, as it's a no-op for a
    /// task to depend on itself.
    ///
    /// [`OperationVc`]: crate::OperationVc
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Reads all collectibles of `trait_id` emitted in the subgraph below `task`.
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits `collectible` from the current task for the given trait type.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Retracts `count` previous emissions of `collectible`.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Retracts a whole map of previously read collectibles.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
    ) -> Result<TypedCellContent>;

    /// Reads a cell of the current task itself (no dependency tracking needed —
    /// a task depending on itself is a no-op).
    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent>;
    /// Writes `content` into a cell of the current task. `updated_key_hashes` and
    /// `content_hash` are optional precomputed hashes the backend may use;
    /// `verification_mode` controls determinism verification (see [`VerificationMode`]).
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
    );
    /// Signals that the current task has finished its useful work.
    fn mark_own_task_as_finished(&self, task: TaskId);

    /// Connects `task` as a child of the current task.
    fn connect_task(&self, task: TaskId);

    /// Wraps the given future in the current task.
    ///
    /// Beware: this method is not safe to use in production code. It is only intended for use in
    /// tests and for debugging purposes.
    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Access to per-task statistics collection.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Stops the system and resolves the returned future once it has stopped.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events, optionally filtered by event type names.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;

    // Returns true if TurboTasks is configured to track dependencies.
    fn is_tracking_dependencies(&self) -> bool;
}
199
/// A wrapper around a value that is unused.
///
/// Used as a type-level marker for values (e.g. freshly allocated [`TaskId`]s)
/// that have been handed out but must not be observed yet.
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    /// Creates a new unused value.
    ///
    /// # Safety
    ///
    /// The wrapped value must not be used.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Self { inner }
    }

    /// Get the inner value, without consuming the `Unused` wrapper.
    ///
    /// # Safety
    ///
    /// The caller needs to make sure that the value stays unused.
    pub unsafe fn into(self) -> T {
        self.inner
    }

    /// Get the inner value, without consuming the `Unused` wrapper.
    ///
    /// # Safety
    ///
    /// The caller needs to make sure that the value stays unused.
    pub unsafe fn get_unchecked(&self) -> &T {
        &self.inner
    }

    /// Unwraps the value, consuming the `Unused` wrapper.
    // NOTE: intentionally named `into` despite shadowing the `Into` convention;
    // renaming would break existing callers.
    pub fn into(self) -> T {
        self.inner
    }
}
229
/// A subset of the [`TurboTasks`] API that's exposed to [`Backend`] implementations.
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns a strong, shareable handle to this API object.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Allocates a fresh id in the persistent task id range.
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Allocates a fresh id in the transient task id range.
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// # Safety
    ///
    /// The caller must ensure that the task id is not used anymore.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// # Safety
    ///
    /// The caller must ensure that the task id is not used anymore.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedule a task for execution.
    fn schedule(&self, task: TaskId, priority: TaskPriority);

    /// Returns the priority of the current task.
    fn get_current_task_priority(&self) -> TaskPriority;

    /// Schedule a foreground backend job for execution.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Schedule a background backend job for execution.
    ///
    /// Background jobs are not counted towards activeness of the system. The system is considered
    /// idle even with active background jobs.
    fn schedule_backend_background_job(&self, job: B::BackendJob);

    /// Returns the duration from the start of the program to the given instant.
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Returns true if the system is idle.
    fn is_idle(&self) -> bool;

    /// Returns a reference to the backend.
    fn backend(&self) -> &B;
}
269
/// Aggregated information about a completed batch of updates.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// Wall-clock time the update took.
    pub duration: Duration,
    /// Number of tasks executed during the update.
    pub tasks: usize,
    /// The invalidation reasons that triggered the update.
    pub reasons: InvalidationReasonSet,
    // Private unit field so the struct cannot be constructed outside this module
    // (manual `#[non_exhaustive]` that still allows in-crate construction).
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
278
/// Whether a task may be persisted across sessions.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// Tasks that may be persisted across sessions using serialization.
    Persistent,

    /// Tasks that will be persisted in memory for the life of this session, but won't persist
    /// between sessions.
    ///
    /// This is used for [root tasks][TurboTasks::spawn_root_task] and tasks with an argument of
    /// type [`TransientValue`][crate::value::TransientValue] or
    /// [`TransientInstance`][crate::value::TransientInstance].
    Transient,
}
292
293impl Display for TaskPersistence {
294    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
295        match self {
296            TaskPersistence::Persistent => write!(f, "persistent"),
297            TaskPersistence::Transient => write!(f, "transient"),
298        }
299    }
300}
301
/// Consistency level for reading task outputs and cells.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// The default behavior for most APIs. Reads are faster, but may return stale values, which
    /// may later trigger re-computation.
    #[default]
    Eventual,
    /// Ensures all dependencies are fully resolved before returning the cell or output data, at
    /// the cost of slower reads.
    ///
    /// Top-level code that returns data to the user should use strongly consistent reads.
    Strong,
}
314
/// Dependency-tracking behavior for cell reads.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadCellTracking {
    /// Reads are tracked as dependencies of the current task.
    Tracked {
        /// The key used for the dependency
        key: Option<u64>,
    },
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}
333
334impl ReadCellTracking {
335    pub fn should_track(&self, is_err: bool) -> bool {
336        match self {
337            ReadCellTracking::Tracked { .. } => true,
338            ReadCellTracking::TrackOnlyError => is_err,
339            ReadCellTracking::Untracked => false,
340        }
341    }
342
343    pub fn key(&self) -> Option<u64> {
344        match self {
345            ReadCellTracking::Tracked { key } => *key,
346            ReadCellTracking::TrackOnlyError => None,
347            ReadCellTracking::Untracked => None,
348        }
349    }
350}
351
352impl Default for ReadCellTracking {
353    fn default() -> Self {
354        ReadCellTracking::Tracked { key: None }
355    }
356}
357
358impl Display for ReadCellTracking {
359    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
360        match self {
361            ReadCellTracking::Tracked { key: None } => write!(f, "tracked"),
362            ReadCellTracking::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
363            ReadCellTracking::TrackOnlyError => write!(f, "track only error"),
364            ReadCellTracking::Untracked => write!(f, "untracked"),
365        }
366    }
367}
368
/// Dependency-tracking behavior for output reads (keyless counterpart of
/// [`ReadCellTracking`]).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// Reads are tracked as dependencies of the current task.
    #[default]
    Tracked,
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}
385
386impl ReadTracking {
387    pub fn should_track(&self, is_err: bool) -> bool {
388        match self {
389            ReadTracking::Tracked => true,
390            ReadTracking::TrackOnlyError => is_err,
391            ReadTracking::Untracked => false,
392        }
393    }
394}
395
396impl Display for ReadTracking {
397    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
398        match self {
399            ReadTracking::Tracked => write!(f, "tracked"),
400            ReadTracking::TrackOnlyError => write!(f, "track only error"),
401            ReadTracking::Untracked => write!(f, "untracked"),
402        }
403    }
404}
405
/// Scheduling priority of a task.
///
/// The derived `Ord` makes `Initial` compare lowest; within `Invalidation`, the
/// `Reverse` wrapper inverts the comparison so that a *smaller* raw `u32` means a
/// *higher* `TaskPriority`.
#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum TaskPriority {
    /// Priority for tasks executed for the first time.
    #[default]
    Initial,
    /// Priority for re-executions triggered by invalidation.
    Invalidation {
        priority: Reverse<u32>,
    },
}
414
impl TaskPriority {
    /// An invalidation priority with the given raw value (smaller raw value =
    /// higher priority, due to the `Reverse` wrapper).
    pub fn invalidation(priority: u32) -> Self {
        Self::Invalidation {
            priority: Reverse(priority),
        }
    }

    /// The priority used for first-time executions.
    pub fn initial() -> Self {
        Self::Initial
    }

    /// The highest invalidation priority (raw value 0).
    pub fn leaf() -> Self {
        Self::Invalidation {
            priority: Reverse(0),
        }
    }

    /// Computes the effective priority of this task when scheduled under a parent
    /// with `parent_priority`.
    pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
        match self {
            // An initial task inherits whatever priority its parent has.
            TaskPriority::Initial => parent_priority,
            TaskPriority::Invalidation { priority } => {
                // If the child's raw priority is numerically smaller than the
                // parent's (i.e. the child would outrank its parent), clamp it to
                // one step below the parent. `saturating_add` avoids overflow at
                // u32::MAX.
                if let TaskPriority::Invalidation {
                    priority: parent_priority,
                } = parent_priority
                    && priority.0 < parent_priority.0
                {
                    Self::Invalidation {
                        priority: Reverse(parent_priority.0.saturating_add(1)),
                    }
                } else {
                    *self
                }
            }
        }
    }
}
451
452impl Display for TaskPriority {
453    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
454        match self {
455            TaskPriority::Initial => write!(f, "initial"),
456            TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
457        }
458    }
459}
460
/// A unit of work queued on the [`PriorityRunner`].
enum ScheduledTask {
    /// A regular (backend-known) task, identified by its [`TaskId`].
    Task {
        task_id: TaskId,
        /// Tracing span captured at scheduling time.
        span: Span,
    },
    /// A local task that runs inside the state of an enclosing global task.
    LocalTask {
        ty: LocalTaskSpec,
        persistence: TaskPersistence,
        local_task_id: LocalTaskId,
        /// Shared state of the global task that spawned this local task.
        global_task_state: Arc<RwLock<CurrentTaskState>>,
        /// Tracing span captured at scheduling time.
        span: Span,
    },
}
474
/// The central turbo-tasks engine, parameterized over a [`Backend`] implementation.
pub struct TurboTasks<B: Backend + 'static> {
    /// Self-reference so `&self` methods can hand out `Arc<Self>` (see `pin`).
    this: Weak<Self>,
    backend: B,
    /// Id allocator for persistent tasks (below `TRANSIENT_TASK_BIT`).
    task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id allocator for transient tasks (at/above `TRANSIENT_TASK_BIT`).
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Allocator for per-execution ids; used to assert local-task scoping.
    execution_id_factory: IdFactory<ExecutionId>,
    stopped: AtomicBool,
    currently_scheduled_foreground_jobs: AtomicUsize,
    currently_scheduled_background_jobs: AtomicUsize,
    /// Total number of tasks scheduled; read for statistics on idle.
    scheduled_tasks: AtomicUsize,
    priority_runner:
        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
    /// Instant when the current foreground activity burst started.
    start: Mutex<Option<Instant>>,
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Event that is triggered when currently_scheduled_foreground_jobs becomes non-zero
    event_foreground_start: Event,
    /// Event that is triggered when all foreground jobs are done
    /// (currently_scheduled_foreground_jobs becomes zero)
    event_foreground_done: Event,
    /// Event that is triggered when all background jobs are done
    event_background_done: Event,
    program_start: Instant,
    compilation_events: CompilationEventQueue,
}
499
/// Information about a non-local task. A non-local task can contain multiple "local" tasks, which
/// all share the same non-local task state.
///
/// A non-local task is one that:
///
/// - Has a unique task id.
/// - Is potentially cached.
/// - The backend is aware of.
struct CurrentTaskState {
    /// `None` for temporary states created outside a real backend task (see
    /// `CurrentTaskState::new_temporary`).
    task_id: Option<TaskId>,
    execution_id: ExecutionId,
    priority: TaskPriority,

    /// True if the current task has state in cells (interior mutability).
    /// Only tracked when verify_determinism feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the current task uses an external invalidator
    has_invalidator: bool,

    /// True if we're in a top-level task (e.g. `.run_once(...)` or `.run(...)`).
    /// Eventually consistent reads are not allowed in top-level tasks.
    in_top_level_task: bool,

    /// Tracks how many cells of each type has been allocated so far during this task execution.
    /// When a task is re-executed, the cell count may not match the existing cell vec length.
    ///
    /// This is taken (and becomes `None`) during teardown of a task.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Tracks execution of Local tasks (and detached test futures) created during this global
    /// task's execution.
    local_tasks: LocalTaskTracker,
}
535
impl CurrentTaskState {
    /// State for a real backend task identified by `task_id`.
    fn new(
        task_id: TaskId,
        execution_id: ExecutionId,
        priority: TaskPriority,
        in_top_level_task: bool,
    ) -> Self {
        Self {
            task_id: Some(task_id),
            execution_id,
            priority,
            #[cfg(feature = "verify_determinism")]
            stateful: false,
            has_invalidator: false,
            in_top_level_task,
            // Cell allocation starts fresh for every execution.
            cell_counters: Some(AutoMap::default()),
            local_tasks: LocalTaskTracker::new(),
        }
    }

    /// State for code running outside a backend task (e.g. `TurboTasks::run`);
    /// has no task id and no cell counters.
    fn new_temporary(
        execution_id: ExecutionId,
        priority: TaskPriority,
        in_top_level_task: bool,
    ) -> Self {
        Self {
            task_id: None,
            execution_id,
            priority,
            #[cfg(feature = "verify_determinism")]
            stateful: false,
            has_invalidator: false,
            in_top_level_task,
            cell_counters: None,
            local_tasks: LocalTaskTracker::new(),
        }
    }

    /// Panics if `expected_execution_id` does not match this state's execution id.
    /// Guards against local tasks escaping the execution that created them.
    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
        if self.execution_id != expected_execution_id {
            panic!(
                "Local tasks can only be scheduled/awaited within the same execution of the \
                 parent task that created them"
            );
        }
    }
}
583
// TODO implement our own thread pool and make these thread locals instead
task_local! {
    /// The current TurboTasks instance
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    /// State shared by the current global task and all its local tasks.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;

    /// Temporarily suppresses the eventual consistency check in top-level tasks.
    /// This is used by strongly consistent reads to allow them to succeed in top-level tasks.
    /// This is NOT shared across local tasks (unlike CURRENT_TASK_STATE), so it's safe
    /// to set/unset without race conditions.
    pub(crate) static SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK: bool;
}
597
598impl<B: Backend + 'static> TurboTasks<B> {
    // TODO better lifetime management for turbo tasks
    // consider using unsafe for the task_local turbo tasks
    // that should be safe as long tasks can't outlife turbo task
    // so we probably want to make sure that all tasks are joined
    // when trying to drop turbo tasks
    /// Creates a new engine around `backend` and notifies the backend via
    /// `startup`. The returned `Arc` holds a self-reference (`this`) so the
    /// engine can hand out clones of itself from `&self` methods.
    pub fn new(backend: B) -> Arc<Self> {
        // Persistent ids live below TRANSIENT_TASK_BIT, transient ids at/above it.
        let task_id_factory = IdFactoryWithReuse::new(
            TaskId::MIN,
            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
        );
        let transient_task_id_factory =
            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
        let this = Arc::new_cyclic(|this| Self {
            this: this.clone(),
            backend,
            task_id_factory,
            transient_task_id_factory,
            execution_id_factory,
            stopped: AtomicBool::new(false),
            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
            currently_scheduled_background_jobs: AtomicUsize::new(0),
            scheduled_tasks: AtomicUsize::new(0),
            priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
            start: Default::default(),
            aggregated_update: Default::default(),
            // NOTE(review): these closures return another closure rather than a
            // String directly (`|| { || ... }`). Verify against `Event::new`'s
            // signature — if it expects `Fn() -> String`, the outer closure layer
            // is superfluous.
            event_foreground_done: Event::new(|| {
                || "TurboTasks::event_foreground_done".to_string()
            }),
            event_foreground_start: Event::new(|| {
                || "TurboTasks::event_foreground_start".to_string()
            }),
            event_background_done: Event::new(|| {
                || "TurboTasks::event_background_done".to_string()
            }),
            program_start: Instant::now(),
            compilation_events: CompilationEventQueue::default(),
        });
        this.backend.startup(&*this);
        this
    }
640
641    pub fn pin(&self) -> Arc<Self> {
642        self.this.upgrade().unwrap()
643    }
644
    /// Creates a new root task
    ///
    /// `functor` is cloned into the root task so it can be invoked again on every
    /// (re-)execution; each invocation produces a fresh future. The task id is
    /// scheduled immediately with initial priority and returned to the caller.
    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
    where
        T: ?Sized,
        F: Fn() -> Fut + Send + Sync + Clone + 'static,
        Fut: Future<Output = Result<Vc<T>>> + Send,
    {
        let id = self.backend.create_transient_task(
            TransientTaskType::Root(Box::new(move || {
                let functor = functor.clone();
                Box::pin(async move {
                    // Root tasks run as top-level tasks (restricts eventual reads).
                    mark_top_level_task();
                    let raw_vc = functor().await?.node;
                    // Local outputs must not escape the task; resolve to a
                    // non-local RawVc before returning.
                    raw_vc.to_non_local().await
                })
            })),
            self,
        );
        self.schedule(id, TaskPriority::initial());
        id
    }
666
    /// Disposes a root task previously created with [`Self::spawn_root_task`].
    pub fn dispose_root_task(&self, task_id: TaskId) {
        self.backend.dispose_root_task(task_id, self);
    }
670
    // TODO make sure that all dependencies settle before reading them
    /// Creates a new root task, that is only executed once.
    /// Dependencies will not invalidate the task.
    #[track_caller]
    fn spawn_once_task<T, Fut>(&self, future: Fut)
    where
        T: ?Sized,
        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
    {
        let id = self.backend.create_transient_task(
            TransientTaskType::Once(Box::pin(async move {
                // Once-tasks also count as top-level tasks.
                mark_top_level_task();
                let raw_vc = future.await?.node;
                // Resolve any local output before the task finishes.
                raw_vc.to_non_local().await
            })),
            self,
        );
        self.schedule(id, TaskPriority::initial());
    }
690
    /// Runs `future` inside a one-shot root task and returns its result to the
    /// caller over a oneshot channel.
    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
        &self,
        future: impl Future<Output = Result<T>> + Send + 'static,
    ) -> Result<T> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.spawn_once_task(async move {
            // NOTE(review): spawn_once_task's wrapper already calls
            // mark_top_level_task before polling this future — confirm whether
            // this second call is needed (likely idempotent).
            mark_top_level_task();
            let result = future.await;
            // Fails only if the receiver was dropped (caller gave up).
            tx.send(result)
                .map_err(|_| anyhow!("unable to send result"))?;
            // The once-task itself resolves to a Completion.
            Ok(Completion::new())
        });

        rx.await?
    }
706
    /// Runs `future` in a turbo-tasks context on the caller's own async runtime
    /// (no backend task is created; a temporary task state is used instead).
    ///
    /// Counts as a foreground job for the duration, waits for all spawned local
    /// tasks before returning, and converts panics into
    /// [`TurboTasksExecutionError::Panic`].
    #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
    pub async fn run<T: TraceRawVcs + Send + 'static>(
        &self,
        future: impl Future<Output = Result<T>> + Send + 'static,
    ) -> Result<T, TurboTasksExecutionError> {
        self.begin_foreground_job();
        // it's okay for execution ids to overflow and wrap, they're just used for an assert
        let execution_id = self.execution_id_factory.wrapping_get();
        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
            execution_id,
            TaskPriority::initial(),
            true, // in_top_level_task
        )));

        let result = TURBO_TASKS
            .scope(
                self.pin(),
                CURRENT_TASK_STATE.scope(current_task_state, async {
                    // CaptureFuture catches panics so they can be reported as errors.
                    let result = CaptureFuture::new(future).await;

                    // wait for all spawned local tasks using `local` to finish
                    wait_for_local_tasks().await;

                    match result {
                        Ok(Ok(value)) => Ok(value),
                        Ok(Err(err)) => Err(err.into()),
                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                    }
                }),
            )
            .await;
        self.finish_foreground_job();
        result
    }
741
    /// Spawns `future` detached on the tokio runtime inside a once-task.
    ///
    /// The finish/begin pair around `future.await` deliberately *excludes* the
    /// future's own runtime from the foreground-job count (run_once already
    /// counted it), so a long-running process does not keep the system "active".
    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
        let this = self.pin();
        tokio::spawn(async move {
            this.pin()
                .run_once(async move {
                    this.finish_foreground_job();
                    future.await;
                    this.begin_foreground_job();
                    Ok(())
                })
                .await
                // NOTE(review): a failure here panics inside a detached tokio
                // task; the panic will not propagate to any caller.
                .unwrap()
        });
    }
756
    /// Calls a native function whose inputs are already resolved, returning the
    /// output of the (possibly cached) backend task.
    pub(crate) fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        RawVc::TaskOutput(self.backend.get_or_create_task(
            native_fn,
            this,
            arg,
            // The current task (if any) becomes the parent; the string is used
            // in the error message when no task context is available.
            current_task_if_available("turbo_function calls"),
            persistence,
            self,
        ))
    }
773
774    pub fn dynamic_call(
775        &self,
776        native_fn: &'static NativeFunction,
777        this: Option<RawVc>,
778        arg: &mut dyn StackDynTaskInputs,
779        persistence: TaskPersistence,
780    ) -> RawVc {
781        if this.is_none_or(|this| this.is_resolved())
782            && native_fn.arg_meta.is_resolved(arg.as_ref())
783        {
784            return self.native_call(native_fn, this, arg, persistence);
785        }
786        // Need async resolution — must move the arg to the heap now
787        let arg = arg.take_box();
788        let task_type = LocalTaskSpec {
789            task_type: LocalTaskType::ResolveNative { native_fn },
790            this,
791            arg,
792        };
793        self.schedule_local_task(task_type, persistence)
794    }
795
    /// Calls a trait method on `this`, dispatching directly when the value type
    /// is already known and otherwise scheduling a local resolve task.
    pub fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        // avoid creating a wrapper task if self is already resolved
        // for resolved cells we already know the value type so we can lookup the
        // function
        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
            match registry::get_value_type(type_id).get_trait_method(trait_method) {
                Some(native_fn) => {
                    // Some native functions filter/transform the argument before
                    // dispatch; apply the filter when one is registered.
                    if let Some(filter) = native_fn.arg_meta.filter_owned {
                        let mut arg = (filter)(arg);
                        return self.dynamic_call(native_fn, Some(this), &mut arg, persistence);
                    } else {
                        return self.dynamic_call(native_fn, Some(this), arg, persistence);
                    }
                }
                None => {
                    // We are destined to fail at this point, but we just retry resolution in the
                    // local task since we cannot report an error from here.
                    // TODO: A panic seems appropriate since the immediate caller is to blame
                }
            }
        }

        // create a wrapper task to resolve all inputs
        let task_type = LocalTaskSpec {
            task_type: LocalTaskType::ResolveTrait { trait_method },
            this: Some(this),
            arg: arg.take_box(),
        };

        self.schedule_local_task(task_type, persistence)
    }
833
    /// Queues `task_id` on the priority runner and counts it as a foreground job
    /// (matched by a finish when the task completes).
    #[track_caller]
    pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
        self.begin_foreground_job();
        // Statistics counter; read on idle (see finish_foreground_job).
        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);

        self.priority_runner.schedule(
            &self.pin(),
            ScheduledTask::Task {
                task_id,
                // Capture the caller's span so execution is traced under it.
                span: Span::current(),
            },
            priority,
        );
    }
848
849    fn schedule_local_task(
850        &self,
851        ty: LocalTaskSpec,
852        // if this is a `LocalTaskType::Resolve*`, we may spawn another task with this persistence,
853        persistence: TaskPersistence,
854    ) -> RawVc {
855        let task_type = ty.task_type;
856        let (global_task_state, execution_id, priority, local_task_id) =
857            CURRENT_TASK_STATE.with(|gts| {
858                let mut gts_write = gts.write().unwrap();
859                let local_task_id = gts_write.local_tasks.create(task_type);
860                (
861                    Arc::clone(gts),
862                    gts_write.execution_id,
863                    gts_write.priority,
864                    local_task_id,
865                )
866            });
867
868        self.priority_runner.schedule(
869            &self.pin(),
870            ScheduledTask::LocalTask {
871                ty,
872                persistence,
873                local_task_id,
874                global_task_state,
875                span: Span::current(),
876            },
877            priority,
878        );
879
880        RawVc::LocalOutput(execution_id, local_task_id, persistence)
881    }
882
883    fn begin_foreground_job(&self) {
884        if self
885            .currently_scheduled_foreground_jobs
886            .fetch_add(1, Ordering::AcqRel)
887            == 0
888        {
889            *self.start.lock().unwrap() = Some(Instant::now());
890            self.event_foreground_start.notify(usize::MAX);
891            self.backend.idle_end(self);
892        }
893    }
894
895    fn finish_foreground_job(&self) {
896        if self
897            .currently_scheduled_foreground_jobs
898            .fetch_sub(1, Ordering::AcqRel)
899            == 1
900        {
901            self.backend.idle_start(self);
902            // That's not super race-condition-safe, but it's only for
903            // statistical reasons
904            let total = self.scheduled_tasks.load(Ordering::Acquire);
905            self.scheduled_tasks.store(0, Ordering::Release);
906            if let Some(start) = *self.start.lock().unwrap() {
907                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
908                if let Some(update) = update.as_mut() {
909                    update.0 += start.elapsed();
910                    update.1 += total;
911                } else {
912                    *update = Some((start.elapsed(), total));
913                }
914            }
915            self.event_foreground_done.notify(usize::MAX);
916        }
917    }
918
    /// Marks the start of one background job by bumping the background-job
    /// counter (pure counter, so a relaxed ordering is used here).
    fn begin_background_job(&self) {
        self.currently_scheduled_background_jobs
            .fetch_add(1, Ordering::Relaxed);
    }
923
924    fn finish_background_job(&self) {
925        if self
926            .currently_scheduled_background_jobs
927            .fetch_sub(1, Ordering::Relaxed)
928            == 1
929        {
930            self.event_background_done.notify(usize::MAX);
931        }
932    }
933
934    pub fn get_in_progress_count(&self) -> usize {
935        self.currently_scheduled_foreground_jobs
936            .load(Ordering::Acquire)
937    }
938
939    /// Waits for the given task to finish executing. This works by performing an untracked read,
940    /// and discarding the value of the task output.
941    ///
942    /// [`ReadConsistency::Eventual`] means that this will return after the task executes, but
943    /// before all dependencies have completely settled.
944    ///
945    /// [`ReadConsistency::Strong`] means that this will also wait for the task and all dependencies
946    /// to fully settle before returning.
947    ///
948    /// As this function is typically called in top-level code that waits for results to be ready
949    /// for the user to access, most callers should use [`ReadConsistency::Strong`].
950    pub async fn wait_task_completion(
951        &self,
952        id: TaskId,
953        consistency: ReadConsistency,
954    ) -> Result<()> {
955        read_task_output(
956            self,
957            id,
958            ReadOutputOptions {
959                // INVALIDATION: This doesn't return a value, only waits for it to be ready.
960                tracking: ReadTracking::Untracked,
961                consistency,
962            },
963        )
964        .await?;
965        Ok(())
966    }
967
968    /// Returns [UpdateInfo] with all updates aggregated over a given duration
969    /// (`aggregation`). Will wait until an update happens.
970    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
971        self.aggregated_update_info(aggregation, Duration::MAX)
972            .await
973            .unwrap()
974    }
975
976    /// Returns [UpdateInfo] with all updates aggregated over a given duration
977    /// (`aggregation`). Will only return None when the timeout is reached while
978    /// waiting for the first update.
979    pub async fn aggregated_update_info(
980        &self,
981        aggregation: Duration,
982        timeout: Duration,
983    ) -> Option<UpdateInfo> {
984        let listener = self
985            .event_foreground_done
986            .listen_with_note(|| || "wait for update info".to_string());
987        let wait_for_finish = {
988            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
989            if aggregation.is_zero() {
990                if let Some((duration, tasks)) = update.take() {
991                    return Some(UpdateInfo {
992                        duration,
993                        tasks,
994                        reasons: take(reason_set),
995                        placeholder_for_future_fields: (),
996                    });
997                } else {
998                    true
999                }
1000            } else {
1001                update.is_none()
1002            }
1003        };
1004        if wait_for_finish {
1005            if timeout == Duration::MAX {
1006                // wait for finish
1007                listener.await;
1008            } else {
1009                // wait for start, then wait for finish or timeout
1010                let start_listener = self
1011                    .event_foreground_start
1012                    .listen_with_note(|| || "wait for update info".to_string());
1013                if self
1014                    .currently_scheduled_foreground_jobs
1015                    .load(Ordering::Acquire)
1016                    == 0
1017                {
1018                    start_listener.await;
1019                } else {
1020                    drop(start_listener);
1021                }
1022                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1023                    // Timeout
1024                    return None;
1025                }
1026            }
1027        }
1028        if !aggregation.is_zero() {
1029            loop {
1030                select! {
1031                    () = tokio::time::sleep(aggregation) => {
1032                        break;
1033                    }
1034                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1035                        // Resets the sleep
1036                    }
1037                }
1038            }
1039        }
1040        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1041        if let Some((duration, tasks)) = update.take() {
1042            Some(UpdateInfo {
1043                duration,
1044                tasks,
1045                reasons: take(reason_set),
1046                placeholder_for_future_fields: (),
1047            })
1048        } else {
1049            panic!("aggregated_update_info must not called concurrently")
1050        }
1051    }
1052
1053    pub async fn wait_background_done(&self) {
1054        let listener = self.event_background_done.listen();
1055        if self
1056            .currently_scheduled_background_jobs
1057            .load(Ordering::Acquire)
1058            != 0
1059        {
1060            listener.await;
1061        }
1062    }
1063
    /// Signals the backend that we are stopping, then waits for all foreground
    /// and all background jobs to drain before performing the final backend stop.
    pub async fn stop_and_wait(&self) {
        turbo_tasks_future_scope(self.pin(), async move {
            self.backend.stopping(self);
            // New executions observe this flag and cancel themselves.
            self.stopped.store(true, Ordering::Release);
            {
                // Create the listener before checking the counter so a job that
                // finishes in between cannot be missed.
                let listener = self
                    .event_foreground_done
                    .listen_with_note(|| || "wait for stop".to_string());
                if self
                    .currently_scheduled_foreground_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            {
                // Same listen-before-check pattern for background jobs.
                let listener = self.event_background_done.listen();
                if self
                    .currently_scheduled_background_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            self.backend.stop(self);
        })
        .await;
    }
1094
1095    #[track_caller]
1096    pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1097    where
1098        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1099        T::CallOnceFuture: Send,
1100    {
1101        let mut this = self.pin();
1102        this.begin_foreground_job();
1103        tokio::spawn(
1104            TURBO_TASKS
1105                .scope(this.clone(), async move {
1106                    if !this.stopped.load(Ordering::Acquire) {
1107                        this = func(this.clone()).await;
1108                    }
1109                    this.finish_foreground_job();
1110                })
1111                .in_current_span(),
1112        );
1113    }
1114
1115    #[track_caller]
1116    pub(crate) fn schedule_background_job<T>(&self, func: T)
1117    where
1118        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1119        T::CallOnceFuture: Send,
1120    {
1121        let mut this = self.pin();
1122        self.begin_background_job();
1123        tokio::spawn(
1124            TURBO_TASKS
1125                .scope(this.clone(), async move {
1126                    if !this.stopped.load(Ordering::Acquire) {
1127                        this = func(this).await;
1128                    }
1129                    this.finish_background_job();
1130                })
1131                .in_current_span(),
1132        );
1133    }
1134
1135    fn finish_current_task_state(&self) -> FinishedTaskState {
1136        CURRENT_TASK_STATE.with(|cell| {
1137            let current_task_state = &*cell.write().unwrap();
1138            FinishedTaskState {
1139                #[cfg(feature = "verify_determinism")]
1140                stateful: current_task_state.stateful,
1141                has_invalidator: current_task_state.has_invalidator,
1142            }
1143        })
1144    }
1145
    /// Returns a reference to the backend driving this `TurboTasks` instance.
    pub fn backend(&self) -> &B {
        &self.backend
    }
1149}
1150
1151struct TurboTasksExecutor;
1152
/// Run a future and abort the process if a panic is reported
///
/// Turbo-tasks catches panics from user code and propagates them through the task tree, but if a
/// panic happens as part of state management we have to abort
async fn abort_on_panic<F: Future>(f: F) -> F::Output {
    match AssertUnwindSafe(f).catch_unwind().await {
        Ok(r) => r,
        Err(_) => {
            // The panic hook already printed the payload and backtrace; point the
            // user at the bug tracker, then abort since internal state may be
            // inconsistent.
            eprintln!(
                "\nturbo-tasks: an internal panic occurred outside the per-task panic \
                 boundary. This is a bug in turbo-tasks/Turbopack — please report it at \
                 https://github.com/vercel/next.js/discussions and include the panic message \
                 and stack trace above.\n\nAborting."
            );
            abort();
        }
    }
}
1171
// Turns each dequeued `ScheduledTask` into a future that runs a full (global)
// task execution or a local resolve task, with the proper task-local scopes
// (`TURBO_TASKS`, `CURRENT_TASK_STATE`) and tracing spans installed.
impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
    type Future = impl Future<Output = ()> + Send + 'static;

    fn execute(
        &self,
        this: &Arc<TurboTasks<B>>,
        scheduled_task: ScheduledTask,
        priority: TaskPriority,
    ) -> Self::Future {
        match scheduled_task {
            // A regular (global) task execution.
            ScheduledTask::Task { task_id, span } => {
                // Two clones: one consumed by the TURBO_TASKS scope, one captured
                // by the execution future itself.
                let this2 = this.clone();
                let this = this.clone();
                let future = async move {
                    abort_on_panic(async {
                        // it's okay for execution ids to overflow and wrap, they're just used
                        // for an assert
                        let execution_id = this.execution_id_factory.wrapping_get();
                        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
                            task_id,
                            execution_id,
                            priority,
                            false, // in_top_level_task
                        )));
                        let single_execution_future = async {
                            // Cancel instead of starting if the system is shutting down.
                            if this.stopped.load(Ordering::Acquire) {
                                this.backend.task_execution_canceled(task_id, &*this);
                                return None;
                            }

                            let TaskExecutionSpec { future, span } = this
                                .backend
                                .try_start_task_execution(task_id, priority, &*this)?;

                            async {
                                let result = CaptureFuture::new(future).await;

                                // wait for all spawned local tasks using `local` to finish
                                wait_for_local_tasks().await;

                                let result = match result {
                                    Ok(Ok(raw_vc)) => {
                                        // This is safe because we waited for all local tasks to
                                        // complete above
                                        raw_vc
                                            .to_non_local_unchecked_sync(&*this)
                                            .map_err(|err| err.into())
                                    }
                                    Ok(Err(err)) => Err(err.into()),
                                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                                };

                                let finished_state = this.finish_current_task_state();
                                let cell_counters = CURRENT_TASK_STATE
                                    .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
                                this.backend.task_execution_completed(
                                    task_id,
                                    result,
                                    &cell_counters,
                                    #[cfg(feature = "verify_determinism")]
                                    finished_state.stateful,
                                    finished_state.has_invalidator,
                                    &*this,
                                )
                            }
                            .instrument(span)
                            .await
                        };
                        if let Some(stale_priority) = CURRENT_TASK_STATE
                            .scope(current_task_state, single_execution_future)
                            .await
                        {
                            // Task was stale; re-schedule at the correct invalidation priority so
                            // other tasks can run in the right priority order.
                            this.schedule(task_id, stale_priority);
                        }
                        this.finish_foreground_job();
                    })
                    .await
                };

                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
            }
            // A local resolve task running inside its parent task's state.
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state,
                span,
            } => {
                let this2 = this.clone();
                let this = this.clone();
                let task_type = ty.task_type;
                let future = async move {
                    // Per-resolve span nested inside the scheduling span.
                    let span = match &ty.task_type {
                        LocalTaskType::ResolveNative { native_fn } => {
                            native_fn.resolve_span(priority)
                        }
                        LocalTaskType::ResolveTrait { trait_method } => {
                            trait_method.resolve_span(priority)
                        }
                    };
                    abort_on_panic(
                        async move {
                            let result = match ty.task_type {
                                LocalTaskType::ResolveNative { native_fn } => {
                                    LocalTaskType::run_resolve_native(
                                        native_fn,
                                        ty.this,
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                                LocalTaskType::ResolveTrait { trait_method } => {
                                    LocalTaskType::run_resolve_trait(
                                        trait_method,
                                        ty.this.unwrap(),
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                            };

                            // Errors are tagged with the local task description for
                            // better diagnostics.
                            let output = match result {
                                Ok(raw_vc) => OutputContent::Link(raw_vc),
                                Err(err) => OutputContent::Error(
                                    TurboTasksExecutionError::from(err)
                                        .with_local_task_context(task_type.to_string()),
                                ),
                            };

                            // Publish the output and wake readers of this local task.
                            CURRENT_TASK_STATE.with(move |gts| {
                                gts.write()
                                    .unwrap()
                                    .local_tasks
                                    .complete(local_task_id, output);
                            });
                        }
                        .instrument(span),
                    )
                    .await
                };
                // Local tasks run inside the *parent's* task state.
                let future = CURRENT_TASK_STATE.scope(global_task_state, future);

                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
            }
        }
    }
}
1325
/// Flags snapshotted from `CurrentTaskState` when a task execution finishes and
/// forwarded to `Backend::task_execution_completed`.
struct FinishedTaskState {
    /// True if the task has state in cells (interior mutability).
    /// Only tracked when verify_determinism feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the task uses an external invalidator
    has_invalidator: bool,
}
1335
// Object-safe call API; each method forwards to the inherent method of the same
// name on `TurboTasks<B>`.
impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.dynamic_call(native_fn, this, arg, persistence)
    }
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.native_call(native_fn, this, arg, persistence)
    }
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.trait_call(trait_method, this, arg, persistence)
    }

    #[track_caller]
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
        // Pin `self` as an `Arc` so the boxed future can outlive this borrow.
        let this = self.pin();
        Box::pin(async move { this.run(future).await })
    }

    #[track_caller]
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    #[track_caller]
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        {
            // Record the reason so it shows up in aggregated update info.
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    #[track_caller]
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        self.start_once_process(future)
    }

    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
        // Best-effort delivery: a failed send only produces a warning.
        if let Err(e) = self.compilation_events.send(event) {
            tracing::warn!("Failed to send compilation event: {e}");
        }
    }

    fn get_task_name(&self, task: TaskId) -> String {
        self.backend.get_task_name(task, self)
    }
}
1412
// The task-facing API surface: reads, cell updates, collectibles, and
// miscellaneous queries, mostly forwarded to the backend with the current task
// id attached as the reader/emitter.
impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
    #[instrument(level = "info", skip_all, name = "invalidate")]
    fn invalidate(&self, task: TaskId) {
        self.backend.invalidate_task(task, self);
    }

    #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
        {
            // Record the reason so it shows up in aggregated update info.
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        self.backend.invalidate_task(task, self);
    }

    fn invalidate_serialization(&self, task: TaskId) {
        self.backend.invalidate_serialization(task, self);
    }

    #[track_caller]
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>> {
        if options.consistency == ReadConsistency::Eventual {
            debug_assert_not_in_top_level_task("read_task_output");
        }
        self.backend.try_read_task_output(
            task,
            current_task_if_available("reading Vcs"),
            options,
            self,
        )
    }

    #[track_caller]
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        let reader = current_task_if_available("reading Vcs");
        self.backend
            .try_read_task_cell(task, index, reader, options, self)
    }

    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
    ) -> Result<TypedCellContent> {
        self.backend
            .try_read_own_task_cell(current_task, index, self)
    }

    #[track_caller]
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>> {
        debug_assert_not_in_top_level_task("read_local_output");
        CURRENT_TASK_STATE.with(|gts| {
            let gts_read = gts.read().unwrap();

            // Local Vcs are local to their parent task's current execution, and do not exist
            // outside of it. This is weakly enforced at compile time using the `NonLocalValue`
            // marker trait. This assertion exists to handle any potential escapes that the
            // compile-time checks cannot capture.
            gts_read.assert_execution_id(execution_id);

            match gts_read.local_tasks.get(local_task_id) {
                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
            }
        })
    }

    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
        // TODO: Add assert_not_in_top_level_task("read_task_collectibles") check here.
        // Collectible reads are eventually consistent.
        self.backend.read_task_collectibles(
            task,
            trait_id,
            current_task_if_available("reading collectibles"),
            self,
        )
    }

    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
        self.backend.emit_collectible(
            trait_type,
            collectible,
            current_task("emitting collectible"),
            self,
        );
    }

    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
        // NOTE(review): the context string says "emitting" on the unemit path —
        // presumably intentional reuse; confirm before changing.
        self.backend.unemit_collectible(
            trait_type,
            collectible,
            count,
            current_task("emitting collectible"),
            self,
        );
    }

    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
        // Only positive counts represent emissions that need to be retracted.
        for (&collectible, &count) in collectibles {
            if count > 0 {
                self.backend.unemit_collectible(
                    trait_type,
                    collectible,
                    count as u32,
                    current_task("emitting collectible"),
                    self,
                );
            }
        }
    }

    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent> {
        self.try_read_own_task_cell(task, index)
    }

    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
    ) {
        self.backend.update_task_cell(
            task,
            index,
            content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            self,
        );
    }

    fn connect_task(&self, task: TaskId) {
        self.backend
            .connect_task(task, current_task_if_available("connecting task"), self);
    }

    fn mark_own_task_as_finished(&self, task: TaskId) {
        self.backend.mark_own_task_as_finished(task, self);
    }

    /// Creates a future that inherits the current task id and task state. The current global task
    /// will wait for this future to be dropped before exiting.
    fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        // this is similar to what happens for a local task, except that we keep the local task's
        // state as well.
        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
        global_task_state
            .write()
            .unwrap()
            .local_tasks
            .register_detached();
        let wrapped = async move {
            // use a drop guard for panic safety
            struct DropGuard;
            impl Drop for DropGuard {
                fn drop(&mut self) {
                    CURRENT_TASK_STATE
                        .with(|ts| ts.write().unwrap().local_tasks.decrement_in_flight());
                }
            }
            let _guard = DropGuard;
            fut.await;
        };
        tokio::spawn(TURBO_TASKS.scope(
            turbo_tasks(),
            CURRENT_TASK_STATE.scope(global_task_state, wrapped),
        ));
    }

    fn task_statistics(&self) -> &TaskStatisticsApi {
        self.backend.task_statistics()
    }

    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        let this = self.pin();
        Box::pin(async move {
            this.stop_and_wait().await;
        })
    }

    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>> {
        self.compilation_events.subscribe(event_types)
    }

    fn is_tracking_dependencies(&self) -> bool {
        self.backend.is_tracking_dependencies()
    }
}
1621
// API surface exposed *to* the backend: scheduling, task-id management, and
// timing helpers.
impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
        self.pin()
    }
    fn backend(&self) -> &B {
        &self.backend
    }

    #[track_caller]
    fn schedule_backend_background_job(&self, job: B::BackendJob) {
        self.schedule_background_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            this
        })
    }

    #[track_caller]
    fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
        self.schedule_foreground_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            this
        })
    }

    #[track_caller]
    fn schedule(&self, task: TaskId, priority: TaskPriority) {
        self.schedule(task, priority)
    }

    fn get_current_task_priority(&self) -> TaskPriority {
        // Falls back to the initial priority outside of a task execution.
        CURRENT_TASK_STATE
            .try_with(|task_state| task_state.read().unwrap().priority)
            .unwrap_or(TaskPriority::initial())
    }

    fn program_duration_until(&self, instant: Instant) -> Duration {
        instant - self.program_start
    }

    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
        // SAFETY: This is a fresh id from the factory
        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
    }

    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
        // SAFETY: This is a fresh id from the factory
        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
    }

    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
        unsafe { self.task_id_factory.reuse(id.into()) }
    }

    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
        unsafe { self.transient_task_id_factory.reuse(id.into()) }
    }

    fn is_idle(&self) -> bool {
        // Idle means no foreground jobs; background jobs do not count.
        self.currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
            == 0
    }
}
1685
1686async fn wait_for_local_tasks() {
1687    let listener =
1688        CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_tasks.listen_for_in_flight());
1689    let Some(listener) = listener else {
1690        return;
1691    };
1692    listener.await;
1693}
1694
1695pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1696    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1697        Ok(id) => id,
1698        Err(_) => panic!(
1699            "{from} can only be used in the context of a turbo_tasks task execution or \
1700             turbo_tasks run"
1701        ),
1702    }
1703}
1704
1705pub(crate) fn current_task(from: &str) -> TaskId {
1706    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1707        Ok(Some(id)) => id,
1708        Ok(None) | Err(_) => {
1709            panic!("{from} can only be used in the context of a turbo_tasks task execution")
1710        }
1711    }
1712}
1713
1714/// Panics if we're not in a top-level task (e.g. [`run_once`]). Some function calls should only
1715/// happen in a top-level task (e.g. [`Effects::apply`][crate::Effects::apply]).
1716#[track_caller]
1717pub(crate) fn debug_assert_in_top_level_task(message: &str) {
1718    if !cfg!(debug_assertions) {
1719        return;
1720    }
1721
1722    let in_top_level = CURRENT_TASK_STATE
1723        .try_with(|ts| ts.read().unwrap().in_top_level_task)
1724        .unwrap_or(true);
1725    if !in_top_level {
1726        panic!("{message}");
1727    }
1728}
1729
1730#[track_caller]
1731pub(crate) fn debug_assert_not_in_top_level_task(operation: &str) {
1732    if !cfg!(debug_assertions) {
1733        return;
1734    }
1735
1736    // HACK: We set this inside of `ReadRawVcFuture` to suppress warnings about an internal
1737    // consistency bug
1738    let suppressed = SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK
1739        .try_with(|&suppressed| suppressed)
1740        .unwrap_or(false);
1741    if suppressed {
1742        return;
1743    }
1744
1745    let in_top_level = CURRENT_TASK_STATE
1746        .try_with(|ts| ts.read().unwrap().in_top_level_task)
1747        .unwrap_or(false);
1748    if in_top_level {
1749        panic!(
1750            "Eventually consistent read ({operation}) cannot be performed from a top-level task. \
1751             Top-level tasks (e.g. code inside `.run_once(...)`) must use strongly consistent \
1752             reads to avoid leaking inconsistent return values."
1753        );
1754    }
1755}
1756
1757pub async fn run<T: Send + 'static>(
1758    tt: Arc<dyn TurboTasksApi>,
1759    future: impl Future<Output = Result<T>> + Send + 'static,
1760) -> Result<T> {
1761    let (tx, rx) = tokio::sync::oneshot::channel();
1762
1763    tt.run(Box::pin(async move {
1764        let result = future.await?;
1765        tx.send(result)
1766            .map_err(|_| anyhow!("unable to send result"))?;
1767        Ok(())
1768    }))
1769    .await?;
1770
1771    Ok(rx.await?)
1772}
1773
1774pub async fn run_once<T: Send + 'static>(
1775    tt: Arc<dyn TurboTasksApi>,
1776    future: impl Future<Output = Result<T>> + Send + 'static,
1777) -> Result<T> {
1778    let (tx, rx) = tokio::sync::oneshot::channel();
1779
1780    tt.run_once(Box::pin(async move {
1781        let result = future.await?;
1782        tx.send(result)
1783            .map_err(|_| anyhow!("unable to send result"))?;
1784        Ok(())
1785    }))
1786    .await?;
1787
1788    Ok(rx.await?)
1789}
1790
1791pub async fn run_once_with_reason<T: Send + 'static>(
1792    tt: Arc<dyn TurboTasksApi>,
1793    reason: impl InvalidationReason,
1794    future: impl Future<Output = Result<T>> + Send + 'static,
1795) -> Result<T> {
1796    let (tx, rx) = tokio::sync::oneshot::channel();
1797
1798    tt.run_once_with_reason(
1799        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1800        Box::pin(async move {
1801            let result = future.await?;
1802            tx.send(result)
1803                .map_err(|_| anyhow!("unable to send result"))?;
1804            Ok(())
1805        }),
1806    )
1807    .await?;
1808
1809    Ok(rx.await?)
1810}
1811
/// Calls [`TurboTasks::dynamic_call`] for the current turbo tasks instance.
///
/// Panics if no turbo-tasks instance is installed in the current scope.
pub fn dynamic_call(
    func: &'static NativeFunction,
    this: Option<RawVc>,
    arg: &mut dyn StackDynTaskInputs,
    persistence: TaskPersistence,
) -> RawVc {
    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
}
1821
/// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance.
///
/// Panics if no turbo-tasks instance is installed in the current scope.
pub fn trait_call(
    trait_method: &'static TraitMethod,
    this: RawVc,
    arg: &mut dyn StackDynTaskInputs,
    persistence: TaskPersistence,
) -> RawVc {
    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
}
1831
/// Returns a clone of the current scope's turbo-tasks instance.
/// Panics if no instance is installed (see [`turbo_tasks_scope`]).
pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
    TURBO_TASKS.with(|arc| arc.clone())
}
1835
/// Returns a weak reference to the current scope's turbo-tasks instance.
/// Panics if no instance is installed.
pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
    TURBO_TASKS.with(Arc::downgrade)
}
1839
/// Returns the current scope's turbo-tasks instance, or `None` if no instance
/// is installed.
pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
    TURBO_TASKS.try_with(|arc| arc.clone()).ok()
}
1843
/// Runs `func` with a borrow of the current turbo-tasks instance, avoiding an
/// `Arc` clone. Panics if no instance is installed.
pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
    TURBO_TASKS.with(|arc| func(arc))
}
1847
/// Synchronously runs `f` with `tt` installed as the current turbo-tasks
/// instance.
pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
    TURBO_TASKS.sync_scope(tt, f)
}
1851
/// Wraps `f` so that it polls with `tt` installed as the current turbo-tasks
/// instance.
pub fn turbo_tasks_future_scope<T>(
    tt: Arc<dyn TurboTasksApi>,
    f: impl Future<Output = T>,
) -> impl Future<Output = T> {
    TURBO_TASKS.scope(tt, f)
}
1858
1859pub fn with_turbo_tasks_for_testing<T>(
1860    tt: Arc<dyn TurboTasksApi>,
1861    current_task: TaskId,
1862    execution_id: ExecutionId,
1863    f: impl Future<Output = T>,
1864) -> impl Future<Output = T> {
1865    TURBO_TASKS.scope(
1866        tt,
1867        CURRENT_TASK_STATE.scope(
1868            Arc::new(RwLock::new(CurrentTaskState::new(
1869                current_task,
1870                execution_id,
1871                TaskPriority::initial(),
1872                false, // in_top_level_task
1873            ))),
1874            f,
1875        ),
1876    )
1877}
1878
/// Spawns the given future within the context of the current task.
///
/// Beware: this method is not safe to use in production code. It is only
/// intended for use in tests and for debugging purposes.
///
/// Panics if no turbo-tasks instance is installed in the current scope.
pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
    turbo_tasks().spawn_detached_for_testing(Box::pin(f));
}
1886
/// Returns the current task id, if any. Only intended for tests.
/// Panics when called outside of a task-state scope.
pub fn current_task_for_testing() -> Option<TaskId> {
    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
}
1890
1891/// Marks the current task as finished. This excludes it from waiting for
1892/// strongly consistency.
1893pub fn mark_finished() {
1894    with_turbo_tasks(|tt| {
1895        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1896    });
1897}
1898
/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
/// serialization of the current task cells.
///
/// Also marks the current task as stateful when the `verify_determinism` feature is enabled,
/// since State allocation implies interior mutability.
///
/// Panics when called outside of a turbo_tasks task execution.
pub fn get_serialization_invalidator() -> SerializationInvalidator {
    CURRENT_TASK_STATE.with(|cell| {
        // Take the write lock up front: the `stateful` flag may be mutated below.
        let CurrentTaskState {
            task_id,
            #[cfg(feature = "verify_determinism")]
            stateful,
            ..
        } = &mut *cell.write().unwrap();
        #[cfg(feature = "verify_determinism")]
        {
            *stateful = true;
        }
        let Some(task_id) = *task_id else {
            panic!(
                "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
                 task execution"
            );
        };
        SerializationInvalidator::new(task_id)
    })
}
1925
1926pub fn mark_invalidator() {
1927    CURRENT_TASK_STATE.with(|cell| {
1928        let CurrentTaskState {
1929            has_invalidator, ..
1930        } = &mut *cell.write().unwrap();
1931        *has_invalidator = true;
1932    })
1933}
1934
/// Marks the current task as stateful. This is used to indicate that the task
/// has interior mutability (e.g., via [`State`][crate::State]), which means
/// the task may produce different outputs even with the same inputs.
///
/// Only has an effect when the `verify_determinism` feature is enabled.
pub fn mark_stateful() {
    #[cfg(feature = "verify_determinism")]
    {
        CURRENT_TASK_STATE.with(|cell| {
            let CurrentTaskState { stateful, .. } = &mut *cell.write().unwrap();
            *stateful = true;
        })
    }
    // No-op when verify_determinism is not enabled
}
1950
1951/// Marks the current task context as being in a top-level task. When in a top-level task,
1952/// eventually consistent reads will panic. It is almost always a mistake to perform an eventually
1953/// consistent read at the top-level of the application.
1954pub fn mark_top_level_task() {
1955    if cfg!(debug_assertions) {
1956        CURRENT_TASK_STATE.with(|cell| {
1957            cell.write().unwrap().in_top_level_task = true;
1958        })
1959    }
1960}
1961
1962/// Unmarks the current task context as being in a top-level task. The opposite of
1963/// [`mark_top_level_task`].
1964///
1965/// This utility can be okay in unit tests, where we're observing the internal behavior of
1966/// turbo-tasks, but otherwise, it is probably a mistake to call this function.
1967///
1968/// Calling this will allow eventually-consistent reads at the top-level, potentially exposing
1969/// incomplete computations and internal errors caused by eventual consistency that would've been
1970/// caught when the function was re-run. A strongly-consistent read re-runs parts of a task until
1971/// all of the dependencies have settled.
1972pub fn unmark_top_level_task_may_leak_eventually_consistent_state() {
1973    if cfg!(debug_assertions) {
1974        CURRENT_TASK_STATE.with(|cell| {
1975            cell.write().unwrap().in_top_level_task = false;
1976        })
1977    }
1978}
1979
/// Placeholder for preventing garbage collection of the current task's cells.
/// Currently a no-op.
pub fn prevent_gc() {
    // TODO implement garbage collection
}
1983
/// Emits `collectible` on the current turbo-tasks instance under the trait type
/// of `T`.
pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
    with_turbo_tasks(|tt| {
        // Unwrap the resolved reference down to the underlying RawVc.
        let raw_vc = collectible.node.node;
        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
    })
}
1990
1991pub(crate) async fn read_task_output(
1992    this: &dyn TurboTasksApi,
1993    id: TaskId,
1994    options: ReadOutputOptions,
1995) -> Result<RawVc> {
1996    loop {
1997        match this.try_read_task_output(id, options)? {
1998            Ok(result) => return Ok(result),
1999            Err(listener) => listener.await,
2000        }
2001    }
2002}
2003
/// A reference to a task's cell with methods that allow updating the contents
/// of the cell.
///
/// Mutations should not occur outside of the task that owns this cell. Doing so
/// is a logic error, and may lead to incorrect caching behavior.
#[derive(Clone, Copy)]
pub struct CurrentCellRef {
    // The task that owns the cell.
    current_task: TaskId,
    // Identifies the cell within the owning task (value type + per-type index).
    index: CellId,
}

/// Shorthand for the target type a `VcValueType`'s read representation derefs to.
type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
2016
impl CurrentCellRef {
    /// Updates the cell if the given `functor` returns a value.
    ///
    /// The functor receives the cell's current value (if present and downcastable to
    /// `T`) and returns the new value plus optional updated-key hashes and an
    /// optional content hash; returning `None` leaves the cell untouched.
    fn conditional_update<T>(
        &self,
        functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>, Option<CellHash>)>,
    ) where
        T: VcValueType,
    {
        self.conditional_update_with_shared_reference(|old_shared_reference| {
            // An old value of a different type is treated as absent.
            let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
            let (new_value, updated_key_hashes, content_hash) = functor(old_ref)?;
            Some((
                SharedReference::new(triomphe::Arc::new(new_value)),
                updated_key_hashes,
                content_hash,
            ))
        })
    }

    /// Updates the cell if the given `functor` returns a `SharedReference`.
    ///
    /// Performs a read-modify-write on the cell; this is only sound because cells
    /// must not be mutated outside of the task that owns them (see the type docs).
    fn conditional_update_with_shared_reference(
        &self,
        functor: impl FnOnce(
            Option<&SharedReference>,
        ) -> Option<(
            SharedReference,
            Option<SmallVec<[u64; 2]>>,
            Option<CellHash>,
        )>,
    ) {
        let tt = turbo_tasks();
        // A read error is treated the same as an empty cell.
        let cell_content = tt.read_own_task_cell(self.current_task, self.index).ok();
        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
        if let Some((update, updated_key_hashes, content_hash)) = update {
            tt.update_own_task_cell(
                self.current_task,
                self.index,
                CellContent(Some(update)),
                updated_key_hashes,
                content_hash,
                VerificationMode::EqualityCheck,
            )
        }
    }

    /// Replace the current cell's content with `new_value` if the current content is not equal by
    /// value with the existing content.
    ///
    /// The comparison happens using the value itself, not the [`VcRead::Target`] of that value.
    ///
    /// Take this example of a custom equality implementation on a transparent wrapper type:
    ///
    /// ```
    /// #[turbo_tasks::value(transparent, eq = "manual")]
    /// struct Wrapper(Vec<u32>);
    ///
    /// impl PartialEq for Wrapper {
    ///     fn eq(&self, other: &Wrapper) -> bool {
    ///         // Example: order doesn't matter for equality
    ///         let (mut this, mut other) = (self.0.clone(), other.0.clone());
    ///         this.sort_unstable();
    ///         other.sort_unstable();
    ///         this == other
    ///     }
    /// }
    ///
    /// impl Eq for Wrapper {}
    /// ```
    ///
    /// Comparisons of [`Vc<Wrapper>`] used when updating the cell will use `Wrapper`'s custom
    /// equality implementation, rather than the one provided by the target ([`Vec<u32>`]) type.
    ///
    /// However, in most cases, the default derived implementation of [`PartialEq`] is used which
    /// just forwards to the inner value's [`PartialEq`].
    ///
    /// If you already have a `SharedReference`, consider calling
    /// [`Self::compare_and_update_with_shared_reference`] which can re-use the [`SharedReference`]
    /// object.
    pub fn compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + VcValueType,
    {
        self.conditional_update(|old_value| {
            if let Some(old_value) = old_value
                && old_value == &new_value
            {
                return None;
            }
            Some((new_value, None, None))
        });
    }

    /// Replace the current cell's content with `new_shared_reference` if the current content is not
    /// equal by value with the existing content.
    ///
    /// If you already have a `SharedReference`, this is a faster version of
    /// [`CurrentCellRef::compare_and_update`].
    ///
    /// The value should be stored in [`SharedReference`] using the type `T`.
    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
    where
        T: VcValueType + PartialEq,
    {
        self.conditional_update_with_shared_reference(|old_sr| {
            if let Some(old_sr) = old_sr {
                let old_value = extract_sr_value::<T>(old_sr);
                let new_value = extract_sr_value::<T>(&new_shared_reference);
                if old_value == new_value {
                    return None;
                }
            }
            Some((new_shared_reference, None, None))
        });
    }

    /// Replace the current cell's content if the new value is different.
    ///
    /// Like [`Self::compare_and_update`], but also computes and stores a hash of the value.
    /// When the cell's transient data is evicted, the stored hash enables the backend to detect
    /// whether the value actually changed without re-comparing values—avoiding unnecessary
    /// downstream invalidation.
    ///
    /// Requires `T: DeterministicHash` in addition to `T: PartialEq`.
    pub fn hashed_compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + DeterministicHash + VcValueType,
    {
        self.conditional_update(|old_value| {
            if let Some(old_value) = old_value
                && old_value == &new_value
            {
                return None;
            }
            // Only hash when we actually write; unchanged values skip the hash.
            let content_hash = hash_xxh3_hash128(&new_value);
            Some((new_value, None, Some(content_hash)))
        });
    }

    /// Replace the current cell's content if the new value (from a pre-existing
    /// [`SharedReference`]) is different.
    ///
    /// Like [`Self::compare_and_update_with_shared_reference`], but also passes a hash
    /// for hash-based change detection when transient data has been evicted.
    pub fn hashed_compare_and_update_with_shared_reference<T>(
        &self,
        new_shared_reference: SharedReference,
    ) where
        T: VcValueType + PartialEq + DeterministicHash,
    {
        self.conditional_update_with_shared_reference(move |old_sr| {
            if let Some(old_sr) = old_sr {
                let old_value = extract_sr_value::<T>(old_sr);
                let new_value = extract_sr_value::<T>(&new_shared_reference);
                if old_value == new_value {
                    return None;
                }
            }
            let content_hash = hash_xxh3_hash128(extract_sr_value::<T>(&new_shared_reference));
            Some((new_shared_reference, None, Some(content_hash)))
        });
    }

    /// See [`Self::compare_and_update`], but selectively update individual keys.
    pub fn keyed_compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + VcValueType,
        VcReadTarget<T>: KeyedEq,
        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
    {
        self.conditional_update(|old_value| {
            let Some(old_value) = old_value else {
                // Empty cell: write unconditionally, no key diff possible.
                return Some((new_value, None, None));
            };
            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
            let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
            let updated_keys = old_value.different_keys(new_value_ref);
            if updated_keys.is_empty() {
                return None;
            }
            // Duplicates are very unlikely, but ok since the backend is deduplicating them
            let updated_key_hashes = updated_keys
                .into_iter()
                .map(|key| FxBuildHasher.hash_one(key))
                .collect();
            Some((new_value, Some(updated_key_hashes), None))
        });
    }

    /// See [`Self::compare_and_update_with_shared_reference`], but selectively update individual
    /// keys.
    pub fn keyed_compare_and_update_with_shared_reference<T>(
        &self,
        new_shared_reference: SharedReference,
    ) where
        T: VcValueType + PartialEq,
        VcReadTarget<T>: KeyedEq,
        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
    {
        self.conditional_update_with_shared_reference(|old_sr| {
            let Some(old_sr) = old_sr else {
                // Empty cell: write unconditionally, no key diff possible.
                return Some((new_shared_reference, None, None));
            };
            let old_value = extract_sr_value::<T>(old_sr);
            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
            let new_value = extract_sr_value::<T>(&new_shared_reference);
            let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
            let updated_keys = old_value.different_keys(new_value);
            if updated_keys.is_empty() {
                return None;
            }
            // Duplicates are very unlikely, but ok since the backend is deduplicating them
            let updated_key_hashes = updated_keys
                .into_iter()
                .map(|key| FxBuildHasher.hash_one(key))
                .collect();
            Some((new_shared_reference, Some(updated_key_hashes), None))
        });
    }

    /// Unconditionally updates the content of the cell.
    pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
    where
        T: VcValueType,
    {
        let tt = turbo_tasks();
        tt.update_own_task_cell(
            self.current_task,
            self.index,
            CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
            None,
            None,
            verification_mode,
        )
    }

    /// A faster version of [`Self::update`] if you already have a
    /// [`SharedReference`].
    ///
    /// If the passed-in [`SharedReference`] is the same as the existing cell's
    /// by identity, no update is performed.
    ///
    /// The value should be stored in [`SharedReference`] using the type `T`.
    pub fn update_with_shared_reference(
        &self,
        shared_ref: SharedReference,
        verification_mode: VerificationMode,
    ) {
        let tt = turbo_tasks();
        let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
            let content = tt.read_own_task_cell(self.current_task, self.index).ok();
            if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
                // pointer equality (not value equality)
                shared_ref_exp != shared_ref
            } else {
                // Empty/unreadable cell: always write.
                true
            }
        } else {
            true
        };
        if update {
            tt.update_own_task_cell(
                self.current_task,
                self.index,
                CellContent(Some(shared_ref)),
                None,
                None,
                verification_mode,
            )
        }
    }
}
2288
2289impl From<CurrentCellRef> for RawVc {
2290    fn from(cell: CurrentCellRef) -> Self {
2291        RawVc::TaskCell(cell.current_task, cell.index)
2292    }
2293}
2294
/// Downcasts the value stored in `sr` to `T`.
///
/// Panics if the `SharedReference` holds a value of a different type.
fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
    sr.0.downcast_ref::<T>()
        .expect("cannot update SharedReference of different type")
}
2299
/// Returns a reference to the current task's next unused cell slot for value
/// type `T`. See [`find_cell_by_id`].
pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
    find_cell_by_id(T::get_value_type_id())
}
2303
2304pub fn find_cell_by_id(ty: ValueTypeId) -> CurrentCellRef {
2305    CURRENT_TASK_STATE.with(|ts| {
2306        let current_task = current_task("celling turbo_tasks values");
2307        let mut ts = ts.write().unwrap();
2308        let map = ts.cell_counters.as_mut().unwrap();
2309        let current_index = map.entry(ty).or_default();
2310        let index = *current_index;
2311        *current_index += 1;
2312        CurrentCellRef {
2313            current_task,
2314            index: CellId { type_id: ty, index },
2315        }
2316    })
2317}
2318
2319pub(crate) async fn read_local_output(
2320    this: &dyn TurboTasksApi,
2321    execution_id: ExecutionId,
2322    local_task_id: LocalTaskId,
2323) -> Result<RawVc> {
2324    loop {
2325        match this.try_read_local_output(execution_id, local_task_id)? {
2326            Ok(raw_vc) => return Ok(raw_vc),
2327            Err(event_listener) => event_listener.await,
2328        }
2329    }
2330}