Skip to main content

turbo_tasks/
manager.rs

1use std::{
2    cmp::Reverse,
3    fmt::{Debug, Display},
4    future::Future,
5    hash::{BuildHasher, BuildHasherDefault},
6    mem::take,
7    panic::AssertUnwindSafe,
8    pin::Pin,
9    process::abort,
10    sync::{
11        Arc, Mutex, RwLock, Weak,
12        atomic::{AtomicBool, AtomicUsize, Ordering},
13    },
14    time::{Duration, Instant},
15};
16
17use anyhow::{Result, anyhow};
18use auto_hash_map::AutoMap;
19use bincode::{Decode, Encode};
20use either::Either;
21use futures::FutureExt;
22use rustc_hash::{FxBuildHasher, FxHasher};
23use serde::{Deserialize, Serialize};
24use smallvec::SmallVec;
25use tokio::{select, sync::mpsc::Receiver, task_local};
26use tracing::{Instrument, Span, instrument};
27use turbo_tasks_hash::{DeterministicHash, hash_xxh3_hash128};
28
29use crate::{
30    CellId, Completion, InvalidationReason, InvalidationReasonSet, OutputContent, RawVc,
31    ReadCellOptions, ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod,
32    ValueTypeId, Vc, VcRead, VcValueTrait, VcValueType,
33    backend::{
34        Backend, CellContent, CellHash, TaskCollectiblesMap, TaskExecutionSpec, TransientTaskType,
35        TurboTasksExecutionError, TypedCellContent, VerificationMode,
36    },
37    capture_future::CaptureFuture,
38    dyn_task_inputs::DynTaskInputsStorage,
39    event::{Event, EventListener},
40    id::{ExecutionId, LocalTaskId, TraitTypeId},
41    keyed::KeyedEq,
42    local_task_tracker::LocalTaskTracker,
43    macro_helpers::NativeFunction,
44    message_queue::{CompilationEvent, CompilationEventQueue},
45    priority_runner::{Executor, PriorityRunner},
46    registry,
47    serialization_invalidation::SerializationInvalidator,
48    task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
49    task_statistics::TaskStatisticsApi,
50    trace::TraceRawVcs,
51    util::{IdFactory, StaticOrArc},
52};
53
/// Common base trait for [`TurboTasksApi`] and [`TurboTasks`]. Provides APIs for creating tasks
/// from function calls.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls a native function with arguments. Resolves arguments when needed
    /// with a wrapper task.
    ///
    /// `inputs_resolved` is `TaskInput::is_resolved(&args)` computed at the macro callsite on
    /// the concrete tuple type — when [`InputResolution::Resolved`], the fast path skips wrapper
    /// task creation.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn DynTaskInputsStorage,
        inputs_resolved: InputResolution,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Call a native function with arguments.
    /// All inputs must be resolved.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn DynTaskInputsStorage,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls a trait method with arguments. First input is the `self` object.
    /// Uses a wrapper task to resolve.
    ///
    /// `inputs_resolved` is the macro-site `InputResolution` of the *exposed* tuple; when filtering
    /// is involved, the post-filter value is computed inside the filter functor and supersedes
    /// this argument.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn DynTaskInputsStorage,
        inputs_resolved: InputResolution,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs the boxed, type-erased `future` and resolves to its outcome, with failures reported
    /// as a [`TurboTasksExecutionError`].
    ///
    /// NOTE(review): the implementation is not part of this trait definition; presumably it
    /// forwards to the generic `TurboTasks::run` — confirm.
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
    /// Runs the boxed, type-erased `future` and resolves to its outcome.
    ///
    /// NOTE(review): presumably forwards to the generic `TurboTasks::run_once` — confirm.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Same shape as [`run_once`](Self::run_once), but additionally receives an
    /// [`InvalidationReason`].
    ///
    /// NOTE(review): how `reason` is consumed is not visible from this declaration.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Starts the boxed `future` without handing a result back to the caller.
    ///
    /// NOTE(review): presumably forwards to the generic `TurboTasks::start_once_process` —
    /// confirm.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Sends a compilation event to subscribers.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Returns a human-readable name for the given task.
    fn get_task_name(&self, task: TaskId) -> String;
}
116
/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
/// context. Returned by the [`turbo_tasks`] helper function.
///
/// This trait is needed because thread locals cannot contain an unresolved [`Backend`] type
/// parameter.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Requests invalidation of `task`.
    ///
    /// NOTE(review): the resulting behavior is implemented elsewhere and not visible here.
    fn invalidate(&self, task: TaskId);
    /// Same as [`invalidate`](Self::invalidate), additionally passing along a `reason`.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Requests invalidation of the serialization of `task`.
    ///
    /// NOTE(review): exact semantics are not visible from this declaration — confirm.
    fn invalidate_serialization(&self, task: TaskId);

    /// Attempts to read the output of `task`.
    ///
    /// The outer `Result` reports a failed read. NOTE(review): the inner `Err` presumably
    /// carries an [`EventListener`] to wait on when the output is not available yet — confirm.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Attempts to read the cell `index` of `task`.
    ///
    /// The return type has the same nested shape as
    /// [`try_read_task_output`](Self::try_read_task_output), yielding a [`TypedCellContent`].
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Reads a [`RawVc::LocalOutput`]. If the task has completed, returns the [`RawVc`] the local
    /// task points to.
    ///
    /// The returned [`RawVc`] may also be a [`RawVc::LocalOutput`], so this may need to be called
    /// recursively or in a loop.
    ///
    /// This does not accept a consistency argument, as you cannot control consistency of a read of
    /// an operation owned by your own task. Strongly consistent reads are only allowed on
    /// [`OperationVc`]s, which should never be local tasks.
    ///
    /// No dependency tracking will happen as a result of this function call, as it's a no-op for a
    /// task to depend on itself.
    ///
    /// [`OperationVc`]: crate::OperationVc
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Returns the collectibles of trait `trait_id` associated with `task` as a
    /// [`TaskCollectiblesMap`].
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits `collectible` for the trait `trait_type`.
    ///
    /// NOTE(review): presumably attributed to the currently executing task — confirm.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Takes back `count` emissions of `collectible` for the trait `trait_type`.
    ///
    /// NOTE(review): counting semantics are inferred from the signature — confirm.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Batch variant of [`unemit_collectible`](Self::unemit_collectible) taking a whole
    /// [`TaskCollectiblesMap`].
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
    ) -> Result<TypedCellContent>;

    /// Reads the cell `index` of `task`, where `task` is expected to be the caller's own task.
    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent>;
    /// Updates the cell `index` of `task` (the caller's own task) with `content`.
    ///
    /// NOTE(review): the roles of `updated_key_hashes`, `content_hash` and `verification_mode`
    /// are defined by the backend and are not visible here.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
    );
    /// Marks `task` (the caller's own task) as finished.
    fn mark_own_task_as_finished(&self, task: TaskId);

    /// Connects `task`.
    ///
    /// NOTE(review): presumably connects it as a child of the current task — confirm.
    fn connect_task(&self, task: TaskId);

    /// Wraps the given future in the current task.
    ///
    /// Beware: this method is not safe to use in production code. It is only intended for use in
    /// tests and for debugging purposes.
    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Returns the [`TaskStatisticsApi`] of this instance.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Returns a future that stops this instance and completes once it has stopped.
    ///
    /// NOTE(review): what exactly is awaited is not visible from this declaration.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events and returns the receiving end of the channel.
    ///
    /// NOTE(review): `event_types` presumably filters by event type name, with `None` meaning
    /// all events — confirm.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;

    /// Returns true if TurboTasks is configured to track dependencies.
    fn is_tracking_dependencies(&self) -> bool;
}
207
/// Wrapper marking the contained value as unused.
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    /// Wraps `inner` as an unused value.
    ///
    /// # Safety
    ///
    /// The wrapped value must not be used.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Unused { inner }
    }

    /// Borrows the wrapped value while leaving the `Unused` wrapper in place.
    ///
    /// # Safety
    ///
    /// The caller needs to make sure that the value stays unused.
    pub unsafe fn get_unchecked(&self) -> &T {
        let Unused { inner } = self;
        inner
    }

    /// Consumes the `Unused` wrapper and hands back the wrapped value.
    pub fn into(self) -> T {
        let Unused { inner } = self;
        inner
    }
}
237
/// Information about an update.
///
/// NOTE(review): where this is produced is outside the visible part of the file; the field
/// descriptions below are inferred from names and types — confirm.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// Presumably how long the update took.
    pub duration: Duration,
    /// Presumably the number of tasks involved in the update.
    pub tasks: usize,
    /// The invalidation reasons associated with the update.
    pub reasons: InvalidationReasonSet,
    /// Private unit field: keeps the struct from being built with a struct literal outside this
    /// module, so fields can be added later.
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
246
/// Whether a task may be persisted across sessions or only lives in memory for the current
/// session.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// Tasks that may be persisted across sessions using serialization.
    Persistent,

    /// Tasks that will be persisted in memory for the life of this session, but won't persist
    /// between sessions.
    ///
    /// This is used for [root tasks][TurboTasks::spawn_root_task] and tasks with an argument of
    /// type [`TransientValue`][crate::value::TransientValue] or
    /// [`TransientInstance`][crate::value::TransientInstance].
    Transient,
}
260
261impl Display for TaskPersistence {
262    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263        match self {
264            TaskPersistence::Persistent => write!(f, "persistent"),
265            TaskPersistence::Transient => write!(f, "transient"),
266        }
267    }
268}
269
/// Records whether the inputs of a task call are already resolved, as determined on the
/// concrete input tuple at the call site. Passed next to [`TaskPersistence`] through
/// `dynamic_call` / `trait_call`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub enum InputResolution {
    /// All inputs (and `this`, where applicable) are resolved — eligible for the synchronous fast
    /// path with no async resolution task.
    Resolved,
    /// At least one input is unresolved and must be resolved in a local task first.
    Unresolved,
}

