//! turbo_tasks/manager.rs — TurboTasks instance, task scheduling, and the
//! call/read APIs exposed to tasks and backends.
1use std::{
2    cmp::Reverse,
3    fmt::{Debug, Display},
4    future::Future,
5    hash::{BuildHasher, BuildHasherDefault},
6    mem::take,
7    panic::AssertUnwindSafe,
8    pin::Pin,
9    process::abort,
10    sync::{
11        Arc, Mutex, RwLock, Weak,
12        atomic::{AtomicBool, AtomicUsize, Ordering},
13    },
14    time::{Duration, Instant},
15};
16
17use anyhow::{Result, anyhow};
18use auto_hash_map::AutoMap;
19use bincode::{Decode, Encode};
20use either::Either;
21use futures::FutureExt;
22use rustc_hash::{FxBuildHasher, FxHasher};
23use serde::{Deserialize, Serialize};
24use smallvec::SmallVec;
25use tokio::{select, sync::mpsc::Receiver, task_local};
26use tracing::{Instrument, Span, instrument};
27use turbo_tasks_hash::{DeterministicHash, hash_xxh3_hash128};
28
29use crate::{
30    Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
31    ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
32    VcValueTrait, VcValueType,
33    backend::{
34        Backend, CellContent, CellHash, TaskCollectiblesMap, TaskExecutionSpec, TransientTaskType,
35        TurboTasksExecutionError, TypedCellContent, VerificationMode,
36    },
37    capture_future::CaptureFuture,
38    dyn_task_inputs::StackDynTaskInputs,
39    event::{Event, EventListener},
40    id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
41    id_factory::IdFactoryWithReuse,
42    keyed::KeyedEq,
43    local_task_tracker::LocalTaskTracker,
44    macro_helpers::NativeFunction,
45    message_queue::{CompilationEvent, CompilationEventQueue},
46    priority_runner::{Executor, PriorityRunner},
47    raw_vc::{CellId, RawVc},
48    registry,
49    serialization_invalidation::SerializationInvalidator,
50    task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
51    task_statistics::TaskStatisticsApi,
52    trace::TraceRawVcs,
53    util::{IdFactory, StaticOrArc},
54};
55
/// Common base trait for [`TurboTasksApi`] and [`TurboTasksBackendApi`]. Provides APIs for creating
/// tasks from function calls.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls a native function with arguments. Resolves arguments when needed
    /// with a wrapper task.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Call a native function with arguments.
    /// All inputs must be resolved.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls a trait method with arguments. First input is the `self` object.
    /// Uses a wrapper task to resolve
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs `future` in a top-level task context, surfacing execution errors
    /// (including captured panics) as [`TurboTasksExecutionError`].
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
    /// Runs `future` as a one-off root task. Such a task is executed once and
    /// is not re-executed when its dependencies change.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Like `run_once`, but additionally records `reason` so it can be
    /// reported alongside aggregated update information.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Spawns `future` as a detached, fire-and-forget once-task.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Sends a compilation event to subscribers.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Returns a human-readable name for the given task.
    fn get_task_name(&self, task: TaskId) -> String;
}
108
/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
/// context. Returned by the [`turbo_tasks`] helper function.
///
/// This trait is needed because thread locals cannot contain an unresolved [`Backend`] type
/// parameter.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Invalidates `task` so that it will be re-executed.
    fn invalidate(&self, task: TaskId);
    /// Like [`Self::invalidate`], but records `reason` for update reporting.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Marks the serialized state of `task` as outdated (see
    /// `SerializationInvalidator`).
    fn invalidate_serialization(&self, task: TaskId);

    /// Tries to read the output of `task`. Returns `Ok(Err(listener))` when
    /// the output is not yet available; the listener resolves once it may be
    /// retried.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Tries to read a cell of `task`. Returns `Ok(Err(listener))` when the
    /// cell content is not yet available; the listener resolves once it may be
    /// retried.
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Reads a [`RawVc::LocalOutput`]. If the task has completed, returns the [`RawVc`] the local
    /// task points to.
    ///
    /// The returned [`RawVc`] may also be a [`RawVc::LocalOutput`], so this may need to be called
    /// recursively or in a loop.
    ///
    /// This does not accept a consistency argument, as you cannot control consistency of a read of
    /// an operation owned by your own task. Strongly consistent reads are only allowed on
    /// [`OperationVc`]s, which should never be local tasks.
    ///
    /// No dependency tracking will happen as a result of this function call, as it's a no-op for a
    /// task to depend on itself.
    ///
    /// [`OperationVc`]: crate::OperationVc
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Returns the collectibles of the given trait type emitted by `task`.
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits `collectible` from the current task.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Retracts `count` emissions of `collectible` from the current task.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Retracts all the given collectibles from the current task.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
    ) -> Result<TypedCellContent>;

    /// Reads a cell of the current task. No dependency is tracked — a task
    /// never needs to depend on its own cells.
    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent>;
    /// Writes `content` into a cell of the current task.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
    );
    /// Marks the current task as finished executing.
    fn mark_own_task_as_finished(&self, task: TaskId);
    /// Marks the current task as session dependent — presumably so it is
    /// re-evaluated in a new session; confirm against backend implementation.
    fn mark_own_task_as_session_dependent(&self, task: TaskId);

    /// Registers `task` as a child of the current task.
    fn connect_task(&self, task: TaskId);

    /// Wraps the given future in the current task.
    ///
    /// Beware: this method is not safe to use in production code. It is only intended for use in
    /// tests and for debugging purposes.
    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Access to the task statistics collection API.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Stops task processing; the returned future resolves when in-flight work
    /// has finished.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events, optionally filtered by event type
    /// names.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;

    // Returns true if TurboTasks is configured to track dependencies.
    fn is_tracking_dependencies(&self) -> bool;
}
200
/// Marker wrapper around a value that the holder promises not to use.
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    /// Wraps `inner`, asserting that it is unused.
    ///
    /// # Safety
    ///
    /// The wrapped value must not be used.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Unused { inner }
    }

    /// Borrows the inner value without consuming the wrapper.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that the value remains unused.
    pub unsafe fn get_unchecked(&self) -> &T {
        &self.inner
    }

    /// Consumes the wrapper and returns the value, ending the "unused"
    /// guarantee.
    pub fn into(self) -> T {
        let Unused { inner } = self;
        inner
    }
}
230
/// A subset of the [`TurboTasks`] API that's exposed to [`Backend`] implementations.
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns a strong (`Arc`) handle to this API object.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Allocates a fresh, unused id for a persistent task.
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Allocates a fresh, unused id for a transient task.
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// Returns a persistent task id to the factory for reuse.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the task id is not used anymore.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// Returns a transient task id to the factory for reuse.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the task id is not used anymore.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedule a task for execution.
    fn schedule(&self, task: TaskId, priority: TaskPriority);

    /// Returns the priority of the current task.
    fn get_current_task_priority(&self) -> TaskPriority;

    /// Schedule a foreground backend job for execution.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Schedule a background backend job for execution.
    ///
    /// Background jobs are not counted towards activeness of the system. The system is considered
    /// idle even with active background jobs.
    fn schedule_backend_background_job(&self, job: B::BackendJob);

    /// Returns the duration from the start of the program to the given instant.
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Returns true if the system is idle.
    fn is_idle(&self) -> bool;

    /// Returns a reference to the backend.
    fn backend(&self) -> &B;
}
270
/// Aggregated information about a completed batch of updates, reported to
/// update subscribers.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// Wall-clock time the update took.
    pub duration: Duration,
    /// Number of tasks involved in the update.
    pub tasks: usize,
    /// The set of invalidation reasons that triggered the update.
    pub reasons: InvalidationReasonSet,
    // Private unit field prevents exhaustive construction/matching outside
    // this module, so fields can be added without breaking callers.
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
279
/// Whether a task may be persisted across sessions.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// Tasks that may be persisted across sessions using serialization.
    Persistent,

    /// Tasks that will be persisted in memory for the life of this session, but won't persist
    /// between sessions.
    ///
    /// This is used for [root tasks][TurboTasks::spawn_root_task] and tasks with an argument of
    /// type [`TransientValue`][crate::value::TransientValue] or
    /// [`TransientInstance`][crate::value::TransientInstance].
    Transient,
}
293
294impl Display for TaskPersistence {
295    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296        match self {
297            TaskPersistence::Persistent => write!(f, "persistent"),
298            TaskPersistence::Transient => write!(f, "transient"),
299        }
300    }
301}
302
/// How up-to-date the data returned by a read must be.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// The default behavior for most APIs. Reads are faster, but may return stale values, which
    /// may later trigger re-computation.
    #[default]
    Eventual,
    /// Ensures all dependencies are fully resolved before returning the cell or output data, at
    /// the cost of slower reads.
    ///
    /// Top-level code that returns data to the user should use strongly consistent reads.
    Strong,
}
315
/// Controls whether a cell read is recorded as a dependency of the current
/// task.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadCellTracking {
    /// Reads are tracked as dependencies of the current task.
    Tracked {
        /// The key used for the dependency
        key: Option<u64>,
    },
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}

impl ReadCellTracking {
    /// Whether this read should be recorded as a dependency, given whether the
    /// read produced an error.
    pub fn should_track(&self, is_err: bool) -> bool {
        match self {
            Self::Tracked { .. } => true,
            Self::TrackOnlyError => is_err,
            Self::Untracked => false,
        }
    }

    /// The dependency key, if any. Only `Tracked` reads can carry a key.
    pub fn key(&self) -> Option<u64> {
        if let Self::Tracked { key } = self {
            *key
        } else {
            None
        }
    }
}

impl Default for ReadCellTracking {
    /// The default is a tracked read without a key.
    fn default() -> Self {
        Self::Tracked { key: None }
    }
}

impl Display for ReadCellTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
            Self::Tracked { key: None } => f.write_str("tracked"),
            Self::TrackOnlyError => f.write_str("track only error"),
            Self::Untracked => f.write_str("untracked"),
        }
    }
}
369
/// Controls whether a read is recorded as a dependency of the current task
/// (keyless counterpart of `ReadCellTracking`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// Reads are tracked as dependencies of the current task.
    #[default]
    Tracked,
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}

impl ReadTracking {
    /// Whether this read should be recorded as a dependency, given whether the
    /// read produced an error.
    pub fn should_track(&self, is_err: bool) -> bool {
        match self {
            Self::Tracked => true,
            Self::TrackOnlyError => is_err,
            Self::Untracked => false,
        }
    }
}

impl Display for ReadTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Tracked => "tracked",
            Self::TrackOnlyError => "track only error",
            Self::Untracked => "untracked",
        };
        f.write_str(label)
    }
}
406
/// Scheduling priority of a task execution on the priority runner.
#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum TaskPriority {
    /// First-time execution of a task; ordered before all invalidations by
    /// the derived `Ord`.
    #[default]
    Initial,
    /// Re-execution caused by an invalidation. `Reverse` inverts the ordering
    /// so that a numerically larger level compares as lower.
    Invalidation {
        priority: Reverse<u32>,
    },
}
415
416impl TaskPriority {
417    pub fn invalidation(priority: u32) -> Self {
418        Self::Invalidation {
419            priority: Reverse(priority),
420        }
421    }
422
423    pub fn initial() -> Self {
424        Self::Initial
425    }
426
427    pub fn leaf() -> Self {
428        Self::Invalidation {
429            priority: Reverse(0),
430        }
431    }
432
433    pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
434        match self {
435            TaskPriority::Initial => parent_priority,
436            TaskPriority::Invalidation { priority } => {
437                if let TaskPriority::Invalidation {
438                    priority: parent_priority,
439                } = parent_priority
440                    && priority.0 < parent_priority.0
441                {
442                    Self::Invalidation {
443                        priority: Reverse(parent_priority.0.saturating_add(1)),
444                    }
445                } else {
446                    *self
447                }
448            }
449        }
450    }
451}
452
453impl Display for TaskPriority {
454    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
455        match self {
456            TaskPriority::Initial => write!(f, "initial"),
457            TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
458        }
459    }
460}
461
/// A unit of work queued on the priority runner.
enum ScheduledTask {
    /// A backend-managed task, identified by its [`TaskId`].
    Task {
        task_id: TaskId,
        /// Span captured at scheduling time so execution is traced under it.
        span: Span,
    },
    /// A local task executing within the state of its spawning global task.
    LocalTask {
        ty: LocalTaskSpec,
        persistence: TaskPersistence,
        local_task_id: LocalTaskId,
        /// Shared state of the (non-local) task that spawned this local task.
        global_task_state: Arc<RwLock<CurrentTaskState>>,
        /// Span captured at scheduling time so execution is traced under it.
        span: Span,
    },
}
475
/// The central turbo-tasks instance: owns the backend, allocates task and
/// execution ids, runs tasks through a priority runner, and tracks
/// foreground/background job counts for idle detection and update statistics.
pub struct TurboTasks<B: Backend + 'static> {
    /// Weak self-reference; upgraded by [`Self::pin`].
    this: Weak<Self>,
    backend: B,
    /// Id factory for persistent tasks (range below `TRANSIENT_TASK_BIT`).
    task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id factory for transient tasks (range with `TRANSIENT_TASK_BIT` set).
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Allocates per-execution ids used to scope local tasks to one execution.
    execution_id_factory: IdFactory<ExecutionId>,
    stopped: AtomicBool,
    /// Number of foreground jobs currently in flight; zero means idle.
    currently_scheduled_foreground_jobs: AtomicUsize,
    currently_scheduled_background_jobs: AtomicUsize,
    /// Cumulative count of scheduled tasks (incremented in `schedule`).
    scheduled_tasks: AtomicUsize,
    priority_runner:
        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
    /// Start time of the current burst of foreground activity.
    start: Mutex<Option<Instant>>,
    /// Aggregated (duration, task count) plus invalidation reasons collected
    /// since the last update report.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Event that is triggered when currently_scheduled_foreground_jobs becomes non-zero
    event_foreground_start: Event,
    /// Event that is triggered when all foreground jobs are done
    /// (currently_scheduled_foreground_jobs becomes zero)
    event_foreground_done: Event,
    /// Event that is triggered when all background jobs are done
    event_background_done: Event,
    program_start: Instant,
    compilation_events: CompilationEventQueue,
}
500
/// Information about a non-local task. A non-local task can contain multiple "local" tasks, which
/// all share the same non-local task state.
///
/// A non-local task is one that:
///
/// - Has a unique task id.
/// - Is potentially cached.
/// - The backend is aware of.
struct CurrentTaskState {
    /// The backend task id. `None` for temporary states created outside a
    /// backend task (see `new_temporary`, used by `TurboTasks::run`).
    task_id: Option<TaskId>,
    /// Identifies this particular execution; local task handles are only
    /// valid within the same execution (see `assert_execution_id`).
    execution_id: ExecutionId,
    /// Priority inherited by work scheduled from this task.
    priority: TaskPriority,

    /// True if the current task has state in cells (interior mutability).
    /// Only tracked when verify_determinism feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the current task uses an external invalidator
    has_invalidator: bool,

    /// True if we're in a top-level task (e.g. `.run_once(...)` or `.run(...)`).
    /// Eventually consistent reads are not allowed in top-level tasks.
    in_top_level_task: bool,

    /// Tracks how many cells of each type has been allocated so far during this task execution.
    /// When a task is re-executed, the cell count may not match the existing cell vec length.
    ///
    /// This is taken (and becomes `None`) during teardown of a task.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Tracks execution of Local tasks (and detached test futures) created during this global
    /// task's execution.
    local_tasks: LocalTaskTracker,
}
536
impl CurrentTaskState {
    /// State for executing the backend task `task_id`.
    fn new(
        task_id: TaskId,
        execution_id: ExecutionId,
        priority: TaskPriority,
        in_top_level_task: bool,
    ) -> Self {
        Self {
            task_id: Some(task_id),
            execution_id,
            priority,
            #[cfg(feature = "verify_determinism")]
            stateful: false,
            has_invalidator: false,
            in_top_level_task,
            // Cell allocation starts from zero on every (re-)execution.
            cell_counters: Some(AutoMap::default()),
            local_tasks: LocalTaskTracker::new(),
        }
    }

    /// State for code running outside a backend task (no task id, no cell
    /// counters); used by `TurboTasks::run`.
    fn new_temporary(
        execution_id: ExecutionId,
        priority: TaskPriority,
        in_top_level_task: bool,
    ) -> Self {
        Self {
            task_id: None,
            execution_id,
            priority,
            #[cfg(feature = "verify_determinism")]
            stateful: false,
            has_invalidator: false,
            in_top_level_task,
            cell_counters: None,
            local_tasks: LocalTaskTracker::new(),
        }
    }