impl InputResolution {
    /// Maps `true` to [`InputResolution::Resolved`] and `false` to
    /// [`InputResolution::Unresolved`].
    #[inline]
    pub fn from_is_resolved(is_resolved: bool) -> Self {
        match is_resolved {
            true => Self::Resolved,
            false => Self::Unresolved,
        }
    }

    /// Returns `true` only for [`InputResolution::Resolved`].
    #[inline]
    pub fn is_resolved(self) -> bool {
        match self {
            Self::Resolved => true,
            Self::Unresolved => false,
        }
    }
}
296
/// Consistency level requested for a read. Defaults to [`ReadConsistency::Eventual`].
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// The default behavior for most APIs. Reads are faster, but may return stale values, which
    /// may later trigger re-computation.
    #[default]
    Eventual,
    /// Ensures all dependencies are fully resolved before returning the cell or output data, at
    /// the cost of slower reads.
    ///
    /// Top-level code that returns data to the user should use strongly consistent reads.
    Strong,
}
309
/// How a cell read is recorded as a dependency of the current task. The default is
/// `Tracked { key: None }`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadCellTracking {
    /// Reads are tracked as dependencies of the current task.
    Tracked {
        /// The key used for the dependency
        key: Option<u64>,
    },
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}

impl ReadCellTracking {
    /// Decides whether a read should be tracked, given whether it produced an error.
    ///
    /// `Tracked` always tracks, `TrackOnlyError` tracks exactly when `is_err` is set, and
    /// `Untracked` never tracks.
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Untracked => false,
            Self::TrackOnlyError => is_err,
            Self::Tracked { .. } => true,
        }
    }

    /// Returns the dependency key of a `Tracked` read; every other variant yields `None`.
    pub fn key(&self) -> Option<u64> {
        if let Self::Tracked { key } = *self {
            key
        } else {
            None
        }
    }
}

impl Default for ReadCellTracking {
    /// Tracked read without a key.
    fn default() -> Self {
        Self::Tracked { key: None }
    }
}

impl Display for ReadCellTracking {
    /// Writes a short human-readable description of the tracking mode.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
            Self::Tracked { key: None } => f.write_str("tracked"),
            Self::TrackOnlyError => f.write_str("track only error"),
            Self::Untracked => f.write_str("untracked"),
        }
    }
}
363
/// How a read is recorded as a dependency of the current task. Defaults to
/// [`ReadTracking::Tracked`].
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// Reads are tracked as dependencies of the current task.
    #[default]
    Tracked,
    /// The read is only tracked when there is an error, otherwise it is untracked.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    TrackOnlyError,
    /// The read is not tracked as a dependency of the current task.
    ///
    /// INVALIDATION: Be careful with this, it will not track dependencies, so
    /// using it could break cache invalidation.
    Untracked,
}

impl ReadTracking {
    /// Decides whether a read should be tracked, given whether it produced an error.
    ///
    /// `Tracked` always tracks, `TrackOnlyError` tracks exactly when `is_err` is set, and
    /// `Untracked` never tracks.
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Untracked => false,
            Self::TrackOnlyError => is_err,
            Self::Tracked => true,
        }
    }
}

impl Display for ReadTracking {
    /// Writes a short human-readable description of the tracking mode.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Tracked => "tracked",
            Self::TrackOnlyError => "track only error",
            Self::Untracked => "untracked",
        };
        f.write_str(label)
    }
}
400
/// Priority attached to a scheduled task. Defaults to [`TaskPriority::Initial`].
///
/// The derived [`Ord`] compares variants in declaration order (`Initial` < `Invalidation` <
/// `Recomputation`). Within `Invalidation` the number is wrapped in [`Reverse`], so a larger
/// number compares as smaller.
///
/// NOTE(review): whether the priority runner treats greater or smaller values as more urgent is
/// not visible here — confirm against `PriorityRunner`.
#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum TaskPriority {
    /// Default priority; used when scheduling root/once tasks and for `run`.
    #[default]
    Initial,
    /// Priority of work caused by an invalidation.
    Invalidation {
        /// Numeric level, stored reversed for ordering purposes (see the type-level docs).
        priority: Reverse<u32>,
    },
    /// NOTE(review): presumably used for tasks that are recomputed — usage is not visible here.
    Recomputation,
}
410
411impl TaskPriority {
412    pub fn invalidation(priority: u32) -> Self {
413        Self::Invalidation {
414            priority: Reverse(priority),
415        }
416    }
417
418    pub fn initial() -> Self {
419        Self::Initial
420    }
421
422    pub fn leaf() -> Self {
423        Self::Invalidation {
424            priority: Reverse(0),
425        }
426    }
427
428    pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
429        match self {
430            TaskPriority::Initial => parent_priority,
431            TaskPriority::Invalidation { priority } => {
432                if let TaskPriority::Invalidation {
433                    priority: parent_priority,
434                } = parent_priority
435                    && priority.0 < parent_priority.0
436                {
437                    Self::Invalidation {
438                        priority: Reverse(parent_priority.0.saturating_add(1)),
439                    }
440                } else {
441                    *self
442                }
443            }
444            TaskPriority::Recomputation => TaskPriority::Recomputation,
445        }
446    }
447}
448
449impl Display for TaskPriority {
450    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
451        match self {
452            TaskPriority::Initial => write!(f, "initial"),
453            TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
454            TaskPriority::Recomputation => write!(f, "recomputation"),
455        }
456    }
457}
458
/// Unit of work handed to the priority runner by `TurboTasks::schedule` and
/// `TurboTasks::schedule_local_task`.
enum ScheduledTask {
    /// A task identified by a [`TaskId`].
    Task {
        /// The task to execute.
        task_id: TaskId,
        /// The tracing span that was current when the task was scheduled.
        span: Span,
    },
    /// A local task created during the execution of another task.
    LocalTask {
        /// Describes what the local task does (task type, `this` and arguments).
        ty: LocalTaskSpec,
        /// Persistence to use if the local task spawns another task.
        persistence: TaskPersistence,
        /// Id obtained from the `LocalTaskTracker` of the scheduling task.
        local_task_id: LocalTaskId,
        /// Shared state of the task that scheduled this local task.
        global_task_state: Arc<RwLock<CurrentTaskState>>,
        /// The tracing span that was current when the local task was scheduled.
        span: Span,
    },
}
472
/// The turbo-tasks manager, generic over the [`Backend`] it drives.
///
/// Instances are created through `TurboTasks::new`, which returns an [`Arc`].
pub struct TurboTasks<B: Backend + 'static> {
    /// Weak self-reference set up via `Arc::new_cyclic`; upgraded by `pin`.
    this: Weak<Self>,
    /// The backend that task operations are delegated to.
    backend: B,
    /// Produces the execution ids used by `run`.
    execution_id_factory: IdFactory<ExecutionId>,
    /// NOTE(review): presumably set once the instance has been stopped — usage not visible here.
    stopped: AtomicBool,
    /// Number of foreground jobs currently in flight (see `begin_foreground_job` /
    /// `finish_foreground_job`).
    currently_scheduled_foreground_jobs: AtomicUsize,
    /// NOTE(review): presumably the background counterpart of the counter above — usage not
    /// visible here.
    currently_scheduled_background_jobs: AtomicUsize,
    /// Incremented by `schedule`; read and reset when the last foreground job finishes. Only
    /// used for statistics.
    scheduled_tasks: AtomicUsize,
    /// Runner that executes [`ScheduledTask`]s ordered by [`TaskPriority`].
    priority_runner:
        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
    /// Set to the current instant when the foreground job count leaves zero.
    start: Mutex<Option<Instant>>,
    /// NOTE(review): presumably accumulates duration, task count and invalidation reasons
    /// between update reports — confirm against the code that consumes it.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Event that is triggered when currently_scheduled_foreground_jobs becomes non-zero
    event_foreground_start: Event,
    /// Event that is triggered when all foreground jobs are done
    /// (currently_scheduled_foreground_jobs becomes zero)
    event_foreground_done: Event,
    /// Event that is triggered when all background jobs are done
    event_background_done: Event,
    /// Queue behind the compilation event API.
    compilation_events: CompilationEventQueue,
}
494
/// Information about a non-local task. A non-local task can contain multiple "local" tasks, which
/// all share the same non-local task state.
///
/// A non-local task is one that:
///
/// - Has a unique task id.
/// - Is potentially cached.
/// - The backend is aware of.
struct CurrentTaskState {
    /// Id of the task. `None` for the temporary state built by `new_temporary`.
    task_id: Option<TaskId>,
    /// Identifies this particular execution; compared by `assert_execution_id`.
    execution_id: ExecutionId,
    /// Priority of the task; local tasks scheduled from it are scheduled with this priority.
    priority: TaskPriority,