    /// Panics unless this state belongs to `expected_execution_id`. Guards
    /// against local task handles leaking across executions.
    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
        if self.execution_id != expected_execution_id {
            panic!(
                "Local tasks can only be scheduled/awaited within the same execution of the \
                 parent task that created them"
            );
        }
    }
}
584
// TODO implement our own thread pool and make these thread locals instead
task_local! {
    /// The current TurboTasks instance
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    /// State of the currently executing non-local task; shared (via
    /// `Arc<RwLock<..>>`) with the local tasks it spawns.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;

    /// Temporarily suppresses the eventual consistency check in top-level tasks.
    /// This is used by strongly consistent reads to allow them to succeed in top-level tasks.
    /// This is NOT shared across local tasks (unlike CURRENT_TASK_STATE), so it's safe
    /// to set/unset without race conditions.
    pub(crate) static SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK: bool;
}
598
599impl<B: Backend + 'static> TurboTasks<B> {
600    // TODO better lifetime management for turbo tasks
601    // consider using unsafe for the task_local turbo tasks
602    // that should be safe as long tasks can't outlife turbo task
603    // so we probably want to make sure that all tasks are joined
604    // when trying to drop turbo tasks
605    pub fn new(backend: B) -> Arc<Self> {
606        let task_id_factory = IdFactoryWithReuse::new(
607            TaskId::MIN,
608            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
609        );
610        let transient_task_id_factory =
611            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
612        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
613        let this = Arc::new_cyclic(|this| Self {
614            this: this.clone(),
615            backend,
616            task_id_factory,
617            transient_task_id_factory,
618            execution_id_factory,
619            stopped: AtomicBool::new(false),
620            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
621            currently_scheduled_background_jobs: AtomicUsize::new(0),
622            scheduled_tasks: AtomicUsize::new(0),
623            priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
624            start: Default::default(),
625            aggregated_update: Default::default(),
626            event_foreground_done: Event::new(|| {
627                || "TurboTasks::event_foreground_done".to_string()
628            }),
629            event_foreground_start: Event::new(|| {
630                || "TurboTasks::event_foreground_start".to_string()
631            }),
632            event_background_done: Event::new(|| {
633                || "TurboTasks::event_background_done".to_string()
634            }),
635            program_start: Instant::now(),
636            compilation_events: CompilationEventQueue::default(),
637        });
638        this.backend.startup(&*this);
639        this
640    }
641
    /// Returns a strong (`Arc`) handle to this instance.
    ///
    /// Panics if the last external `Arc` has already been dropped.
    pub fn pin(&self) -> Arc<Self> {
        self.this.upgrade().unwrap()
    }
645
    /// Creates a new root task
    ///
    /// `functor` is cloned and re-invoked to produce a fresh future each time
    /// the root task is (re-)executed. The resulting `Vc` is converted to a
    /// non-local `RawVc` before becoming the task's output, and the task is
    /// immediately scheduled with initial priority.
    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
    where
        T: ?Sized,
        F: Fn() -> Fut + Send + Sync + Clone + 'static,
        Fut: Future<Output = Result<Vc<T>>> + Send,
    {
        let id = self.backend.create_transient_task(
            TransientTaskType::Root(Box::new(move || {
                let functor = functor.clone();
                Box::pin(async move {
                    mark_top_level_task();
                    let raw_vc = functor().await?.node;
                    // Local outputs are scoped to a single execution, so the
                    // stored output must be non-local.
                    raw_vc.to_non_local().await
                })
            })),
            self,
        );
        self.schedule(id, TaskPriority::initial());
        id
    }
667
    /// Disposes a root task previously created via [`Self::spawn_root_task`].
    pub fn dispose_root_task(&self, task_id: TaskId) {
        self.backend.dispose_root_task(task_id, self);
    }
671
    // TODO make sure that all dependencies settle before reading them
    /// Creates a new root task, that is only executed once.
    /// Dependencies will not invalidate the task.
    #[track_caller]
    fn spawn_once_task<T, Fut>(&self, future: Fut)
    where
        T: ?Sized,
        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
    {
        let id = self.backend.create_transient_task(
            TransientTaskType::Once(Box::pin(async move {
                mark_top_level_task();
                let raw_vc = future.await?.node;
                // Local outputs are scoped to a single execution, so the
                // stored output must be non-local.
                raw_vc.to_non_local().await
            })),
            self,
        );
        self.schedule(id, TaskPriority::initial());
    }
691
    /// Runs `future` as a one-off root task and returns its result.
    ///
    /// The result is transferred back to the caller via a oneshot channel; an
    /// error is returned if the receiver side was dropped before completion.
    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
        &self,
        future: impl Future<Output = Result<T>> + Send + 'static,
    ) -> Result<T> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.spawn_once_task(async move {
            // NOTE(review): `spawn_once_task`'s wrapper also calls
            // `mark_top_level_task`; this second call looks redundant — confirm.
            mark_top_level_task();
            let result = future.await;
            tx.send(result)
                .map_err(|_| anyhow!("unable to send result"))?;
            Ok(Completion::new())
        });

        rx.await?
    }
707
    /// Runs `future` inside a temporary top-level task context, waiting for it
    /// and all local tasks it spawned to finish.
    ///
    /// Counts as a foreground job for idle tracking. Panics raised by `future`
    /// are captured and surfaced as [`TurboTasksExecutionError::Panic`].
    #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
    pub async fn run<T: TraceRawVcs + Send + 'static>(
        &self,
        future: impl Future<Output = Result<T>> + Send + 'static,
    ) -> Result<T, TurboTasksExecutionError> {
        self.begin_foreground_job();
        // it's okay for execution ids to overflow and wrap, they're just used for an assert
        let execution_id = self.execution_id_factory.wrapping_get();
        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
            execution_id,
            TaskPriority::initial(),
            true, // in_top_level_task
        )));

        let result = TURBO_TASKS
            .scope(
                self.pin(),
                CURRENT_TASK_STATE.scope(current_task_state, async {
                    // CaptureFuture converts panics into values instead of
                    // unwinding.
                    let result = CaptureFuture::new(future).await;

                    // wait for all spawned local tasks using `local` to finish
                    wait_for_local_tasks().await;

                    match result {
                        Ok(Ok(value)) => Ok(value),
                        Ok(Err(err)) => Err(err.into()),
                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                    }
                }),
            )
            .await;
        self.finish_foreground_job();
        result
    }
742
    /// Spawns `future` as a detached once-task on the tokio runtime.
    ///
    /// While `future` itself runs, the foreground job count is temporarily
    /// released (`finish_foreground_job` before the await, `begin_foreground_job`
    /// after) — presumably so a long-running process does not prevent the
    /// system from being considered idle; confirm against idle-detection
    /// expectations.
    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
        let this = self.pin();
        tokio::spawn(async move {
            this.pin()
                .run_once(async move {
                    this.finish_foreground_job();
                    future.await;
                    this.begin_foreground_job();
                    Ok(())
                })
                .await
                .unwrap()
        });
    }
757
758    pub(crate) fn native_call(
759        &self,
760        native_fn: &'static NativeFunction,
761        this: Option<RawVc>,
762        arg: &mut dyn StackDynTaskInputs,
763        persistence: TaskPersistence,
764    ) -> RawVc {
765        RawVc::TaskOutput(self.backend.get_or_create_task(
766            native_fn,
767            this,
768            arg,
769            current_task_if_available("turbo_function calls"),
770            persistence,
771            self,
772        ))
773    }
774
    /// Calls a native function, resolving `this` and the arguments via a local
    /// wrapper task first when any of them are unresolved.
    pub fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        // Fast path: everything already resolved — dispatch directly.
        if this.is_none_or(|this| this.is_resolved())
            && native_fn.arg_meta.is_resolved(arg.as_ref())
        {
            return self.native_call(native_fn, this, arg, persistence);
        }
        // Need async resolution — must move the arg to the heap now
        let arg = arg.take_box();
        let task_type = LocalTaskSpec {
            task_type: LocalTaskType::ResolveNative { native_fn },
            this,
            arg,
        };
        self.schedule_local_task(task_type, persistence)
    }
796
    /// Calls a trait method on `this`. When `this` is already a resolved cell,
    /// the concrete native function is looked up from its value type and
    /// dispatched directly; otherwise a local wrapper task resolves the
    /// inputs first.
    pub fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        // avoid creating a wrapper task if self is already resolved
        // for resolved cells we already know the value type so we can lookup the
        // function
        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
            match registry::get_value_type(type_id).get_trait_method(trait_method) {
                Some(native_fn) => {
                    if let Some(filter) = native_fn.arg_meta.filter_owned {
                        // Narrow the argument set to what this method expects.
                        let mut arg = (filter)(arg);
                        return self.dynamic_call(native_fn, Some(this), &mut arg, persistence);
                    } else {
                        return self.dynamic_call(native_fn, Some(this), arg, persistence);
                    }
                }
                None => {
                    // We are destined to fail at this point, but we just retry resolution in the
                    // local task since we cannot report an error from here.
                    // TODO: A panic seems appropriate since the immediate caller is to blame
                }
            }
        }

        // create a wrapper task to resolve all inputs
        let task_type = LocalTaskSpec {
            task_type: LocalTaskType::ResolveTrait { trait_method },
            this: Some(this),
            arg: arg.take_box(),
        };

        self.schedule_local_task(task_type, persistence)
    }
834
    /// Schedules `task_id` on the priority runner, counting it as a foreground
    /// job (balanced elsewhere once execution completes).
    #[track_caller]
    pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
        self.begin_foreground_job();
        // Statistics counter; never decremented here.
        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);

        self.priority_runner.schedule(
            &self.pin(),
            ScheduledTask::Task {
                task_id,
                // Capture the caller's span so execution is traced under it.
                span: Span::current(),
            },
            priority,
        );
    }
849
    /// Schedules a local (resolve) task on the priority runner and returns a
    /// `RawVc::LocalOutput` through which its result can be read once complete.
    fn schedule_local_task(
        &self,
        ty: LocalTaskSpec,
        // if this is a `LocalTaskType::Resolve*`, we may spawn another task with this persistence,
        persistence: TaskPersistence,
    ) -> RawVc {
        let task_type = ty.task_type;
        // Register the local task with the current (global) task's state and capture everything
        // the scheduled work needs: the shared task state, the execution id (used later to assert
        // that the local output is only read within the same execution), and the current priority.
        let (global_task_state, execution_id, priority, local_task_id) =
            CURRENT_TASK_STATE.with(|gts| {
                let mut gts_write = gts.write().unwrap();
                let local_task_id = gts_write.local_tasks.create(task_type);
                (
                    Arc::clone(gts),
                    gts_write.execution_id,
                    gts_write.priority,
                    local_task_id,
                )
            });

        self.priority_runner.schedule(
            &self.pin(),
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state,
                span: Span::current(),
            },
            priority,
        );

        // The returned Vc is only valid within the current execution (checked via
        // `assert_execution_id` when it is read).
        RawVc::LocalOutput(execution_id, local_task_id, persistence)
    }
883
884    fn begin_foreground_job(&self) {
885        if self
886            .currently_scheduled_foreground_jobs
887            .fetch_add(1, Ordering::AcqRel)
888            == 0
889        {
890            *self.start.lock().unwrap() = Some(Instant::now());
891            self.event_foreground_start.notify(usize::MAX);
892            self.backend.idle_end(self);
893        }
894    }
895
896    fn finish_foreground_job(&self) {
897        if self
898            .currently_scheduled_foreground_jobs
899            .fetch_sub(1, Ordering::AcqRel)
900            == 1
901        {
902            self.backend.idle_start(self);
903            // That's not super race-condition-safe, but it's only for
904            // statistical reasons
905            let total = self.scheduled_tasks.load(Ordering::Acquire);
906            self.scheduled_tasks.store(0, Ordering::Release);
907            if let Some(start) = *self.start.lock().unwrap() {
908                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
909                if let Some(update) = update.as_mut() {
910                    update.0 += start.elapsed();
911                    update.1 += total;
912                } else {
913                    *update = Some((start.elapsed(), total));
914                }
915            }
916            self.event_foreground_done.notify(usize::MAX);
917        }
918    }
919
920    fn begin_background_job(&self) {
921        self.currently_scheduled_background_jobs
922            .fetch_add(1, Ordering::Relaxed);
923    }
924
925    fn finish_background_job(&self) {
926        if self
927            .currently_scheduled_background_jobs
928            .fetch_sub(1, Ordering::Relaxed)
929            == 1
930        {
931            self.event_background_done.notify(usize::MAX);
932        }
933    }
934
    /// Returns the number of foreground jobs currently scheduled or running.
    pub fn get_in_progress_count(&self) -> usize {
        self.currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
    }