    /// True if the current task has state in cells (interior mutability).
    /// Only tracked when verify_determinism feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the current task uses an external invalidator
    has_invalidator: bool,

    /// True if we're in a top-level task (e.g. `.run_once(...)` or `.run(...)`).
    /// Eventually consistent reads are not allowed in top-level tasks.
    in_top_level_task: bool,

    /// Tracks how many cells of each type has been allocated so far during this task execution.
    /// When a task is re-executed, the cell count may not match the existing cell vec length.
    ///
    /// This is taken (and becomes `None`) during teardown of a task.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Tracks execution of Local tasks (and detached test futures) created during this global
    /// task's execution.
    local_tasks: LocalTaskTracker,
}
530
531impl CurrentTaskState {
532    fn new(
533        task_id: TaskId,
534        execution_id: ExecutionId,
535        priority: TaskPriority,
536        in_top_level_task: bool,
537    ) -> Self {
538        Self {
539            task_id: Some(task_id),
540            execution_id,
541            priority,
542            #[cfg(feature = "verify_determinism")]
543            stateful: false,
544            has_invalidator: false,
545            in_top_level_task,
546            cell_counters: Some(AutoMap::default()),
547            local_tasks: LocalTaskTracker::new(),
548        }
549    }
550
551    fn new_temporary(
552        execution_id: ExecutionId,
553        priority: TaskPriority,
554        in_top_level_task: bool,
555    ) -> Self {
556        Self {
557            task_id: None,
558            execution_id,
559            priority,
560            #[cfg(feature = "verify_determinism")]
561            stateful: false,
562            has_invalidator: false,
563            in_top_level_task,
564            cell_counters: None,
565            local_tasks: LocalTaskTracker::new(),
566        }
567    }
568
569    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
570        if self.execution_id != expected_execution_id {
571            panic!(
572                "Local tasks can only be scheduled/awaited within the same execution of the \
573                 parent task that created them"
574            );
575        }
576    }
577}
578
579// TODO implement our own thread pool and make these thread locals instead
task_local! {
    /// The current TurboTasks instance
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    /// The state of the task that is currently executing. The same `Arc` is handed to local
    /// tasks scheduled from that task, so the state is shared with them.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;

    /// Temporarily suppresses the eventual consistency check in top-level tasks.
    /// This is used by strongly consistent reads to allow them to succeed in top-level tasks.
    /// This is NOT shared across local tasks (unlike CURRENT_TASK_STATE), so it's safe
    /// to set/unset without race conditions.
    pub(crate) static SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK: bool;
}
592
593impl<B: Backend + 'static> TurboTasks<B> {
594    // TODO better lifetime management for turbo tasks
595    // consider using unsafe for the task_local turbo tasks
596    // that should be safe as long tasks can't outlife turbo task
597    // so we probably want to make sure that all tasks are joined
598    // when trying to drop turbo tasks
599    pub fn new(backend: B) -> Arc<Self> {
600        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
601        let this = Arc::new_cyclic(|this| Self {
602            this: this.clone(),
603            backend,
604            execution_id_factory,
605            stopped: AtomicBool::new(false),
606            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
607            currently_scheduled_background_jobs: AtomicUsize::new(0),
608            scheduled_tasks: AtomicUsize::new(0),
609            priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
610            start: Default::default(),
611            aggregated_update: Default::default(),
612            event_foreground_done: Event::new(|| {
613                || "TurboTasks::event_foreground_done".to_string()
614            }),
615            event_foreground_start: Event::new(|| {
616                || "TurboTasks::event_foreground_start".to_string()
617            }),
618            event_background_done: Event::new(|| {
619                || "TurboTasks::event_background_done".to_string()
620            }),
621            compilation_events: CompilationEventQueue::default(),
622        });
623        this.backend.startup(&*this);
624        this
625    }
626
627    pub fn pin(&self) -> Arc<Self> {
628        self.this.upgrade().unwrap()
629    }
630
631    /// Creates a new root task
632    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
633    where
634        T: ?Sized,
635        F: Fn() -> Fut + Send + Sync + Clone + 'static,
636        Fut: Future<Output = Result<Vc<T>>> + Send,
637    {
638        let id = self.backend.create_transient_task(
639            TransientTaskType::Root(Box::new(move || {
640                let functor = functor.clone();
641                Box::pin(async move {
642                    mark_top_level_task();
643                    let raw_vc = functor().await?.node;
644                    raw_vc.to_non_local().await
645                })
646            })),
647            self,
648        );
649        self.schedule(id, TaskPriority::initial());
650        id
651    }
652
653    pub fn dispose_root_task(&self, task_id: TaskId) {
654        self.backend.dispose_root_task(task_id, self);
655    }
656
657    // TODO make sure that all dependencies settle before reading them
658    /// Creates a new root task, that is only executed once.
659    /// Dependencies will not invalidate the task.
660    #[track_caller]
661    fn spawn_once_task<T, Fut>(&self, future: Fut)
662    where
663        T: ?Sized,
664        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
665    {
666        let id = self.backend.create_transient_task(
667            TransientTaskType::Once(Box::pin(async move {
668                mark_top_level_task();
669                let raw_vc = future.await?.node;
670                raw_vc.to_non_local().await
671            })),
672            self,
673        );
674        self.schedule(id, TaskPriority::initial());
675    }
676
677    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
678        &self,
679        future: impl Future<Output = Result<T>> + Send + 'static,
680    ) -> Result<T> {
681        let (tx, rx) = tokio::sync::oneshot::channel();
682        self.spawn_once_task(async move {
683            mark_top_level_task();
684            let result = future.await;
685            tx.send(result)
686                .map_err(|_| anyhow!("unable to send result"))?;
687            Ok(Completion::new())
688        });
689
690        rx.await?
691    }
692
693    #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
694    pub async fn run<T: TraceRawVcs + Send + 'static>(
695        &self,
696        future: impl Future<Output = Result<T>> + Send + 'static,
697    ) -> Result<T, TurboTasksExecutionError> {
698        self.begin_foreground_job();
699        // it's okay for execution ids to overflow and wrap, they're just used for an assert
700        let execution_id = self.execution_id_factory.wrapping_get();
701        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
702            execution_id,
703            TaskPriority::initial(),
704            true, // in_top_level_task
705        )));
706
707        let result = TURBO_TASKS
708            .scope(
709                self.pin(),
710                CURRENT_TASK_STATE.scope(current_task_state, async {
711                    let result = CaptureFuture::new(future).await;
712
713                    // wait for all spawned local tasks using `local` to finish
714                    wait_for_local_tasks().await;
715
716                    match result {
717                        Ok(Ok(value)) => Ok(value),
718                        Ok(Err(err)) => Err(err.into()),
719                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
720                    }
721                }),
722            )
723            .await;
724        self.finish_foreground_job();
725        result
726    }
727
728    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
729        let this = self.pin();
730        tokio::spawn(async move {
731            this.pin()
732                .run_once(async move {
733                    this.finish_foreground_job();
734                    future.await;
735                    this.begin_foreground_job();
736                    Ok(())
737                })
738                .await
739                .unwrap()
740        });
741    }
742
743    pub(crate) fn native_call(
744        &self,
745        native_fn: &'static NativeFunction,
746        this: Option<RawVc>,
747        arg: &mut dyn DynTaskInputsStorage,
748        persistence: TaskPersistence,
749    ) -> RawVc {
750        RawVc::TaskOutput(self.backend.get_or_create_task(
751            native_fn,
752            this,
753            arg,
754            current_task_if_available("turbo_function calls"),
755            persistence,
756            self,
757        ))
758    }
759
760    pub fn dynamic_call(
761        &self,
762        native_fn: &'static NativeFunction,
763        this: Option<RawVc>,
764        arg: &mut dyn DynTaskInputsStorage,
765        inputs_resolved: InputResolution,
766        persistence: TaskPersistence,
767    ) -> RawVc {
768        if inputs_resolved.is_resolved() && this.is_none_or(|this| this.is_resolved()) {
769            return self.native_call(native_fn, this, arg, persistence);
770        }
771        // Need async resolution — must move the arg to the heap now
772        let arg = arg.take_box();
773        let task_type = LocalTaskSpec {
774            task_type: LocalTaskType::ResolveNative { native_fn },
775            this,
776            arg,
777        };
778        self.schedule_local_task(task_type, persistence)
779    }
780
781    pub fn trait_call(
782        &self,
783        trait_method: &'static TraitMethod,
784        this: RawVc,
785        arg: &mut dyn DynTaskInputsStorage,
786        inputs_resolved: InputResolution,
787        persistence: TaskPersistence,
788    ) -> RawVc {
789        // avoid creating a wrapper task if self is already resolved
790        // for resolved cells we already know the value type so we can lookup the
791        // function
792        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
793            match registry::get_value_type(type_id).get_trait_method(trait_method) {
794                Some(native_fn) => {
795                    if let Some(filter) = native_fn.arg_meta.filter_owned {
796                        let (resolved, mut arg) = (filter)(arg);
797                        return self.dynamic_call(
798                            native_fn,
799                            Some(this),
800                            &mut arg,
801                            resolved,
802                            persistence,
803                        );
804                    } else {
805                        return self.dynamic_call(
806                            native_fn,
807                            Some(this),
808                            arg,
809                            inputs_resolved,
810                            persistence,
811                        );
812                    }
813                }
814                None => {
815                    // We are destined to fail at this point, but we just retry resolution in the
816                    // local task since we cannot report an error from here.
817                    // TODO: A panic seems appropriate since the immediate caller is to blame
818                }
819            }
820        }
821
822        // create a wrapper task to resolve all inputs
823        let task_type = LocalTaskSpec {
824            task_type: LocalTaskType::ResolveTrait { trait_method },
825            this: Some(this),
826            arg: arg.take_box(),
827        };
828
829        self.schedule_local_task(task_type, persistence)
830    }
831
832    #[track_caller]
833    pub fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
834        self.begin_foreground_job();
835        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
836
837        self.priority_runner.schedule(
838            &self.pin(),
839            ScheduledTask::Task {
840                task_id,
841                span: Span::current(),
842            },
843            priority,
844        );
845    }
846
847    fn schedule_local_task(
848        &self,
849        ty: LocalTaskSpec,
850        // if this is a `LocalTaskType::Resolve*`, we may spawn another task with this persistence,
851        persistence: TaskPersistence,
852    ) -> RawVc {
853        let task_type = ty.task_type;
854        let (global_task_state, execution_id, priority, local_task_id) =
855            CURRENT_TASK_STATE.with(|gts| {
856                let mut gts_write = gts.write().unwrap();
857                let local_task_id = gts_write.local_tasks.create(task_type);
858                (
859                    Arc::clone(gts),
860                    gts_write.execution_id,
861                    gts_write.priority,
862                    local_task_id,
863                )
864            });
865
866        self.priority_runner.schedule(
867            &self.pin(),
868            ScheduledTask::LocalTask {
869                ty,
870                persistence,
871                local_task_id,
872                global_task_state,
873                span: Span::current(),
874            },
875            priority,
876        );
877
878        RawVc::LocalOutput(execution_id, local_task_id, persistence)
879    }
880
881    fn begin_foreground_job(&self) {
882        if self
883            .currently_scheduled_foreground_jobs
884            .fetch_add(1, Ordering::AcqRel)
885            == 0
886        {
887            *self.start.lock().unwrap() = Some(Instant::now());
888            self.event_foreground_start.notify(usize::MAX);
889            self.backend.idle_end(self);
890        }
891    }
892
893    fn finish_foreground_job(&self) {
894        if self
895            .currently_scheduled_foreground_jobs
896            .fetch_sub(1, Ordering::AcqRel)
897            == 1
898        {
899            self.backend.idle_start(self);
900            // That's not super race-condition-safe, but it's only for
901            // statistical reasons
902            let total = self.scheduled_tasks.load(Ordering::Acquire);
903            self.scheduled_tasks.store(0, Ordering::Release);
904            if let Some(start) = *self.start.lock().unwrap() {
905                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
906                if let Some(update) = update.as_mut() {
907                    update.0 += start.elapsed();
908                    update.1 += total;
909                } else {
910                    *update = Some((start.elapsed(), total));
911                }
912            }
913            self.event_foreground_done.notify(usize::MAX);
914        }
915    }
916
917    fn begin_background_job(&self) {
918        self.currently_scheduled_background_jobs
919            .fetch_add(1, Ordering::Relaxed);
920    }
921
922    fn finish_background_job(&self) {
923        if self
924            .currently_scheduled_background_jobs
925            .fetch_sub(1, Ordering::Relaxed)
926            == 1
927        {
928            self.event_background_done.notify(usize::MAX);
929        }
930    }
931
932    pub fn get_in_progress_count(&self) -> usize {
933        self.currently_scheduled_foreground_jobs
934            .load(Ordering::Acquire)
935    }
936
937    /// Waits for the given task to finish executing. This works by performing an untracked read,
938    /// and discarding the value of the task output.
939    ///
940    /// [`ReadConsistency::Eventual`] means that this will return after the task executes, but
941    /// before all dependencies have completely settled.
942    ///
943    /// [`ReadConsistency::Strong`] means that this will also wait for the task and all dependencies
944    /// to fully settle before returning.
945    ///
946    /// As this function is typically called in top-level code that waits for results to be ready
947    /// for the user to access, most callers should use [`ReadConsistency::Strong`].
948    pub async fn wait_task_completion(
949        &self,
950        id: TaskId,
951        consistency: ReadConsistency,
952    ) -> Result<()> {
953        read_task_output(
954            self,
955            id,
956            ReadOutputOptions {
957                // INVALIDATION: This doesn't return a value, only waits for it to be ready.
958                tracking: ReadTracking::Untracked,
959                consistency,
960            },
961        )
962        .await?;
963        Ok(())
964    }
965
966    /// Returns [UpdateInfo] with all updates aggregated over a given duration
967    /// (`aggregation`). Will wait until an update happens.
968    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
969        self.aggregated_update_info(aggregation, Duration::MAX)
970            .await
971            .unwrap()
972    }
973
974    /// Returns [UpdateInfo] with all updates aggregated over a given duration
975    /// (`aggregation`). Will only return None when the timeout is reached while
976    /// waiting for the first update.
977    pub async fn aggregated_update_info(
978        &self,
979        aggregation: Duration,
980        timeout: Duration,
981    ) -> Option<UpdateInfo> {
982        let listener = self
983            .event_foreground_done
984            .listen_with_note(|| || "wait for update info".to_string());
985        let wait_for_finish = {
986            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
987            if aggregation.is_zero() {
988                if let Some((duration, tasks)) = update.take() {
989                    return Some(UpdateInfo {
990                        duration,
991                        tasks,
992                        reasons: take(reason_set),
993                        placeholder_for_future_fields: (),
994                    });
995                } else {
996                    true
997                }
998            } else {
999                update.is_none()
1000            }
1001        };
1002        if wait_for_finish {
1003            if timeout == Duration::MAX {
1004                // wait for finish
1005                listener.await;
1006            } else {
1007                // wait for start, then wait for finish or timeout
1008                let start_listener = self
1009                    .event_foreground_start
1010                    .listen_with_note(|| || "wait for update info".to_string());
1011                if self
1012                    .currently_scheduled_foreground_jobs
1013                    .load(Ordering::Acquire)
1014                    == 0
1015                {
1016                    start_listener.await;
1017                } else {
1018                    drop(start_listener);
1019                }
1020                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1021                    // Timeout
1022                    return None;
1023                }
1024            }
1025        }
1026        if !aggregation.is_zero() {
1027            loop {
1028                select! {
1029                    () = tokio::time::sleep(aggregation) => {
1030                        break;
1031                    }
1032                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1033                        // Resets the sleep
1034                    }
1035                }
1036            }
1037        }
1038        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1039        if let Some((duration, tasks)) = update.take() {
1040            Some(UpdateInfo {
1041                duration,
1042                tasks,
1043                reasons: take(reason_set),
1044                placeholder_for_future_fields: (),
1045            })
1046        } else {
1047            panic!("aggregated_update_info must not called concurrently")
1048        }
1049    }
1050
1051    pub async fn wait_background_done(&self) {
1052        let listener = self.event_background_done.listen();
1053        if self
1054            .currently_scheduled_background_jobs
1055            .load(Ordering::Acquire)
1056            != 0
1057        {
1058            listener.await;
1059        }
1060    }
1061
1062    pub async fn stop_and_wait(&self) {
1063        turbo_tasks_future_scope(self.pin(), async move {
1064            self.backend.stopping(self);
1065            self.stopped.store(true, Ordering::Release);
1066            {
1067                let listener = self
1068                    .event_foreground_done
1069                    .listen_with_note(|| || "wait for stop".to_string());
1070                if self
1071                    .currently_scheduled_foreground_jobs
1072                    .load(Ordering::Acquire)
1073                    != 0
1074                {
1075                    listener.await;
1076                }
1077            }
1078            {
1079                let listener = self.event_background_done.listen();
1080                if self
1081                    .currently_scheduled_background_jobs
1082                    .load(Ordering::Acquire)
1083                    != 0
1084                {
1085                    listener.await;
1086                }
1087            }
1088            self.backend.stop(self);
1089        })
1090        .await;
1091    }
1092
1093    #[track_caller]
1094    pub(crate) fn schedule_background_job<T>(&self, func: T)
1095    where
1096        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1097        T::CallOnceFuture: Send,
1098    {
1099        let mut this = self.pin();
1100        self.begin_background_job();
1101        tokio::spawn(
1102            TURBO_TASKS
1103                .scope(this.clone(), async move {
1104                    if !this.stopped.load(Ordering::Acquire) {
1105                        this = func(this).await;
1106                    }
1107                    this.finish_background_job();
1108                })
1109                .in_current_span(),
1110        );
1111    }
1112
1113    fn finish_current_task_state(&self) -> FinishedTaskState {
1114        CURRENT_TASK_STATE.with(|cell| {
1115            let current_task_state = &*cell.write().unwrap();
1116            FinishedTaskState {
1117                #[cfg(feature = "verify_determinism")]
1118                stateful: current_task_state.stateful,
1119                has_invalidator: current_task_state.has_invalidator,
1120            }
1121        })
1122    }
1123
    /// Returns a reference to the backend owned by this instance.
    pub fn backend(&self) -> &B {
        &self.backend
    }