939
940    /// Waits for the given task to finish executing. This works by performing an untracked read,
941    /// and discarding the value of the task output.
942    ///
943    /// [`ReadConsistency::Eventual`] means that this will return after the task executes, but
944    /// before all dependencies have completely settled.
945    ///
946    /// [`ReadConsistency::Strong`] means that this will also wait for the task and all dependencies
947    /// to fully settle before returning.
948    ///
949    /// As this function is typically called in top-level code that waits for results to be ready
950    /// for the user to access, most callers should use [`ReadConsistency::Strong`].
951    pub async fn wait_task_completion(
952        &self,
953        id: TaskId,
954        consistency: ReadConsistency,
955    ) -> Result<()> {
956        read_task_output(
957            self,
958            id,
959            ReadOutputOptions {
960                // INVALIDATION: This doesn't return a value, only waits for it to be ready.
961                tracking: ReadTracking::Untracked,
962                consistency,
963            },
964        )
965        .await?;
966        Ok(())
967    }
968
969    /// Returns [UpdateInfo] with all updates aggregated over a given duration
970    /// (`aggregation`). Will wait until an update happens.
971    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
972        self.aggregated_update_info(aggregation, Duration::MAX)
973            .await
974            .unwrap()
975    }
976
977    /// Returns [UpdateInfo] with all updates aggregated over a given duration
978    /// (`aggregation`). Will only return None when the timeout is reached while
979    /// waiting for the first update.
980    pub async fn aggregated_update_info(
981        &self,
982        aggregation: Duration,
983        timeout: Duration,
984    ) -> Option<UpdateInfo> {
985        let listener = self
986            .event_foreground_done
987            .listen_with_note(|| || "wait for update info".to_string());
988        let wait_for_finish = {
989            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
990            if aggregation.is_zero() {
991                if let Some((duration, tasks)) = update.take() {
992                    return Some(UpdateInfo {
993                        duration,
994                        tasks,
995                        reasons: take(reason_set),
996                        placeholder_for_future_fields: (),
997                    });
998                } else {
999                    true
1000                }
1001            } else {
1002                update.is_none()
1003            }
1004        };
1005        if wait_for_finish {
1006            if timeout == Duration::MAX {
1007                // wait for finish
1008                listener.await;
1009            } else {
1010                // wait for start, then wait for finish or timeout
1011                let start_listener = self
1012                    .event_foreground_start
1013                    .listen_with_note(|| || "wait for update info".to_string());
1014                if self
1015                    .currently_scheduled_foreground_jobs
1016                    .load(Ordering::Acquire)
1017                    == 0
1018                {
1019                    start_listener.await;
1020                } else {
1021                    drop(start_listener);
1022                }
1023                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1024                    // Timeout
1025                    return None;
1026                }
1027            }
1028        }
1029        if !aggregation.is_zero() {
1030            loop {
1031                select! {
1032                    () = tokio::time::sleep(aggregation) => {
1033                        break;
1034                    }
1035                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1036                        // Resets the sleep
1037                    }
1038                }
1039            }
1040        }
1041        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1042        if let Some((duration, tasks)) = update.take() {
1043            Some(UpdateInfo {
1044                duration,
1045                tasks,
1046                reasons: take(reason_set),
1047                placeholder_for_future_fields: (),
1048            })
1049        } else {
1050            panic!("aggregated_update_info must not called concurrently")
1051        }
1052    }
1053
1054    pub async fn wait_background_done(&self) {
1055        let listener = self.event_background_done.listen();
1056        if self
1057            .currently_scheduled_background_jobs
1058            .load(Ordering::Acquire)
1059            != 0
1060        {
1061            listener.await;
1062        }
1063    }
1064
    /// Stops this turbo-tasks instance: notifies the backend, marks the instance as stopped,
    /// waits for all foreground jobs and then all background jobs to finish, and finally tells
    /// the backend to stop.
    pub async fn stop_and_wait(&self) {
        turbo_tasks_future_scope(self.pin(), async move {
            self.backend.stopping(self);
            self.stopped.store(true, Ordering::Release);
            {
                // Register the listener before checking the counter so a job finishing in
                // between cannot be missed.
                let listener = self
                    .event_foreground_done
                    .listen_with_note(|| || "wait for stop".to_string());
                if self
                    .currently_scheduled_foreground_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            {
                // Same listener-before-check pattern for background jobs.
                let listener = self.event_background_done.listen();
                if self
                    .currently_scheduled_background_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            self.backend.stop(self);
        })
        .await;
    }
1095
1096    #[track_caller]
1097    pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1098    where
1099        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1100        T::CallOnceFuture: Send,
1101    {
1102        let mut this = self.pin();
1103        this.begin_foreground_job();
1104        tokio::spawn(
1105            TURBO_TASKS
1106                .scope(this.clone(), async move {
1107                    if !this.stopped.load(Ordering::Acquire) {
1108                        this = func(this.clone()).await;
1109                    }
1110                    this.finish_foreground_job();
1111                })
1112                .in_current_span(),
1113        );
1114    }
1115
1116    #[track_caller]
1117    pub(crate) fn schedule_background_job<T>(&self, func: T)
1118    where
1119        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1120        T::CallOnceFuture: Send,
1121    {
1122        let mut this = self.pin();
1123        self.begin_background_job();
1124        tokio::spawn(
1125            TURBO_TASKS
1126                .scope(this.clone(), async move {
1127                    if !this.stopped.load(Ordering::Acquire) {
1128                        this = func(this).await;
1129                    }
1130                    this.finish_background_job();
1131                })
1132                .in_current_span(),
1133        );
1134    }
1135
1136    fn finish_current_task_state(&self) -> FinishedTaskState {
1137        CURRENT_TASK_STATE.with(|cell| {
1138            let current_task_state = &*cell.write().unwrap();
1139            FinishedTaskState {
1140                #[cfg(feature = "verify_determinism")]
1141                stateful: current_task_state.stateful,
1142                has_invalidator: current_task_state.has_invalidator,
1143            }
1144        })
1145    }
1146
    /// Returns a reference to the backend driving this turbo-tasks instance.
    pub fn backend(&self) -> &B {
        &self.backend
    }
1150}
1151
/// Zero-sized [`Executor`] implementation that turns scheduled (local) tasks
/// into futures for the priority runner.
struct TurboTasksExecutor;
1153
/// Run a future and abort the process if a panic is reported.
///
/// Turbo-tasks catches panics from user code and propagates them through the task tree, but if a
/// panic happens as part of state management we have to abort.
async fn abort_on_panic<F: Future>(f: F) -> F::Output {
    match AssertUnwindSafe(f).catch_unwind().await {
        Ok(r) => r,
        Err(_) => {
            eprintln!(
                "\nturbo-tasks: an internal panic occurred outside the per-task panic \
                 boundary. This is a bug in turbo-tasks/Turbopack — please report it at \
                 https://github.com/vercel/next.js/discussions and include the panic message \
                 and stack trace above.\n\nAborting."
            );
            abort();
        }
    }
}
1172
impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
    type Future = impl Future<Output = ()> + Send + 'static;

    /// Turns a scheduled unit of work into a future for the priority runner. There are two kinds
    /// of work: full backend tasks and local (resolve) tasks.
    fn execute(
        &self,
        this: &Arc<TurboTasks<B>>,
        scheduled_task: ScheduledTask,
        priority: TaskPriority,
    ) -> Self::Future {
        match scheduled_task {
            ScheduledTask::Task { task_id, span } => {
                // One handle for the TURBO_TASKS scope, one for the future body.
                let this2 = this.clone();
                let this = this.clone();
                let future = async move {
                    abort_on_panic(async {
                        // it's okay for execution ids to overflow and wrap, they're just used
                        // for an assert
                        let execution_id = this.execution_id_factory.wrapping_get();
                        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
                            task_id,
                            execution_id,
                            priority,
                            false, // in_top_level_task
                        )));
                        let single_execution_future = async {
                            // Cancel instead of execute once the instance has been stopped.
                            if this.stopped.load(Ordering::Acquire) {
                                this.backend.task_execution_canceled(task_id, &*this);
                                return None;
                            }

                            // `None` from the backend means there's nothing to execute right now.
                            let TaskExecutionSpec { future, span } = this
                                .backend
                                .try_start_task_execution(task_id, priority, &*this)?;

                            async {
                                let result = CaptureFuture::new(future).await;

                                // wait for all spawned local tasks using `local` to finish
                                wait_for_local_tasks().await;

                                let result = match result {
                                    Ok(Ok(raw_vc)) => {
                                        // This is safe because we waited for all local tasks to
                                        // complete above
                                        raw_vc
                                            .to_non_local_unchecked_sync(&*this)
                                            .map_err(|err| err.into())
                                    }
                                    Ok(Err(err)) => Err(err.into()),
                                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                                };

                                // Hand the result and the collected per-task flags/counters to
                                // the backend.
                                let finished_state = this.finish_current_task_state();
                                let cell_counters = CURRENT_TASK_STATE
                                    .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
                                this.backend.task_execution_completed(
                                    task_id,
                                    result,
                                    &cell_counters,
                                    #[cfg(feature = "verify_determinism")]
                                    finished_state.stateful,
                                    finished_state.has_invalidator,
                                    &*this,
                                )
                            }
                            .instrument(span)
                            .await
                        };
                        if let Some(stale_priority) = CURRENT_TASK_STATE
                            .scope(current_task_state, single_execution_future)
                            .await
                        {
                            // Task was stale; re-schedule at the correct invalidation priority so
                            // other tasks can run in the right priority order.
                            this.schedule(task_id, stale_priority);
                        }
                        this.finish_foreground_job();
                    })
                    .await
                };

                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
            }
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state,
                span,
            } => {
                let this2 = this.clone();
                let this = this.clone();
                let task_type = ty.task_type;
                let future = async move {
                    // Pick a resolve span matching the kind of local task.
                    let span = match &ty.task_type {
                        LocalTaskType::ResolveNative { native_fn } => {
                            native_fn.resolve_span(priority)
                        }
                        LocalTaskType::ResolveTrait { trait_method } => {
                            trait_method.resolve_span(priority)
                        }
                    };
                    abort_on_panic(
                        async move {
                            let result = match ty.task_type {
                                LocalTaskType::ResolveNative { native_fn } => {
                                    LocalTaskType::run_resolve_native(
                                        native_fn,
                                        ty.this,
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                                LocalTaskType::ResolveTrait { trait_method } => {
                                    LocalTaskType::run_resolve_trait(
                                        trait_method,
                                        ty.this.unwrap(),
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                            };

                            // Wrap the result; errors get the local task's description attached
                            // for better diagnostics.
                            let output = match result {
                                Ok(raw_vc) => OutputContent::Link(raw_vc),
                                Err(err) => OutputContent::Error(
                                    TurboTasksExecutionError::from(err)
                                        .with_local_task_context(task_type.to_string()),
                                ),
                            };

                            // Publish the output into the parent task's local task table.
                            CURRENT_TASK_STATE.with(move |gts| {
                                gts.write()
                                    .unwrap()
                                    .local_tasks
                                    .complete(local_task_id, output);
                            });
                        }
                        .instrument(span),
                    )
                    .await
                };
                // Local tasks run inside their parent task's state scope.
                let future = CURRENT_TASK_STATE.scope(global_task_state, future);

                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
            }
        }
    }
}
1326
/// Per-task flags captured at the end of a task's execution and forwarded to
/// `Backend::task_execution_completed`.
struct FinishedTaskState {
    /// True if the task has state in cells (interior mutability).
    /// Only tracked when verify_determinism feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the task uses an external invalidator
    has_invalidator: bool,
}
1336
/// Object-safe call API: mostly thin delegations to the inherent implementations
/// on `TurboTasks<B>`.
impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.dynamic_call(native_fn, this, arg, persistence)
    }
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.native_call(native_fn, this, arg, persistence)
    }
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.trait_call(trait_method, this, arg, persistence)
    }

    #[track_caller]
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run(future).await })
    }

    #[track_caller]
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    /// Like `run_once`, but records the given invalidation reason so it shows up
    /// in the next aggregated `UpdateInfo`.
    #[track_caller]
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        {
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    #[track_caller]
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        self.start_once_process(future)
    }

    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
        // Best-effort: a failed send is only logged, never propagated.
        if let Err(e) = self.compilation_events.send(event) {
            tracing::warn!("Failed to send compilation event: {e}");
        }
    }

    fn get_task_name(&self, task: TaskId) -> String {
        self.backend.get_task_name(task, self)
    }
}
1413
1414impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1415    #[instrument(level = "info", skip_all, name = "invalidate")]
1416    fn invalidate(&self, task: TaskId) {
1417        self.backend.invalidate_task(task, self);
1418    }
1419
1420    #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1421    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1422        {
1423            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1424            reason_set.insert(reason);
1425        }
1426        self.backend.invalidate_task(task, self);
1427    }
1428
1429    fn invalidate_serialization(&self, task: TaskId) {
1430        self.backend.invalidate_serialization(task, self);
1431    }
1432
1433    #[track_caller]
1434    fn try_read_task_output(
1435        &self,
1436        task: TaskId,
1437        options: ReadOutputOptions,
1438    ) -> Result<Result<RawVc, EventListener>> {
1439        if options.consistency == ReadConsistency::Eventual {
1440            debug_assert_not_in_top_level_task("read_task_output");
1441        }
1442        self.backend.try_read_task_output(
1443            task,
1444            current_task_if_available("reading Vcs"),
1445            options,
1446            self,
1447        )
1448    }
1449
1450    #[track_caller]
1451    fn try_read_task_cell(
1452        &self,
1453        task: TaskId,
1454        index: CellId,
1455        options: ReadCellOptions,
1456    ) -> Result<Result<TypedCellContent, EventListener>> {
1457        let reader = current_task_if_available("reading Vcs");
1458        self.backend
1459            .try_read_task_cell(task, index, reader, options, self)
1460    }
1461
1462    fn try_read_own_task_cell(
1463        &self,
1464        current_task: TaskId,
1465        index: CellId,
1466    ) -> Result<TypedCellContent> {
1467        self.backend
1468            .try_read_own_task_cell(current_task, index, self)
1469    }
1470
1471    #[track_caller]
1472    fn try_read_local_output(
1473        &self,
1474        execution_id: ExecutionId,
1475        local_task_id: LocalTaskId,
1476    ) -> Result<Result<RawVc, EventListener>> {
1477        debug_assert_not_in_top_level_task("read_local_output");
1478        CURRENT_TASK_STATE.with(|gts| {
1479            let gts_read = gts.read().unwrap();
1480
1481            // Local Vcs are local to their parent task's current execution, and do not exist
1482            // outside of it. This is weakly enforced at compile time using the `NonLocalValue`
1483            // marker trait. This assertion exists to handle any potential escapes that the
1484            // compile-time checks cannot capture.
1485            gts_read.assert_execution_id(execution_id);
1486
1487            match gts_read.local_tasks.get(local_task_id) {
1488                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1489                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1490            }
1491        })
1492    }
1493
1494    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1495        // TODO: Add assert_not_in_top_level_task("read_task_collectibles") check here.
1496        // Collectible reads are eventually consistent.
1497        self.backend.read_task_collectibles(
1498            task,
1499            trait_id,
1500            current_task_if_available("reading collectibles"),
1501            self,
1502        )
1503    }
1504
1505    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1506        self.backend.emit_collectible(
1507            trait_type,
1508            collectible,
1509            current_task("emitting collectible"),
1510            self,
1511        );
1512    }
1513
1514    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1515        self.backend.unemit_collectible(
1516            trait_type,
1517            collectible,
1518            count,
1519            current_task("emitting collectible"),
1520            self,
1521        );
1522    }
1523
1524    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1525        for (&collectible, &count) in collectibles {
1526            if count > 0 {
1527                self.backend.unemit_collectible(
1528                    trait_type,
1529                    collectible,
1530                    count as u32,
1531                    current_task("emitting collectible"),
1532                    self,
1533                );
1534            }
1535        }
1536    }
1537
1538    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent> {
1539        self.try_read_own_task_cell(task, index)
1540    }
1541
1542    fn update_own_task_cell(
1543        &self,
1544        task: TaskId,
1545        index: CellId,
1546        content: CellContent,
1547        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
1548        content_hash: Option<CellHash>,
1549        verification_mode: VerificationMode,
1550    ) {
1551        self.backend.update_task_cell(
1552            task,
1553            index,
1554            content,
1555            updated_key_hashes,
1556            content_hash,
1557            verification_mode,
1558            self,
1559        );
1560    }
1561
1562    fn connect_task(&self, task: TaskId) {
1563        self.backend
1564            .connect_task(task, current_task_if_available("connecting task"), self);
1565    }
1566
1567    fn mark_own_task_as_finished(&self, task: TaskId) {
1568        self.backend.mark_own_task_as_finished(task, self);
1569    }
1570
1571    fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1572        self.backend.mark_own_task_as_session_dependent(task, self);
1573    }
1574
1575    /// Creates a future that inherits the current task id and task state. The current global task
1576    /// will wait for this future to be dropped before exiting.
1577    fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1578        // this is similar to what happens for a local task, except that we keep the local task's
1579        // state as well.
1580        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1581        global_task_state
1582            .write()
1583            .unwrap()
1584            .local_tasks
1585            .register_detached();
1586        let wrapped = async move {
1587            // use a drop guard for panic safety
1588            struct DropGuard;
1589            impl Drop for DropGuard {
1590                fn drop(&mut self) {
1591                    CURRENT_TASK_STATE
1592                        .with(|ts| ts.write().unwrap().local_tasks.decrement_in_flight());
1593                }
1594            }
1595            let _guard = DropGuard;
1596            fut.await;
1597        };
1598        tokio::spawn(TURBO_TASKS.scope(
1599            turbo_tasks(),
1600            CURRENT_TASK_STATE.scope(global_task_state, wrapped),
1601        ));
1602    }
1603
1604    fn task_statistics(&self) -> &TaskStatisticsApi {
1605        self.backend.task_statistics()
1606    }
1607
1608    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1609        let this = self.pin();
1610        Box::pin(async move {
1611            this.stop_and_wait().await;
1612        })
1613    }
1614
1615    fn subscribe_to_compilation_events(
1616        &self,
1617        event_types: Option<Vec<String>>,
1618    ) -> Receiver<Arc<dyn CompilationEvent>> {
1619        self.compilation_events.subscribe(event_types)
1620    }
1621
1622    fn is_tracking_dependencies(&self) -> bool {
1623        self.backend.is_tracking_dependencies()
1624    }
1625}
1626
/// API surface the backend uses to talk back to the turbo-tasks manager.
impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
        self.pin()
    }
    fn backend(&self) -> &B {
        &self.backend
    }

    #[track_caller]
    fn schedule_backend_background_job(&self, job: B::BackendJob) {
        self.schedule_background_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            this
        })
    }

    #[track_caller]
    fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
        self.schedule_foreground_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            this
        })
    }

    #[track_caller]
    fn schedule(&self, task: TaskId, priority: TaskPriority) {
        self.schedule(task, priority)
    }

    fn get_current_task_priority(&self) -> TaskPriority {
        // Falls back to the initial priority when called outside of a task context.
        CURRENT_TASK_STATE
            .try_with(|task_state| task_state.read().unwrap().priority)
            .unwrap_or(TaskPriority::initial())
    }

    fn program_duration_until(&self, instant: Instant) -> Duration {
        instant - self.program_start
    }

    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
        // SAFETY: This is a fresh id from the factory
        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
    }

    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
        // SAFETY: This is a fresh id from the factory
        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
    }

    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
        // SAFETY: forwarding the caller's contract — the `Unused` wrapper attests the id is
        // no longer in use.
        unsafe { self.task_id_factory.reuse(id.into()) }
    }

    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
        // SAFETY: forwarding the caller's contract — the `Unused` wrapper attests the id is
        // no longer in use.
        unsafe { self.transient_task_id_factory.reuse(id.into()) }
    }

    fn is_idle(&self) -> bool {
        // Idle means no foreground jobs are currently scheduled or running.
        self.currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
            == 0
    }
}
1690
1691async fn wait_for_local_tasks() {
1692    let listener =
1693        CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_tasks.listen_for_in_flight());
1694    let Some(listener) = listener else {
1695        return;
1696    };
1697    listener.await;
1698}
1699
1700pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1701    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1702        Ok(id) => id,
1703        Err(_) => panic!(
1704            "{from} can only be used in the context of a turbo_tasks task execution or \
1705             turbo_tasks run"
1706        ),
1707    }
1708}
1709
1710pub(crate) fn current_task(from: &str) -> TaskId {
1711    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1712        Ok(Some(id)) => id,
1713        Ok(None) | Err(_) => {
1714            panic!("{from} can only be used in the context of a turbo_tasks task execution")
1715        }
1716    }
1717}
1718
1719/// Panics if we're not in a top-level task (e.g. [`run_once`]). Some function calls should only
1720/// happen in a top-level task (e.g. [`Effects::apply`][crate::Effects::apply]).
1721#[track_caller]
1722pub(crate) fn debug_assert_in_top_level_task(message: &str) {
1723    if !cfg!(debug_assertions) {
1724        return;
1725    }
1726
1727    let in_top_level = CURRENT_TASK_STATE
1728        .try_with(|ts| ts.read().unwrap().in_top_level_task)
1729        .unwrap_or(true);
1730    if !in_top_level {
1731        panic!("{message}");
1732    }
1733}
1734
1735#[track_caller]
1736pub(crate) fn debug_assert_not_in_top_level_task(operation: &str) {
1737    if !cfg!(debug_assertions) {
1738        return;
1739    }
1740
1741    // HACK: We set this inside of `ReadRawVcFuture` to suppress warnings about an internal
1742    // consistency bug
1743    let suppressed = SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK
1744        .try_with(|&suppressed| suppressed)
1745        .unwrap_or(false);
1746    if suppressed {
1747        return;
1748    }
1749
1750    let in_top_level = CURRENT_TASK_STATE
1751        .try_with(|ts| ts.read().unwrap().in_top_level_task)
1752        .unwrap_or(false);
1753    if in_top_level {
1754        panic!(
1755            "Eventually consistent read ({operation}) cannot be performed from a top-level task. \
1756             Top-level tasks (e.g. code inside `.run_once(...)`) must use strongly consistent \
1757             reads to avoid leaking inconsistent return values."
1758        );
1759    }
1760}
1761
1762pub async fn run<T: Send + 'static>(
1763    tt: Arc<dyn TurboTasksApi>,
1764    future: impl Future<Output = Result<T>> + Send + 'static,
1765) -> Result<T> {
1766    let (tx, rx) = tokio::sync::oneshot::channel();
1767
1768    tt.run(Box::pin(async move {
1769        let result = future.await?;
1770        tx.send(result)
1771            .map_err(|_| anyhow!("unable to send result"))?;
1772        Ok(())
1773    }))
1774    .await?;
1775
1776    Ok(rx.await?)
1777}
1778
1779pub async fn run_once<T: Send + 'static>(
1780    tt: Arc<dyn TurboTasksApi>,
1781    future: impl Future<Output = Result<T>> + Send + 'static,
1782) -> Result<T> {
1783    let (tx, rx) = tokio::sync::oneshot::channel();
1784
1785    tt.run_once(Box::pin(async move {
1786        let result = future.await?;
1787        tx.send(result)
1788            .map_err(|_| anyhow!("unable to send result"))?;
1789        Ok(())
1790    }))
1791    .await?;
1792
1793    Ok(rx.await?)
1794}
1795
1796pub async fn run_once_with_reason<T: Send + 'static>(
1797    tt: Arc<dyn TurboTasksApi>,
1798    reason: impl InvalidationReason,
1799    future: impl Future<Output = Result<T>> + Send + 'static,
1800) -> Result<T> {
1801    let (tx, rx) = tokio::sync::oneshot::channel();
1802
1803    tt.run_once_with_reason(
1804        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1805        Box::pin(async move {
1806            let result = future.await?;
1807            tx.send(result)
1808                .map_err(|_| anyhow!("unable to send result"))?;
1809            Ok(())
1810        }),
1811    )
1812    .await?;
1813
1814    Ok(rx.await?)
1815}
1816
1817/// Calls [`TurboTasks::dynamic_call`] for the current turbo tasks instance.
1818pub fn dynamic_call(
1819    func: &'static NativeFunction,
1820    this: Option<RawVc>,
1821    arg: &mut dyn StackDynTaskInputs,
1822    persistence: TaskPersistence,
1823) -> RawVc {
1824    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1825}
1826
1827/// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance.
1828pub fn trait_call(
1829    trait_method: &'static TraitMethod,
1830    this: RawVc,
1831    arg: &mut dyn StackDynTaskInputs,
1832    persistence: TaskPersistence,
1833) -> RawVc {
1834    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1835}
1836
1837pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1838    TURBO_TASKS.with(|arc| arc.clone())
1839}
1840
/// Returns a weak handle to the current turbo_tasks instance; panics when
/// the task-local is not set (see `LocalKey::with`).
pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
    TURBO_TASKS.with(Arc::downgrade)
}
1844
1845pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1846    TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1847}
1848
1849pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1850    TURBO_TASKS.with(|arc| func(arc))
1851}
1852
/// Runs `f` synchronously with `tt` installed as the current turbo_tasks
/// instance (the `TURBO_TASKS` task-local).
pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
    TURBO_TASKS.sync_scope(tt, f)
}
1856
/// Wraps `f` so that it runs with `tt` installed as the current turbo_tasks
/// instance for the whole lifetime of the future.
pub fn turbo_tasks_future_scope<T>(
    tt: Arc<dyn TurboTasksApi>,
    f: impl Future<Output = T>,
) -> impl Future<Output = T> {
    TURBO_TASKS.scope(tt, f)
}
1863
1864pub fn with_turbo_tasks_for_testing<T>(
1865    tt: Arc<dyn TurboTasksApi>,
1866    current_task: TaskId,
1867    execution_id: ExecutionId,
1868    f: impl Future<Output = T>,
1869) -> impl Future<Output = T> {
1870    TURBO_TASKS.scope(
1871        tt,
1872        CURRENT_TASK_STATE.scope(
1873            Arc::new(RwLock::new(CurrentTaskState::new(
1874                current_task,
1875                execution_id,
1876                TaskPriority::initial(),
1877                false, // in_top_level_task
1878            ))),
1879            f,
1880        ),
1881    )
1882}
1883
1884/// Spawns the given future within the context of the current task.
1885///
1886/// Beware: this method is not safe to use in production code. It is only
1887/// intended for use in tests and for debugging purposes.
1888pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
1889    turbo_tasks().spawn_detached_for_testing(Box::pin(f));
1890}
1891
/// Test helper: the current task id, or `None` when inside a top-level `run`
/// scope. Panics when no turbo_tasks task state is installed at all.
pub fn current_task_for_testing() -> Option<TaskId> {
    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
}
1895
1896/// Marks the current task as dirty when restored from filesystem cache.
1897pub fn mark_session_dependent() {
1898    with_turbo_tasks(|tt| {
1899        tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1900    });
1901}
1902
1903/// Marks the current task as finished. This excludes it from waiting for
1904/// strongly consistency.
1905pub fn mark_finished() {
1906    with_turbo_tasks(|tt| {
1907        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1908    });
1909}
1910
1911/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
1912/// serialization of the current task cells.
1913///
1914/// Also marks the current task as stateful when the `verify_determinism` feature is enabled,
1915/// since State allocation implies interior mutability.
1916pub fn get_serialization_invalidator() -> SerializationInvalidator {
1917    CURRENT_TASK_STATE.with(|cell| {
1918        let CurrentTaskState {
1919            task_id,
1920            #[cfg(feature = "verify_determinism")]
1921            stateful,
1922            ..
1923        } = &mut *cell.write().unwrap();
1924        #[cfg(feature = "verify_determinism")]
1925        {
1926            *stateful = true;
1927        }
1928        let Some(task_id) = *task_id else {
1929            panic!(
1930                "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
1931                 task execution"
1932            );
1933        };
1934        SerializationInvalidator::new(task_id)
1935    })
1936}
1937
1938pub fn mark_invalidator() {
1939    CURRENT_TASK_STATE.with(|cell| {
1940        let CurrentTaskState {
1941            has_invalidator, ..
1942        } = &mut *cell.write().unwrap();
1943        *has_invalidator = true;
1944    })
1945}
1946
/// Marks the current task as stateful. This is used to indicate that the task
/// has interior mutability (e.g., via [`State`][crate::State]), which means
/// the task may produce different outputs even with the same inputs.
///
/// Only has an effect when the `verify_determinism` feature is enabled;
/// otherwise this compiles to a no-op.
pub fn mark_stateful() {
    #[cfg(feature = "verify_determinism")]
    CURRENT_TASK_STATE.with(|cell| {
        cell.write().unwrap().stateful = true;
    });
}
1962
1963/// Marks the current task context as being in a top-level task. When in a top-level task,
1964/// eventually consistent reads will panic. It is almost always a mistake to perform an eventually
1965/// consistent read at the top-level of the application.
1966pub fn mark_top_level_task() {
1967    if cfg!(debug_assertions) {
1968        CURRENT_TASK_STATE.with(|cell| {
1969            cell.write().unwrap().in_top_level_task = true;
1970        })
1971    }
1972}
1973
1974/// Unmarks the current task context as being in a top-level task. The opposite of
1975/// [`mark_top_level_task`].
1976///
1977/// This utility can be okay in unit tests, where we're observing the internal behavior of
1978/// turbo-tasks, but otherwise, it is probably a mistake to call this function.
1979///
1980/// Calling this will allow eventually-consistent reads at the top-level, potentially exposing
1981/// incomplete computations and internal errors caused by eventual consistency that would've been
1982/// caught when the function was re-run. A strongly-consistent read re-runs parts of a task until
1983/// all of the dependencies have settled.
1984pub fn unmark_top_level_task_may_leak_eventually_consistent_state() {
1985    if cfg!(debug_assertions) {
1986        CURRENT_TASK_STATE.with(|cell| {
1987            cell.write().unwrap().in_top_level_task = false;
1988        })
1989    }
1990}
1991
/// Intended to protect the current task's data from garbage collection.
/// Currently a no-op because garbage collection is not implemented.
pub fn prevent_gc() {
    // TODO implement garbage collection
}
1995
1996pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1997    with_turbo_tasks(|tt| {
1998        let raw_vc = collectible.node.node;
1999        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
2000    })
2001}
2002
2003pub(crate) async fn read_task_output(
2004    this: &dyn TurboTasksApi,
2005    id: TaskId,
2006    options: ReadOutputOptions,
2007) -> Result<RawVc> {
2008    loop {
2009        match this.try_read_task_output(id, options)? {
2010            Ok(result) => return Ok(result),
2011            Err(listener) => listener.await,
2012        }
2013    }
2014}
2015
/// A reference to a task's cell with methods that allow updating the contents
/// of the cell.
///
/// Mutations should not happen outside of the task that owns this cell. Doing
/// so is a logic error, and may lead to incorrect caching behavior.
#[derive(Clone, Copy)]
pub struct CurrentCellRef {
    /// The task that owned the cell when this reference was created.
    current_task: TaskId,
    /// Identifies the cell within the task: value type id + per-type index.
    index: CellId,
}
2026
/// Shorthand for the target type a `T: VcValueType` exposes when read,
/// i.e. `<T::Read as VcRead<T>>::Target`.
type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
2028
impl CurrentCellRef {
    /// Updates the cell if the given `functor` returns a value.
    ///
    /// The functor receives the old value (if the cell had one of type `T`)
    /// and returns the new value plus optional updated-key hashes and an
    /// optional content hash; `None` leaves the cell untouched.
    fn conditional_update<T>(
        &self,
        functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>, Option<CellHash>)>,
    ) where
        T: VcValueType,
    {
        self.conditional_update_with_shared_reference(|old_shared_reference| {
            // Ignore an old value of a different type; the functor then sees `None`.
            let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
            let (new_value, updated_key_hashes, content_hash) = functor(old_ref)?;
            Some((
                SharedReference::new(triomphe::Arc::new(new_value)),
                updated_key_hashes,
                content_hash,
            ))
        })
    }

    /// Updates the cell if the given `functor` returns a `SharedReference`.
    fn conditional_update_with_shared_reference(
        &self,
        functor: impl FnOnce(
            Option<&SharedReference>,
        ) -> Option<(
            SharedReference,
            Option<SmallVec<[u64; 2]>>,
            Option<CellHash>,
        )>,
    ) {
        let tt = turbo_tasks();
        // A failed read is treated as "no previous content".
        let cell_content = tt.read_own_task_cell(self.current_task, self.index).ok();
        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
        if let Some((update, updated_key_hashes, content_hash)) = update {
            tt.update_own_task_cell(
                self.current_task,
                self.index,
                CellContent(Some(update)),
                updated_key_hashes,
                content_hash,
                VerificationMode::EqualityCheck,
            )
        }
    }

    /// Replace the current cell's content with `new_value` if the current content is not equal by
    /// value with the existing content.
    ///
    /// The comparison happens using the value itself, not the [`VcRead::Target`] of that value.
    ///
    /// Take this example of a custom equality implementation on a transparent wrapper type:
    ///
    /// ```
    /// #[turbo_tasks::value(transparent, eq = "manual")]
    /// struct Wrapper(Vec<u32>);
    ///
    /// impl PartialEq for Wrapper {
    ///     fn eq(&self, other: &Wrapper) -> bool {
    ///         // Example: order doesn't matter for equality
    ///         let (mut this, mut other) = (self.0.clone(), other.0.clone());
    ///         this.sort_unstable();
    ///         other.sort_unstable();
    ///         this == other
    ///     }
    /// }
    ///
    /// impl Eq for Wrapper {}
    /// ```
    ///
    /// Comparisons of [`Vc<Wrapper>`] used when updating the cell will use `Wrapper`'s custom
    /// equality implementation, rather than the one provided by the target ([`Vec<u32>`]) type.
    ///
    /// However, in most cases, the default derived implementation of [`PartialEq`] is used which
    /// just forwards to the inner value's [`PartialEq`].
    ///
    /// If you already have a `SharedReference`, consider calling
    /// [`Self::compare_and_update_with_shared_reference`] which can re-use the [`SharedReference`]
    /// object.
    pub fn compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + VcValueType,
    {
        self.conditional_update(|old_value| {
            if let Some(old_value) = old_value
                && old_value == &new_value
            {
                return None;
            }
            Some((new_value, None, None))
        });
    }

    /// Replace the current cell's content with `new_shared_reference` if the current content is not
    /// equal by value with the existing content.
    ///
    /// If you already have a `SharedReference`, this is a faster version of
    /// [`CurrentCellRef::compare_and_update`].
    ///
    /// The value should be stored in [`SharedReference`] using the type `T`.
    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
    where
        T: VcValueType + PartialEq,
    {
        self.conditional_update_with_shared_reference(|old_sr| {
            if let Some(old_sr) = old_sr {
                // Panics if either reference holds a value of a different type.
                let old_value = extract_sr_value::<T>(old_sr);
                let new_value = extract_sr_value::<T>(&new_shared_reference);
                if old_value == new_value {
                    return None;
                }
            }
            Some((new_shared_reference, None, None))
        });
    }

    /// Replace the current cell's content if the new value is different.
    ///
    /// Like [`Self::compare_and_update`], but also computes and stores a hash of the value.
    /// When the cell's transient data is evicted, the stored hash enables the backend to detect
    /// whether the value actually changed without re-comparing values—avoiding unnecessary
    /// downstream invalidation.
    ///
    /// Requires `T: DeterministicHash` in addition to `T: PartialEq`.
    pub fn hashed_compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + DeterministicHash + VcValueType,
    {
        self.conditional_update(|old_value| {
            if let Some(old_value) = old_value
                && old_value == &new_value
            {
                return None;
            }
            // Only hash when we actually write; unchanged values return above.
            let content_hash = hash_xxh3_hash128(&new_value);
            Some((new_value, None, Some(content_hash)))
        });
    }

    /// Replace the current cell's content if the new value (from a pre-existing
    /// [`SharedReference`]) is different.
    ///
    /// Like [`Self::compare_and_update_with_shared_reference`], but also passes a hash
    /// for hash-based change detection when transient data has been evicted.
    pub fn hashed_compare_and_update_with_shared_reference<T>(
        &self,
        new_shared_reference: SharedReference,
    ) where
        T: VcValueType + PartialEq + DeterministicHash,
    {
        self.conditional_update_with_shared_reference(move |old_sr| {
            if let Some(old_sr) = old_sr {
                let old_value = extract_sr_value::<T>(old_sr);
                let new_value = extract_sr_value::<T>(&new_shared_reference);
                if old_value == new_value {
                    return None;
                }
            }
            let content_hash = hash_xxh3_hash128(extract_sr_value::<T>(&new_shared_reference));
            Some((new_shared_reference, None, Some(content_hash)))
        });
    }

    /// See [`Self::compare_and_update`], but selectively update individual keys.
    pub fn keyed_compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + VcValueType,
        VcReadTarget<T>: KeyedEq,
        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
    {
        self.conditional_update(|old_value| {
            // No previous value: write unconditionally, no per-key diff.
            let Some(old_value) = old_value else {
                return Some((new_value, None, None));
            };
            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
            let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
            let updated_keys = old_value.different_keys(new_value_ref);
            if updated_keys.is_empty() {
                return None;
            }
            // Duplicates are very unlikely, but ok since the backend is deduplicating them
            let updated_key_hashes = updated_keys
                .into_iter()
                .map(|key| FxBuildHasher.hash_one(key))
                .collect();
            Some((new_value, Some(updated_key_hashes), None))
        });
    }

    /// See [`Self::compare_and_update_with_shared_reference`], but selectively update individual
    /// keys.
    pub fn keyed_compare_and_update_with_shared_reference<T>(
        &self,
        new_shared_reference: SharedReference,
    ) where
        T: VcValueType + PartialEq,
        VcReadTarget<T>: KeyedEq,
        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
    {
        self.conditional_update_with_shared_reference(|old_sr| {
            let Some(old_sr) = old_sr else {
                return Some((new_shared_reference, None, None));
            };
            let old_value = extract_sr_value::<T>(old_sr);
            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
            let new_value = extract_sr_value::<T>(&new_shared_reference);
            let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
            let updated_keys = old_value.different_keys(new_value);
            if updated_keys.is_empty() {
                return None;
            }
            // Duplicates are very unlikely, but ok since the backend is deduplicating them
            let updated_key_hashes = updated_keys
                .into_iter()
                .map(|key| FxBuildHasher.hash_one(key))
                .collect();
            Some((new_shared_reference, Some(updated_key_hashes), None))
        });
    }

    /// Unconditionally updates the content of the cell.
    pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
    where
        T: VcValueType,
    {
        let tt = turbo_tasks();
        tt.update_own_task_cell(
            self.current_task,
            self.index,
            CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
            None,
            None,
            verification_mode,
        )
    }

    /// A faster version of [`Self::update`] if you already have a
    /// [`SharedReference`].
    ///
    /// If the passed-in [`SharedReference`] is the same as the existing cell's
    /// by identity, no update is performed.
    ///
    /// The value should be stored in [`SharedReference`] using the type `T`.
    pub fn update_with_shared_reference(
        &self,
        shared_ref: SharedReference,
        verification_mode: VerificationMode,
    ) {
        let tt = turbo_tasks();
        let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
            let content = tt.read_own_task_cell(self.current_task, self.index).ok();
            if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
                // pointer equality (not value equality)
                shared_ref_exp != shared_ref
            } else {
                // No readable previous content: always write.
                true
            }
        } else {
            true
        };
        if update {
            tt.update_own_task_cell(
                self.current_task,
                self.index,
                CellContent(Some(shared_ref)),
                None,
                None,
                verification_mode,
            )
        }
    }
}
2300
2301impl From<CurrentCellRef> for RawVc {
2302    fn from(cell: CurrentCellRef) -> Self {
2303        RawVc::TaskCell(cell.current_task, cell.index)
2304    }
2305}
2306
2307fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2308    sr.0.downcast_ref::<T>()
2309        .expect("cannot update SharedReference of different type")
2310}
2311
/// Returns a reference to the next unused cell of value type `T` in the
/// current task (see [`find_cell_by_id`]).
pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
    find_cell_by_id(T::get_value_type_id())
}
2315
2316pub fn find_cell_by_id(ty: ValueTypeId) -> CurrentCellRef {
2317    CURRENT_TASK_STATE.with(|ts| {
2318        let current_task = current_task("celling turbo_tasks values");
2319        let mut ts = ts.write().unwrap();
2320        let map = ts.cell_counters.as_mut().unwrap();
2321        let current_index = map.entry(ty).or_default();
2322        let index = *current_index;
2323        *current_index += 1;
2324        CurrentCellRef {
2325            current_task,
2326            index: CellId { type_id: ty, index },
2327        }
2328    })
2329}
2330
2331pub(crate) async fn read_local_output(
2332    this: &dyn TurboTasksApi,
2333    execution_id: ExecutionId,
2334    local_task_id: LocalTaskId,
2335) -> Result<RawVc> {
2336    loop {
2337        match this.try_read_local_output(execution_id, local_task_id)? {
2338            Ok(raw_vc) => return Ok(raw_vc),
2339            Err(event_listener) => event_listener.await,
2340        }
2341    }
2342}