1127
1128    pub fn get_current_task_priority(&self) -> TaskPriority {
1129        CURRENT_TASK_STATE
1130            .try_with(|task_state| task_state.read().unwrap().priority)
1131            .unwrap_or(TaskPriority::initial())
1132    }
1133
1134    pub fn is_idle(&self) -> bool {
1135        self.currently_scheduled_foreground_jobs
1136            .load(Ordering::Acquire)
1137            == 0
1138    }
1139
    /// Schedules a backend-defined job as a background job.
    ///
    /// The job is passed to the backend's `run_backend_job` from within
    /// [`Self::schedule_background_job`], so it is counted as an outstanding background job and
    /// is not run when the instance has already been stopped.
    #[track_caller]
    pub fn schedule_backend_background_job(&self, job: B::BackendJob) {
        self.schedule_background_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            // Hand the handle back, as required by `schedule_background_job`'s callback
            // signature.
            this
        })
    }
1147}
1148
/// Zero-sized [`Executor`] implementation that turns a [`ScheduledTask`] into the future that
/// executes it on a [`TurboTasks`] instance.
struct TurboTasksExecutor;
1150
1151/// Run a future and abort the process if a panic is reported
1152///
1153/// Turbtasks catches panics from user code and propagates throught the task tree, but if it happens
1154/// as part of state management we have to abort
1155async fn abort_on_panic<F: Future>(f: F) -> F::Output {
1156    match AssertUnwindSafe(f).catch_unwind().await {
1157        Ok(r) => r,
1158        Err(_) => {
1159            eprintln!(
1160                "\nturbo-tasks: an internal panic occurred outside the per-task panic \
1161                 boundary. This is a bug in turbo-tasks/Turbopack — please report it at \
1162                 https://github.com/vercel/next.js/discussions and include the panic message \
1163                 and stack trace above.\n\nAborting."
1164            );
1165            abort();
1166        }
1167    }
1168}
1169
/// Builds the future for a scheduled task. The two kinds of scheduled tasks produce different
/// future types, which are unified through [`Either`].
impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
    type Future = impl Future<Output = ()> + Send + 'static;

    /// Returns the future that executes `scheduled_task` at `priority`.
    ///
    /// Both variants run with `TURBO_TASKS` scoped to `this`, with `CURRENT_TASK_STATE` scoped
    /// to the relevant task state, and inside [`abort_on_panic`].
    fn execute(
        &self,
        this: &Arc<TurboTasks<B>>,
        scheduled_task: ScheduledTask,
        priority: TaskPriority,
    ) -> Self::Future {
        match scheduled_task {
            ScheduledTask::Task { task_id, span } => {
                // One handle is moved into the future, the other is the `TURBO_TASKS` scope
                // value.
                let this2 = this.clone();
                let this = this.clone();
                let future = async move {
                    abort_on_panic(async {
                        // it's okay for execution ids to overflow and wrap, they're just used
                        // for an assert
                        let execution_id = this.execution_id_factory.wrapping_get();
                        // Every execution of a global task gets a fresh task state.
                        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
                            task_id,
                            execution_id,
                            priority,
                            false, // in_top_level_task
                        )));
                        // Evaluates to `None` when the execution was canceled or the backend did
                        // not start it; otherwise to whatever `task_execution_completed`
                        // returns.
                        let single_execution_future = async {
                            if this.stopped.load(Ordering::Acquire) {
                                this.backend.task_execution_canceled(task_id, &*this);
                                return None;
                            }

                            let TaskExecutionSpec { future, span } = this
                                .backend
                                .try_start_task_execution(task_id, priority, &*this)?;

                            async {
                                let result = CaptureFuture::new(future).await;

                                // wait for all spawned local tasks using `local` to finish
                                wait_for_local_tasks().await;

                                let result = match result {
                                    Ok(Ok(raw_vc)) => {
                                        // This is safe because we waited for all local tasks to
                                        // complete above
                                        raw_vc
                                            .to_non_local_unchecked_sync(&*this)
                                            .map_err(|err| err.into())
                                    }
                                    Ok(Err(err)) => Err(err.into()),
                                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                                };

                                let finished_state = this.finish_current_task_state();
                                // Take the cell counters out of the task state to hand them to
                                // the backend.
                                let cell_counters = CURRENT_TASK_STATE
                                    .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
                                this.backend.task_execution_completed(
                                    task_id,
                                    result,
                                    &cell_counters,
                                    #[cfg(feature = "verify_determinism")]
                                    finished_state.stateful,
                                    finished_state.has_invalidator,
                                    &*this,
                                )
                            }
                            .instrument(span)
                            .await
                        };
                        if let Some(stale_priority) = CURRENT_TASK_STATE
                            .scope(current_task_state, single_execution_future)
                            .await
                        {
                            // Task was stale; re-schedule at the correct invalidation priority so
                            // other tasks can run in the right priority order.
                            this.schedule(task_id, stale_priority);
                        }
                        // `schedule` above registers a new foreground job before this one is
                        // finished, so a re-scheduled task never lets the counter drop to zero
                        // in between.
                        this.finish_foreground_job();
                    })
                    .await
                };

                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
            }
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state,
                span,
            } => {
                let this2 = this.clone();
                let this = this.clone();
                // Kept separately for the error context below, because `ty` is consumed by the
                // resolve call.
                let task_type = ty.task_type;
                let future = async move {
                    // Span describing the resolve operation itself; the outer future is
                    // additionally instrumented with the span captured at scheduling time.
                    let span = match &ty.task_type {
                        LocalTaskType::ResolveNative { native_fn } => {
                            native_fn.resolve_span(priority)
                        }
                        LocalTaskType::ResolveTrait { trait_method } => {
                            trait_method.resolve_span(priority)
                        }
                    };
                    abort_on_panic(
                        async move {
                            let result = match ty.task_type {
                                LocalTaskType::ResolveNative { native_fn } => {
                                    LocalTaskType::run_resolve_native(
                                        native_fn,
                                        ty.this,
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                                LocalTaskType::ResolveTrait { trait_method } => {
                                    LocalTaskType::run_resolve_trait(
                                        trait_method,
                                        ty.this.unwrap(),
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                            };

                            // A successful resolve links to the resolved value; a failure is
                            // stored as an error annotated with the local task type.
                            let output = match result {
                                Ok(raw_vc) => OutputContent::Link(raw_vc),
                                Err(err) => OutputContent::Error(
                                    TurboTasksExecutionError::from(err)
                                        .with_local_task_context(task_type.to_string()),
                                ),
                            };

                            // Record the output in the local task table of the current task
                            // state.
                            CURRENT_TASK_STATE.with(move |gts| {
                                gts.write()
                                    .unwrap()
                                    .local_tasks
                                    .complete(local_task_id, output);
                            });
                        }
                        .instrument(span),
                    )
                    .await
                };
                // Local tasks run with the task state that was captured when they were
                // scheduled.
                let future = CURRENT_TASK_STATE.scope(global_task_state, future);

                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
            }
        }
    }
}
1323
/// Flags copied out of the current task state once a task execution has finished (see
/// `finish_current_task_state`), which are then passed on to the backend's
/// `task_execution_completed`.
struct FinishedTaskState {
    /// True if the task has state in cells (interior mutability).
    /// Only tracked when verify_determinism feature is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// True if the task uses an external invalidator
    has_invalidator: bool,
}
1333
/// Object-safe call API for [`TurboTasks`].
///
/// Most methods forward to a method of the same name on `TurboTasks<B>`.
// NOTE(review): the same-named calls below presumably resolve to inherent methods on
// `TurboTasks<B>` (inherent methods take precedence over trait methods); otherwise they would
// recurse. Confirm before renaming either side.
impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
    /// Forwards to `TurboTasks::dynamic_call` with unchanged arguments.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn DynTaskInputsStorage,
        inputs_resolved: InputResolution,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.dynamic_call(native_fn, this, arg, inputs_resolved, persistence)
    }
    /// Forwards to `TurboTasks::native_call` with unchanged arguments.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn DynTaskInputsStorage,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.native_call(native_fn, this, arg, persistence)
    }
    /// Forwards to `TurboTasks::trait_call` with unchanged arguments.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn DynTaskInputsStorage,
        inputs_resolved: InputResolution,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.trait_call(trait_method, this, arg, inputs_resolved, persistence)
    }

    /// Returns a boxed future that runs `future` via `run` on a pinned handle of this instance.
    #[track_caller]
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
        // The returned future owns the handle obtained from `pin()`, so it does not borrow
        // `self`.
        let this = self.pin();
        Box::pin(async move { this.run(future).await })
    }

    /// Returns a boxed future that runs `future` via `run_once` on a pinned handle of this
    /// instance.
    #[track_caller]
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    /// Like [`Self::run_once`], but first records `reason` in the reason set of the aggregated
    /// update. The reason is recorded immediately, not when the returned future is polled.
    #[track_caller]
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        {
            // Scoped so that the lock is released before the future is created.
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    /// Forwards to `TurboTasks::start_once_process`.
    #[track_caller]
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        self.start_once_process(future)
    }

    /// Publishes `event` to the compilation event queue. A failure to send is logged as a
    /// warning and otherwise ignored.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
        if let Err(e) = self.compilation_events.send(event) {
            tracing::warn!("Failed to send compilation event: {e}");
        }
    }

    /// Asks the backend for the name of `task`.
    fn get_task_name(&self, task: TaskId) -> String {
        self.backend.get_task_name(task, self)
    }
}
1412
1413impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1414    #[instrument(level = "info", skip_all, name = "invalidate")]
1415    fn invalidate(&self, task: TaskId) {
1416        self.backend.invalidate_task(task, self);
1417    }
1418
1419    #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1420    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1421        {
1422            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1423            reason_set.insert(reason);
1424        }
1425        self.backend.invalidate_task(task, self);
1426    }
1427
1428    fn invalidate_serialization(&self, task: TaskId) {
1429        self.backend.invalidate_serialization(task, self);
1430    }
1431
1432    #[track_caller]
1433    fn try_read_task_output(
1434        &self,
1435        task: TaskId,
1436        options: ReadOutputOptions,
1437    ) -> Result<Result<RawVc, EventListener>> {
1438        if options.consistency == ReadConsistency::Eventual {
1439            debug_assert_not_in_top_level_task("read_task_output");
1440        }
1441        self.backend.try_read_task_output(
1442            task,
1443            current_task_if_available("reading Vcs"),
1444            options,
1445            self,
1446        )
1447    }
1448
1449    #[track_caller]
1450    fn try_read_task_cell(
1451        &self,
1452        task: TaskId,
1453        index: CellId,
1454        options: ReadCellOptions,
1455    ) -> Result<Result<TypedCellContent, EventListener>> {
1456        let reader = current_task_if_available("reading Vcs");
1457        self.backend
1458            .try_read_task_cell(task, index, reader, options, self)
1459    }
1460
1461    fn try_read_own_task_cell(
1462        &self,
1463        current_task: TaskId,
1464        index: CellId,
1465    ) -> Result<TypedCellContent> {
1466        self.backend
1467            .try_read_own_task_cell(current_task, index, self)
1468    }
1469
1470    #[track_caller]
1471    fn try_read_local_output(
1472        &self,
1473        execution_id: ExecutionId,
1474        local_task_id: LocalTaskId,
1475    ) -> Result<Result<RawVc, EventListener>> {
1476        debug_assert_not_in_top_level_task("read_local_output");
1477        CURRENT_TASK_STATE.with(|gts| {
1478            let gts_read = gts.read().unwrap();
1479
1480            // Local Vcs are local to their parent task's current execution, and do not exist
1481            // outside of it. This is weakly enforced at compile time using the `NonLocalValue`
1482            // marker trait. This assertion exists to handle any potential escapes that the
1483            // compile-time checks cannot capture.
1484            gts_read.assert_execution_id(execution_id);
1485
1486            match gts_read.local_tasks.get(local_task_id) {
1487                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1488                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1489            }
1490        })
1491    }
1492
1493    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1494        // TODO: Add assert_not_in_top_level_task("read_task_collectibles") check here.
1495        // Collectible reads are eventually consistent.
1496        self.backend.read_task_collectibles(
1497            task,
1498            trait_id,
1499            current_task_if_available("reading collectibles"),
1500            self,
1501        )
1502    }
1503
1504    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1505        self.backend.emit_collectible(
1506            trait_type,
1507            collectible,
1508            current_task("emitting collectible"),
1509            self,
1510        );
1511    }
1512
1513    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1514        self.backend.unemit_collectible(
1515            trait_type,
1516            collectible,
1517            count,
1518            current_task("emitting collectible"),
1519            self,
1520        );
1521    }
1522
1523    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1524        for (&collectible, &count) in collectibles {
1525            if count > 0 {
1526                self.backend.unemit_collectible(
1527                    trait_type,
1528                    collectible,
1529                    count as u32,
1530                    current_task("emitting collectible"),
1531                    self,
1532                );
1533            }
1534        }
1535    }
1536
1537    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent> {
1538        self.try_read_own_task_cell(task, index)
1539    }
1540
1541    fn update_own_task_cell(
1542        &self,
1543        task: TaskId,
1544        index: CellId,
1545        content: CellContent,
1546        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
1547        content_hash: Option<CellHash>,
1548        verification_mode: VerificationMode,
1549    ) {
1550        self.backend.update_task_cell(
1551            task,
1552            index,
1553            content,
1554            updated_key_hashes,
1555            content_hash,
1556            verification_mode,
1557            self,
1558        );
1559    }
1560
1561    fn connect_task(&self, task: TaskId) {
1562        self.backend
1563            .connect_task(task, current_task_if_available("connecting task"), self);
1564    }
1565
1566    fn mark_own_task_as_finished(&self, task: TaskId) {
1567        self.backend.mark_own_task_as_finished(task, self);
1568    }
1569
1570    /// Creates a future that inherits the current task id and task state. The current global task
1571    /// will wait for this future to be dropped before exiting.
1572    fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1573        // this is similar to what happens for a local task, except that we keep the local task's
1574        // state as well.
1575        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1576        global_task_state
1577            .write()
1578            .unwrap()
1579            .local_tasks
1580            .register_detached();
1581        let wrapped = async move {
1582            // use a drop guard for panic safety
1583            struct DropGuard;
1584            impl Drop for DropGuard {
1585                fn drop(&mut self) {
1586                    CURRENT_TASK_STATE
1587                        .with(|ts| ts.write().unwrap().local_tasks.decrement_in_flight());
1588                }
1589            }
1590            let _guard = DropGuard;
1591            fut.await;
1592        };
1593        tokio::spawn(TURBO_TASKS.scope(
1594            turbo_tasks(),
1595            CURRENT_TASK_STATE.scope(global_task_state, wrapped),
1596        ));
1597    }
1598
1599    fn task_statistics(&self) -> &TaskStatisticsApi {
1600        self.backend.task_statistics()
1601    }
1602
1603    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1604        let this = self.pin();
1605        Box::pin(async move {
1606            this.stop_and_wait().await;
1607        })
1608    }
1609
1610    fn subscribe_to_compilation_events(
1611        &self,
1612        event_types: Option<Vec<String>>,
1613    ) -> Receiver<Arc<dyn CompilationEvent>> {
1614        self.compilation_events.subscribe(event_types)
1615    }
1616
1617    fn is_tracking_dependencies(&self) -> bool {
1618        self.backend.is_tracking_dependencies()
1619    }
1620}
1621
1622async fn wait_for_local_tasks() {
1623    let listener =
1624        CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_tasks.listen_for_in_flight());
1625    let Some(listener) = listener else {
1626        return;
1627    };
1628    listener.await;
1629}
1630
1631pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1632    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1633        Ok(id) => id,
1634        Err(_) => panic!(
1635            "{from} can only be used in the context of a turbo_tasks task execution or \
1636             turbo_tasks run"
1637        ),
1638    }
1639}
1640
1641pub(crate) fn current_task(from: &str) -> TaskId {
1642    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1643        Ok(Some(id)) => id,
1644        Ok(None) | Err(_) => {
1645            panic!("{from} can only be used in the context of a turbo_tasks task execution")
1646        }
1647    }
1648}
1649
1650/// Panics if we're not in a top-level task (e.g. [`run_once`]). Some function calls should only
1651/// happen in a top-level task (e.g. [`Effects::apply`][crate::Effects::apply]).
1652#[track_caller]
1653pub(crate) fn debug_assert_in_top_level_task(message: &str) {
1654    if !cfg!(debug_assertions) {
1655        return;
1656    }
1657
1658    let in_top_level = CURRENT_TASK_STATE
1659        .try_with(|ts| ts.read().unwrap().in_top_level_task)
1660        .unwrap_or(true);
1661    if !in_top_level {
1662        panic!("{message}");
1663    }
1664}
1665
1666#[track_caller]
1667pub(crate) fn debug_assert_not_in_top_level_task(operation: &str) {
1668    if !cfg!(debug_assertions) {
1669        return;
1670    }
1671
1672    // HACK: We set this inside of `ReadRawVcFuture` to suppress warnings about an internal
1673    // consistency bug
1674    let suppressed = SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK
1675        .try_with(|&suppressed| suppressed)
1676        .unwrap_or(false);
1677    if suppressed {
1678        return;
1679    }
1680
1681    let in_top_level = CURRENT_TASK_STATE
1682        .try_with(|ts| ts.read().unwrap().in_top_level_task)
1683        .unwrap_or(false);
1684    if in_top_level {
1685        panic!(
1686            "Eventually consistent read ({operation}) cannot be performed from a top-level task. \
1687             Top-level tasks (e.g. code inside `.run_once(...)`) must use strongly consistent \
1688             reads to avoid leaking inconsistent return values."
1689        );
1690    }
1691}
1692
1693pub async fn run<T: Send + 'static>(
1694    tt: Arc<dyn TurboTasksApi>,
1695    future: impl Future<Output = Result<T>> + Send + 'static,
1696) -> Result<T> {
1697    let (tx, rx) = tokio::sync::oneshot::channel();
1698
1699    tt.run(Box::pin(async move {
1700        let result = future.await?;
1701        tx.send(result)
1702            .map_err(|_| anyhow!("unable to send result"))?;
1703        Ok(())
1704    }))
1705    .await?;
1706
1707    Ok(rx.await?)
1708}
1709
1710pub async fn run_once<T: Send + 'static>(
1711    tt: Arc<dyn TurboTasksApi>,
1712    future: impl Future<Output = Result<T>> + Send + 'static,
1713) -> Result<T> {
1714    let (tx, rx) = tokio::sync::oneshot::channel();
1715
1716    tt.run_once(Box::pin(async move {
1717        let result = future.await?;
1718        tx.send(result)
1719            .map_err(|_| anyhow!("unable to send result"))?;
1720        Ok(())
1721    }))
1722    .await?;
1723
1724    Ok(rx.await?)
1725}
1726
1727pub async fn run_once_with_reason<T: Send + 'static>(
1728    tt: Arc<dyn TurboTasksApi>,
1729    reason: impl InvalidationReason,
1730    future: impl Future<Output = Result<T>> + Send + 'static,
1731) -> Result<T> {
1732    let (tx, rx) = tokio::sync::oneshot::channel();
1733
1734    tt.run_once_with_reason(
1735        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1736        Box::pin(async move {
1737            let result = future.await?;
1738            tx.send(result)
1739                .map_err(|_| anyhow!("unable to send result"))?;
1740            Ok(())
1741        }),
1742    )
1743    .await?;
1744
1745    Ok(rx.await?)
1746}
1747
1748/// Calls [`TurboTasks::dynamic_call`] for the current turbo tasks instance.
1749pub fn dynamic_call(
1750    func: &'static NativeFunction,
1751    this: Option<RawVc>,
1752    arg: &mut dyn DynTaskInputsStorage,
1753    inputs_resolved: InputResolution,
1754    persistence: TaskPersistence,
1755) -> RawVc {
1756    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, inputs_resolved, persistence))
1757}
1758
1759/// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance.
1760pub fn trait_call(
1761    trait_method: &'static TraitMethod,
1762    this: RawVc,
1763    arg: &mut dyn DynTaskInputsStorage,
1764    inputs_resolved: InputResolution,
1765    persistence: TaskPersistence,
1766) -> RawVc {
1767    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, inputs_resolved, persistence))
1768}
1769
1770pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1771    TURBO_TASKS.with(|arc| arc.clone())
1772}
1773
1774pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
1775    TURBO_TASKS.with(Arc::downgrade)
1776}
1777
1778pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1779    TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1780}
1781
/// Runs `func` with a reference to the turbo tasks instance stored in `TURBO_TASKS` and
/// returns whatever `func` returns. The `Arc` is passed by reference, so it is not cloned.
pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
    TURBO_TASKS.with(|arc| func(arc))
}
1785
/// Calls `f` synchronously inside `TURBO_TASKS.sync_scope`, with `tt` as the scoped turbo tasks
/// instance, and returns the result of `f`.
pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
    TURBO_TASKS.sync_scope(tt, f)
}
1789
/// Wraps the future `f` in `TURBO_TASKS.scope`, with `tt` as the scoped turbo tasks instance.
/// The returned future resolves to the output of `f`.
pub fn turbo_tasks_future_scope<T>(
    tt: Arc<dyn TurboTasksApi>,
    f: impl Future<Output = T>,
) -> impl Future<Output = T> {
    TURBO_TASKS.scope(tt, f)
}
1796
1797/// Spawns the given future within the context of the current task.
1798///
1799/// Beware: this method is not safe to use in production code. It is only
1800/// intended for use in tests and for debugging purposes.
1801pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
1802    turbo_tasks().spawn_detached_for_testing(Box::pin(f));
1803}
1804
1805/// Marks the current task as finished. This excludes it from waiting for
1806/// strongly consistency.
1807pub fn mark_finished() {
1808    with_turbo_tasks(|tt| {
1809        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1810    });
1811}
1812
1813/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
1814/// serialization of the current task cells.
1815///
1816/// Also marks the current task as stateful when the `verify_determinism` feature is enabled,
1817/// since State allocation implies interior mutability.
1818pub fn get_serialization_invalidator() -> SerializationInvalidator {
1819    CURRENT_TASK_STATE.with(|cell| {
1820        let CurrentTaskState {
1821            task_id,
1822            #[cfg(feature = "verify_determinism")]
1823            stateful,
1824            ..
1825        } = &mut *cell.write().unwrap();
1826        #[cfg(feature = "verify_determinism")]
1827        {
1828            *stateful = true;
1829        }
1830        let Some(task_id) = *task_id else {
1831            panic!(
1832                "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
1833                 task execution"
1834            );
1835        };
1836        SerializationInvalidator::new(task_id)
1837    })
1838}
1839
1840pub fn mark_invalidator() {
1841    CURRENT_TASK_STATE.with(|cell| {
1842        let CurrentTaskState {
1843            has_invalidator, ..
1844        } = &mut *cell.write().unwrap();
1845        *has_invalidator = true;
1846    })
1847}
1848
/// Marks the current task as stateful. This is used to indicate that the task
/// has interior mutability (e.g., via [`State`][crate::State]), which means
/// the task may produce different outputs even with the same inputs.
///
/// Only has an effect when the `verify_determinism` feature is enabled; otherwise this is a
/// no-op.
pub fn mark_stateful() {
    #[cfg(feature = "verify_determinism")]
    CURRENT_TASK_STATE.with(|cell| {
        let mut state = cell.write().unwrap();
        state.stateful = true;
    });
}
1864
1865/// Marks the current task context as being in a top-level task. When in a top-level task,
1866/// eventually consistent reads will panic. It is almost always a mistake to perform an eventually
1867/// consistent read at the top-level of the application.
1868pub fn mark_top_level_task() {
1869    if cfg!(debug_assertions) {
1870        CURRENT_TASK_STATE.with(|cell| {
1871            cell.write().unwrap().in_top_level_task = true;
1872        })
1873    }
1874}
1875
1876/// Unmarks the current task context as being in a top-level task. The opposite of
1877/// [`mark_top_level_task`].
1878///
1879/// This utility can be okay in unit tests, where we're observing the internal behavior of
1880/// turbo-tasks, but otherwise, it is probably a mistake to call this function.
1881///
1882/// Calling this will allow eventually-consistent reads at the top-level, potentially exposing
1883/// incomplete computations and internal errors caused by eventual consistency that would've been
1884/// caught when the function was re-run. A strongly-consistent read re-runs parts of a task until
1885/// all of the dependencies have settled.
1886pub fn unmark_top_level_task_may_leak_eventually_consistent_state() {
1887    if cfg!(debug_assertions) {
1888        CURRENT_TASK_STATE.with(|cell| {
1889            cell.write().unwrap().in_top_level_task = false;
1890        })
1891    }
1892}
1893
/// Currently a no-op: garbage collection is not implemented yet, so there is nothing to
/// prevent.
pub fn prevent_gc() {
    // TODO implement garbage collection
}
1897
1898pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1899    with_turbo_tasks(|tt| {
1900        let raw_vc = collectible.node.node;
1901        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1902    })
1903}
1904
1905pub(crate) async fn read_task_output(
1906    this: &dyn TurboTasksApi,
1907    id: TaskId,
1908    options: ReadOutputOptions,
1909) -> Result<RawVc> {
1910    loop {
1911        match this.try_read_task_output(id, options)? {
1912            Ok(result) => return Ok(result),
1913            Err(listener) => listener.await,
1914        }
1915    }
1916}
1917
/// A reference to a task's cell with methods that allow updating the contents
/// of the cell.
///
/// Mutations should not happen outside of the task that owns this cell. Doing so
/// is a logic error, and may lead to incorrect caching behavior.
#[derive(Clone, Copy)]
pub struct CurrentCellRef {
    /// The task that owns the cell.
    current_task: TaskId,
    /// Identifies the cell within `current_task`.
    index: CellId,
}
1928
/// Shorthand for the [`VcRead::Target`] type of the value type `T`'s `Read` implementation.
type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
1930
1931impl CurrentCellRef {
1932    /// Updates the cell if the given `functor` returns a value.
1933    fn conditional_update<T>(
1934        &self,
1935        functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>, Option<CellHash>)>,
1936    ) where
1937        T: VcValueType,
1938    {
1939        self.conditional_update_with_shared_reference(|old_shared_reference| {
1940            let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
1941            let (new_value, updated_key_hashes, content_hash) = functor(old_ref)?;
1942            Some((
1943                SharedReference::new(triomphe::Arc::new(new_value)),
1944                updated_key_hashes,
1945                content_hash,
1946            ))
1947        })
1948    }
1949
1950    /// Updates the cell if the given `functor` returns a `SharedReference`.
1951    fn conditional_update_with_shared_reference(
1952        &self,
1953        functor: impl FnOnce(
1954            Option<&SharedReference>,
1955        ) -> Option<(
1956            SharedReference,
1957            Option<SmallVec<[u64; 2]>>,
1958            Option<CellHash>,
1959        )>,
1960    ) {
1961        let tt = turbo_tasks();
1962        let cell_content = tt.read_own_task_cell(self.current_task, self.index).ok();
1963        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1964        if let Some((update, updated_key_hashes, content_hash)) = update {
1965            tt.update_own_task_cell(
1966                self.current_task,
1967                self.index,
1968                CellContent(Some(update)),
1969                updated_key_hashes,
1970                content_hash,
1971                VerificationMode::EqualityCheck,
1972            )
1973        }
1974    }
1975
1976    /// Replace the current cell's content with `new_value` if the current content is not equal by
1977    /// value with the existing content.
1978    ///
1979    /// The comparison happens using the value itself, not the [`VcRead::Target`] of that value.
1980    ///
1981    /// Take this example of a custom equality implementation on a transparent wrapper type:
1982    ///
1983    /// ```
1984    /// #[turbo_tasks::value(transparent, eq = "manual")]
1985    /// struct Wrapper(Vec<u32>);
1986    ///
1987    /// impl PartialEq for Wrapper {
1988    ///     fn eq(&self, other: Wrapper) {
1989    ///         // Example: order doesn't matter for equality
1990    ///         let (mut this, mut other) = (self.clone(), other.clone());
1991    ///         this.sort_unstable();
1992    ///         other.sort_unstable();
1993    ///         this == other
1994    ///     }
1995    /// }
1996    ///
1997    /// impl Eq for Wrapper {}
1998    /// ```
1999    ///
2000    /// Comparisons of [`Vc<Wrapper>`] used when updating the cell will use `Wrapper`'s custom
2001    /// equality implementation, rather than the one provided by the target ([`Vec<u32>`]) type.
2002    ///
2003    /// However, in most cases, the default derived implementation of [`PartialEq`] is used which
2004    /// just forwards to the inner value's [`PartialEq`].
2005    ///
2006    /// If you already have a `SharedReference`, consider calling
2007    /// [`Self::compare_and_update_with_shared_reference`] which can re-use the [`SharedReference`]
2008    /// object.
2009    pub fn compare_and_update<T>(&self, new_value: T)
2010    where
2011        T: PartialEq + VcValueType,
2012    {
2013        self.conditional_update(|old_value| {
2014            if let Some(old_value) = old_value
2015                && old_value == &new_value
2016            {
2017                return None;
2018            }
2019            Some((new_value, None, None))
2020        });
2021    }
2022
2023    /// Replace the current cell's content with `new_shared_reference` if the current content is not
2024    /// equal by value with the existing content.
2025    ///
2026    /// If you already have a `SharedReference`, this is a faster version of
2027    /// [`CurrentCellRef::compare_and_update`].
2028    ///
2029    /// The value should be stored in [`SharedReference`] using the type `T`.
2030    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
2031    where
2032        T: VcValueType + PartialEq,
2033    {
2034        self.conditional_update_with_shared_reference(|old_sr| {
2035            if let Some(old_sr) = old_sr {
2036                let old_value = extract_sr_value::<T>(old_sr);
2037                let new_value = extract_sr_value::<T>(&new_shared_reference);
2038                if old_value == new_value {
2039                    return None;
2040                }
2041            }
2042            Some((new_shared_reference, None, None))
2043        });
2044    }
2045
2046    /// Replace the current cell's content if the new value is different.
2047    ///
2048    /// Like [`Self::compare_and_update`], but also computes and stores a hash of the value.
2049    /// When the cell's transient data is evicted, the stored hash enables the backend to detect
2050    /// whether the value actually changed without re-comparing values—avoiding unnecessary
2051    /// downstream invalidation.
2052    ///
2053    /// Requires `T: DeterministicHash` in addition to `T: PartialEq`.
2054    pub fn hashed_compare_and_update<T>(&self, new_value: T)
2055    where
2056        T: PartialEq + DeterministicHash + VcValueType,
2057    {
2058        self.conditional_update(|old_value| {
2059            if let Some(old_value) = old_value
2060                && old_value == &new_value
2061            {
2062                return None;
2063            }
2064            let content_hash = hash_xxh3_hash128(&new_value);
2065            Some((new_value, None, Some(content_hash)))
2066        });
2067    }
2068
2069    /// Replace the current cell's content if the new value (from a pre-existing
2070    /// [`SharedReference`]) is different.
2071    ///
2072    /// Like [`Self::compare_and_update_with_shared_reference`], but also passes a hash
2073    /// for hash-based change detection when transient data has been evicted.
2074    pub fn hashed_compare_and_update_with_shared_reference<T>(
2075        &self,
2076        new_shared_reference: SharedReference,
2077    ) where
2078        T: VcValueType + PartialEq + DeterministicHash,
2079    {
2080        self.conditional_update_with_shared_reference(move |old_sr| {
2081            if let Some(old_sr) = old_sr {
2082                let old_value = extract_sr_value::<T>(old_sr);
2083                let new_value = extract_sr_value::<T>(&new_shared_reference);
2084                if old_value == new_value {
2085                    return None;
2086                }
2087            }
2088            let content_hash = hash_xxh3_hash128(extract_sr_value::<T>(&new_shared_reference));
2089            Some((new_shared_reference, None, Some(content_hash)))
2090        });
2091    }
2092
2093    /// See [`Self::compare_and_update`], but selectively update individual keys.
2094    pub fn keyed_compare_and_update<T>(&self, new_value: T)
2095    where
2096        T: PartialEq + VcValueType,
2097        VcReadTarget<T>: KeyedEq,
2098        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2099    {
2100        self.conditional_update(|old_value| {
2101            let Some(old_value) = old_value else {
2102                return Some((new_value, None, None));
2103            };
2104            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2105            let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
2106            let updated_keys = old_value.different_keys(new_value_ref);
2107            if updated_keys.is_empty() {
2108                return None;
2109            }
2110            // Duplicates are very unlikely, but ok since the backend is deduplicating them
2111            let updated_key_hashes = updated_keys
2112                .into_iter()
2113                .map(|key| FxBuildHasher.hash_one(key))
2114                .collect();
2115            Some((new_value, Some(updated_key_hashes), None))
2116        });
2117    }
2118
2119    /// See [`Self::compare_and_update_with_shared_reference`], but selectively update individual
2120    /// keys.
2121    pub fn keyed_compare_and_update_with_shared_reference<T>(
2122        &self,
2123        new_shared_reference: SharedReference,
2124    ) where
2125        T: VcValueType + PartialEq,
2126        VcReadTarget<T>: KeyedEq,
2127        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2128    {
2129        self.conditional_update_with_shared_reference(|old_sr| {
2130            let Some(old_sr) = old_sr else {
2131                return Some((new_shared_reference, None, None));
2132            };
2133            let old_value = extract_sr_value::<T>(old_sr);
2134            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2135            let new_value = extract_sr_value::<T>(&new_shared_reference);
2136            let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
2137            let updated_keys = old_value.different_keys(new_value);
2138            if updated_keys.is_empty() {
2139                return None;
2140            }
2141            // Duplicates are very unlikely, but ok since the backend is deduplicating them
2142            let updated_key_hashes = updated_keys
2143                .into_iter()
2144                .map(|key| FxBuildHasher.hash_one(key))
2145                .collect();
2146            Some((new_shared_reference, Some(updated_key_hashes), None))
2147        });
2148    }
2149
2150    /// Unconditionally updates the content of the cell.
2151    pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
2152    where
2153        T: VcValueType,
2154    {
2155        let tt = turbo_tasks();
2156        tt.update_own_task_cell(
2157            self.current_task,
2158            self.index,
2159            CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
2160            None,
2161            None,
2162            verification_mode,
2163        )
2164    }
2165
2166    /// A faster version of [`Self::update`] if you already have a
2167    /// [`SharedReference`].
2168    ///
2169    /// If the passed-in [`SharedReference`] is the same as the existing cell's
2170    /// by identity, no update is performed.
2171    ///
2172    /// The value should be stored in [`SharedReference`] using the type `T`.
2173    pub fn update_with_shared_reference(
2174        &self,
2175        shared_ref: SharedReference,
2176        verification_mode: VerificationMode,
2177    ) {
2178        let tt = turbo_tasks();
2179        let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
2180            let content = tt.read_own_task_cell(self.current_task, self.index).ok();
2181            if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
2182                // pointer equality (not value equality)
2183                shared_ref_exp != shared_ref
2184            } else {
2185                true
2186            }
2187        } else {
2188            true
2189        };
2190        if update {
2191            tt.update_own_task_cell(
2192                self.current_task,
2193                self.index,
2194                CellContent(Some(shared_ref)),
2195                None,
2196                None,
2197                verification_mode,
2198            )
2199        }
2200    }
2201}
2202
2203impl From<CurrentCellRef> for RawVc {
2204    fn from(cell: CurrentCellRef) -> Self {
2205        RawVc::TaskCell(cell.current_task, cell.index)
2206    }
2207}
2208
2209fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2210    sr.0.downcast_ref::<T>()
2211        .expect("cannot update SharedReference of different type")
2212}
2213
2214pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
2215    find_cell_by_id(T::get_value_type_id())
2216}
2217
2218pub fn find_cell_by_id(ty: ValueTypeId) -> CurrentCellRef {
2219    CURRENT_TASK_STATE.with(|ts| {
2220        let current_task = current_task("celling turbo_tasks values");
2221        let mut ts = ts.write().unwrap();
2222        let map = ts.cell_counters.as_mut().unwrap();
2223        let current_index = map.entry(ty).or_default();
2224        let index = *current_index;
2225        *current_index += 1;
2226        CurrentCellRef {
2227            current_task,
2228            index: CellId { type_id: ty, index },
2229        }
2230    })
2231}
2232
2233pub(crate) async fn read_local_output(
2234    this: &dyn TurboTasksApi,
2235    execution_id: ExecutionId,
2236    local_task_id: LocalTaskId,
2237) -> Result<RawVc> {
2238    loop {
2239        match this.try_read_local_output(execution_id, local_task_id)? {
2240            Ok(raw_vc) => return Ok(raw_vc),
2241            Err(event_listener) => event_listener.await,
2242        }
2243    }
2244}