turbo_tasks/
manager.rs

1use std::{
2    fmt::Display,
3    future::Future,
4    hash::BuildHasherDefault,
5    mem::take,
6    pin::Pin,
7    sync::{
8        Arc, Mutex, RwLock, Weak,
9        atomic::{AtomicBool, AtomicUsize, Ordering},
10    },
11    time::{Duration, Instant},
12};
13
14use anyhow::{Result, anyhow};
15use auto_hash_map::AutoMap;
16use rustc_hash::FxHasher;
17use serde::{Deserialize, Serialize};
18use tokio::{select, sync::mpsc::Receiver, task_local};
19use tokio_util::task::TaskTracker;
20use tracing::{Instrument, instrument};
21
22use crate::{
23    Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
24    ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
25    VcValueTrait, VcValueType,
26    backend::{
27        Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
28        TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
29    },
30    capture_future::CaptureFuture,
31    event::{Event, EventListener},
32    id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
33    id_factory::IdFactoryWithReuse,
34    macro_helpers::NativeFunction,
35    magic_any::MagicAny,
36    message_queue::{CompilationEvent, CompilationEventQueue},
37    raw_vc::{CellId, RawVc},
38    registry,
39    serialization_invalidation::SerializationInvalidator,
40    task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
41    task_statistics::TaskStatisticsApi,
42    trace::TraceRawVcs,
43    util::{IdFactory, StaticOrArc},
44};
45
46/// Common base trait for [`TurboTasksApi`] and [`TurboTasksBackendApi`]. Provides APIs for creating
47/// tasks from function calls.
48pub trait TurboTasksCallApi: Sync + Send {
49    /// Calls a native function with arguments. Resolves arguments when needed
50    /// with a wrapper task.
51    fn dynamic_call(
52        &self,
53        native_fn: &'static NativeFunction,
54        this: Option<RawVc>,
55        arg: Box<dyn MagicAny>,
56        persistence: TaskPersistence,
57    ) -> RawVc;
58    /// Call a native function with arguments.
59    /// All inputs must be resolved.
60    fn native_call(
61        &self,
62        native_fn: &'static NativeFunction,
63        this: Option<RawVc>,
64        arg: Box<dyn MagicAny>,
65        persistence: TaskPersistence,
66    ) -> RawVc;
67    /// Calls a trait method with arguments. First input is the `self` object.
68    /// Uses a wrapper task to resolve
69    fn trait_call(
70        &self,
71        trait_method: &'static TraitMethod,
72        this: RawVc,
73        arg: Box<dyn MagicAny>,
74        persistence: TaskPersistence,
75    ) -> RawVc;
76
77    fn run(
78        &self,
79        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
80    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
81    fn run_once(
82        &self,
83        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
84    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
85    fn run_once_with_reason(
86        &self,
87        reason: StaticOrArc<dyn InvalidationReason>,
88        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
89    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
90    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
91}
92
93/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
94/// context. Returned by the [`turbo_tasks`] helper function.
95///
96/// This trait is needed because thread locals cannot contain an unresolved [`Backend`] type
97/// parameter.
98pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
99    fn invalidate(&self, task: TaskId);
100    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);
101
102    fn invalidate_serialization(&self, task: TaskId);
103
104    fn try_read_task_output(
105        &self,
106        task: TaskId,
107        options: ReadOutputOptions,
108    ) -> Result<Result<RawVc, EventListener>>;
109
110    fn try_read_task_cell(
111        &self,
112        task: TaskId,
113        index: CellId,
114        options: ReadCellOptions,
115    ) -> Result<Result<TypedCellContent, EventListener>>;
116
117    /// Reads a [`RawVc::LocalOutput`]. If the task has completed, returns the [`RawVc`] the local
118    /// task points to.
119    ///
120    /// The returned [`RawVc`] may also be a [`RawVc::LocalOutput`], so this may need to be called
121    /// recursively or in a loop.
122    ///
123    /// This does not accept a consistency argument, as you cannot control consistency of a read of
124    /// an operation owned by your own task. Strongly consistent reads are only allowed on
125    /// [`OperationVc`]s, which should never be local tasks.
126    ///
127    /// No dependency tracking will happen as a result of this function call, as it's a no-op for a
128    /// task to depend on itself.
129    ///
130    /// [`OperationVc`]: crate::OperationVc
131    fn try_read_local_output(
132        &self,
133        execution_id: ExecutionId,
134        local_task_id: LocalTaskId,
135    ) -> Result<Result<RawVc, EventListener>>;
136
137    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;
138
139    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
140    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
141    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);
142
143    /// INVALIDATION: Be careful with this, it will not track dependencies, so
144    /// using it could break cache invalidation.
145    fn try_read_own_task_cell(
146        &self,
147        current_task: TaskId,
148        index: CellId,
149        options: ReadCellOptions,
150    ) -> Result<TypedCellContent>;
151
152    fn read_own_task_cell(
153        &self,
154        task: TaskId,
155        index: CellId,
156        options: ReadCellOptions,
157    ) -> Result<TypedCellContent>;
158    fn update_own_task_cell(
159        &self,
160        task: TaskId,
161        index: CellId,
162        content: CellContent,
163        verification_mode: VerificationMode,
164    );
165    fn mark_own_task_as_finished(&self, task: TaskId);
166    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
167    fn mark_own_task_as_session_dependent(&self, task: TaskId);
168
169    fn connect_task(&self, task: TaskId);
170
171    /// Wraps the given future in the current task.
172    ///
173    /// Beware: this method is not safe to use in production code. It is only intended for use in
174    /// tests and for debugging purposes.
175    fn detached_for_testing(
176        &self,
177        f: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
178    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
179
180    fn task_statistics(&self) -> &TaskStatisticsApi;
181
182    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
183
184    fn subscribe_to_compilation_events(
185        &self,
186        event_types: Option<Vec<String>>,
187    ) -> Receiver<Arc<dyn CompilationEvent>>;
188    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);
189
190    // Returns true if TurboTasks is configured to track dependencies.
191    fn is_tracking_dependencies(&self) -> bool;
192}
193
194/// A wrapper around a value that is unused.
195pub struct Unused<T> {
196    inner: T,
197}
198
199impl<T> Unused<T> {
200    /// Creates a new unused value.
201    ///
202    /// # Safety
203    ///
204    /// The wrapped value must not be used.
205    pub unsafe fn new_unchecked(inner: T) -> Self {
206        Self { inner }
207    }
208
209    /// Get the inner value, without consuming the `Unused` wrapper.
210    ///
211    /// # Safety
212    ///
213    /// The user need to make sure that the value stays unused.
214    pub unsafe fn get_unchecked(&self) -> &T {
215        &self.inner
216    }
217
218    /// Unwraps the value, consuming the `Unused` wrapper.
219    pub fn into(self) -> T {
220        self.inner
221    }
222}
223
224/// A subset of the [`TurboTasks`] API that's exposed to [`Backend`] implementations.
225pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
226    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;
227
228    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
229    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
230    /// # Safety
231    ///
232    /// The caller must ensure that the task id is not used anymore.
233    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
234    /// # Safety
235    ///
236    /// The caller must ensure that the task id is not used anymore.
237    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);
238
239    /// Schedule a task for execution.
240    fn schedule(&self, task: TaskId);
241
242    /// Schedule a foreground backend job for execution.
243    fn schedule_backend_foreground_job(&self, job: B::BackendJob);
244
245    /// Schedule a background backend job for execution.
246    ///
247    /// Background jobs are not counted towards activeness of the system. The system is considered
248    /// idle even with active background jobs.
249    fn schedule_backend_background_job(&self, job: B::BackendJob);
250
251    /// Returns the duration from the start of the program to the given instant.
252    fn program_duration_until(&self, instant: Instant) -> Duration;
253
254    /// Returns true if the system is idle.
255    fn is_idle(&self) -> bool;
256
257    /// Returns a reference to the backend.
258    fn backend(&self) -> &B;
259}
260
261#[allow(clippy::manual_non_exhaustive)]
262pub struct UpdateInfo {
263    pub duration: Duration,
264    pub tasks: usize,
265    pub reasons: InvalidationReasonSet,
266    #[allow(dead_code)]
267    placeholder_for_future_fields: (),
268}
269
270#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
271pub enum TaskPersistence {
272    /// Tasks that may be persisted across sessions using serialization.
273    Persistent,
274
275    /// Tasks that will be persisted in memory for the life of this session, but won't persist
276    /// between sessions.
277    ///
278    /// This is used for [root tasks][TurboTasks::spawn_root_task] and tasks with an argument of
279    /// type [`TransientValue`][crate::value::TransientValue] or
280    /// [`TransientInstance`][crate::value::TransientInstance].
281    Transient,
282}
283
284#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
285pub enum ReadConsistency {
286    /// The default behavior for most APIs. Reads are faster, but may return stale values, which
287    /// may later trigger re-computation.
288    #[default]
289    Eventual,
290    /// Ensures all dependencies are fully resolved before returning the cell or output data, at
291    /// the cost of slower reads.
292    ///
293    /// Top-level code that returns data to the user should use strongly consistent reads.
294    Strong,
295}
296
297#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
298pub enum ReadTracking {
299    /// Reads are tracked as dependencies of the current task.
300    #[default]
301    Tracked,
302    /// The read is only tracked when there is an error, otherwise it is untracked.
303    ///
304    /// INVALIDATION: Be careful with this, it will not track dependencies, so
305    /// using it could break cache invalidation.
306    TrackOnlyError,
307    /// The read is not tracked as a dependency of the current task.
308    ///
309    /// INVALIDATION: Be careful with this, it will not track dependencies, so
310    /// using it could break cache invalidation.
311    Untracked,
312}
313
314impl ReadTracking {
315    pub fn should_track(&self, is_err: bool) -> bool {
316        match self {
317            ReadTracking::Tracked => true,
318            ReadTracking::TrackOnlyError => is_err,
319            ReadTracking::Untracked => false,
320        }
321    }
322}
323
324impl Display for ReadTracking {
325    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
326        match self {
327            ReadTracking::Tracked => write!(f, "tracked"),
328            ReadTracking::TrackOnlyError => write!(f, "track only error"),
329            ReadTracking::Untracked => write!(f, "untracked"),
330        }
331    }
332}
333
334pub struct TurboTasks<B: Backend + 'static> {
335    this: Weak<Self>,
336    backend: B,
337    task_id_factory: IdFactoryWithReuse<TaskId>,
338    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
339    execution_id_factory: IdFactory<ExecutionId>,
340    stopped: AtomicBool,
341    currently_scheduled_foreground_jobs: AtomicUsize,
342    currently_scheduled_background_jobs: AtomicUsize,
343    scheduled_tasks: AtomicUsize,
344    start: Mutex<Option<Instant>>,
345    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
346    /// Event that is triggered when currently_scheduled_foreground_jobs becomes non-zero
347    event_foreground_start: Event,
348    /// Event that is triggered when all foreground jobs are done
349    /// (currently_scheduled_foreground_jobs becomes zero)
350    event_foreground_done: Event,
351    /// Event that is triggered when all background jobs are done
352    event_background_done: Event,
353    program_start: Instant,
354    compilation_events: CompilationEventQueue,
355}
356
357/// Information about a non-local task. A non-local task can contain multiple "local" tasks, which
358/// all share the same non-local task state.
359///
360/// A non-local task is one that:
361///
362/// - Has a unique task id.
363/// - Is potentially cached.
364/// - The backend is aware of.
365struct CurrentTaskState {
366    task_id: Option<TaskId>,
367    execution_id: ExecutionId,
368
369    /// True if the current task has state in cells
370    stateful: bool,
371
372    /// True if the current task uses an external invalidator
373    has_invalidator: bool,
374
375    /// Tracks how many cells of each type has been allocated so far during this task execution.
376    /// When a task is re-executed, the cell count may not match the existing cell vec length.
377    ///
378    /// This is taken (and becomes `None`) during teardown of a task.
379    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,
380
381    /// Local tasks created while this global task has been running. Indexed by `LocalTaskId`.
382    local_tasks: Vec<LocalTask>,
383
384    /// Tracks currently running local tasks, and defers cleanup of the global task until those
385    /// complete. Also used by `detached_for_testing`.
386    local_task_tracker: TaskTracker,
387}
388
389impl CurrentTaskState {
390    fn new(task_id: TaskId, execution_id: ExecutionId) -> Self {
391        Self {
392            task_id: Some(task_id),
393            execution_id,
394            stateful: false,
395            has_invalidator: false,
396            cell_counters: Some(AutoMap::default()),
397            local_tasks: Vec::new(),
398            local_task_tracker: TaskTracker::new(),
399        }
400    }
401
402    fn new_temporary(execution_id: ExecutionId) -> Self {
403        Self {
404            task_id: None,
405            execution_id,
406            stateful: false,
407            has_invalidator: false,
408            cell_counters: None,
409            local_tasks: Vec::new(),
410            local_task_tracker: TaskTracker::new(),
411        }
412    }
413
414    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
415        if self.execution_id != expected_execution_id {
416            panic!(
417                "Local tasks can only be scheduled/awaited within the same execution of the \
418                 parent task that created them"
419            );
420        }
421    }
422
423    fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
424        self.local_tasks.push(local_task);
425        // generate a one-indexed id from len() -- we just pushed so len() is >= 1
426        if cfg!(debug_assertions) {
427            LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
428        } else {
429            unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
430        }
431    }
432
433    fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
434        // local task ids are one-indexed (they use NonZeroU32)
435        &self.local_tasks[(*local_task_id as usize) - 1]
436    }
437
438    fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
439        &mut self.local_tasks[(*local_task_id as usize) - 1]
440    }
441}
442
443// TODO implement our own thread pool and make these thread locals instead
444task_local! {
445    /// The current TurboTasks instance
446    static TURBO_TASKS: Arc<dyn TurboTasksApi>;
447
448    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
449}
450
451impl<B: Backend + 'static> TurboTasks<B> {
452    // TODO better lifetime management for turbo tasks
453    // consider using unsafe for the task_local turbo tasks
454    // that should be safe as long tasks can't outlife turbo task
455    // so we probably want to make sure that all tasks are joined
456    // when trying to drop turbo tasks
457    pub fn new(backend: B) -> Arc<Self> {
458        let task_id_factory = IdFactoryWithReuse::new(
459            TaskId::MIN,
460            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
461        );
462        let transient_task_id_factory =
463            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
464        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
465        let this = Arc::new_cyclic(|this| Self {
466            this: this.clone(),
467            backend,
468            task_id_factory,
469            transient_task_id_factory,
470            execution_id_factory,
471            stopped: AtomicBool::new(false),
472            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
473            currently_scheduled_background_jobs: AtomicUsize::new(0),
474            scheduled_tasks: AtomicUsize::new(0),
475            start: Default::default(),
476            aggregated_update: Default::default(),
477            event_foreground_done: Event::new(|| {
478                || "TurboTasks::event_foreground_done".to_string()
479            }),
480            event_foreground_start: Event::new(|| {
481                || "TurboTasks::event_foreground_start".to_string()
482            }),
483            event_background_done: Event::new(|| {
484                || "TurboTasks::event_background_done".to_string()
485            }),
486            program_start: Instant::now(),
487            compilation_events: CompilationEventQueue::default(),
488        });
489        this.backend.startup(&*this);
490        this
491    }
492
493    pub fn pin(&self) -> Arc<Self> {
494        self.this.upgrade().unwrap()
495    }
496
497    /// Creates a new root task
498    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
499    where
500        T: ?Sized,
501        F: Fn() -> Fut + Send + Sync + Clone + 'static,
502        Fut: Future<Output = Result<Vc<T>>> + Send,
503    {
504        let id = self.backend.create_transient_task(
505            TransientTaskType::Root(Box::new(move || {
506                let functor = functor.clone();
507                Box::pin(async move {
508                    let raw_vc = functor().await?.node;
509                    raw_vc.to_non_local().await
510                })
511            })),
512            self,
513        );
514        self.schedule(id);
515        id
516    }
517
518    pub fn dispose_root_task(&self, task_id: TaskId) {
519        self.backend.dispose_root_task(task_id, self);
520    }
521
522    // TODO make sure that all dependencies settle before reading them
523    /// Creates a new root task, that is only executed once.
524    /// Dependencies will not invalidate the task.
525    #[track_caller]
526    fn spawn_once_task<T, Fut>(&self, future: Fut)
527    where
528        T: ?Sized,
529        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
530    {
531        let id = self.backend.create_transient_task(
532            TransientTaskType::Once(Box::pin(async move {
533                let raw_vc = future.await?.node;
534                raw_vc.to_non_local().await
535            })),
536            self,
537        );
538        self.schedule(id);
539    }
540
541    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
542        &self,
543        future: impl Future<Output = Result<T>> + Send + 'static,
544    ) -> Result<T> {
545        let (tx, rx) = tokio::sync::oneshot::channel();
546        self.spawn_once_task(async move {
547            let result = future.await;
548            tx.send(result)
549                .map_err(|_| anyhow!("unable to send result"))?;
550            Ok(Completion::new())
551        });
552
553        rx.await?
554    }
555
556    #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
557    pub async fn run<T: TraceRawVcs + Send + 'static>(
558        &self,
559        future: impl Future<Output = Result<T>> + Send + 'static,
560    ) -> Result<T, TurboTasksExecutionError> {
561        self.begin_foreground_job();
562        // it's okay for execution ids to overflow and wrap, they're just used for an assert
563        let execution_id = self.execution_id_factory.wrapping_get();
564        let current_task_state =
565            Arc::new(RwLock::new(CurrentTaskState::new_temporary(execution_id)));
566
567        let result = TURBO_TASKS
568            .scope(
569                self.pin(),
570                CURRENT_TASK_STATE.scope(current_task_state, async {
571                    let result = CaptureFuture::new(future).await;
572
573                    // wait for all spawned local tasks using `local` to finish
574                    let ltt =
575                        CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_task_tracker.clone());
576                    ltt.close();
577                    ltt.wait().await;
578
579                    match result {
580                        Ok(Ok(raw_vc)) => Ok(raw_vc),
581                        Ok(Err(err)) => Err(err.into()),
582                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
583                    }
584                }),
585            )
586            .await;
587        self.finish_foreground_job();
588        result
589    }
590
591    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
592        let this = self.pin();
593        tokio::spawn(async move {
594            this.pin()
595                .run_once(async move {
596                    this.finish_foreground_job();
597                    future.await;
598                    this.begin_foreground_job();
599                    Ok(())
600                })
601                .await
602                .unwrap()
603        });
604    }
605
606    pub(crate) fn native_call(
607        &self,
608        native_fn: &'static NativeFunction,
609        this: Option<RawVc>,
610        arg: Box<dyn MagicAny>,
611        persistence: TaskPersistence,
612    ) -> RawVc {
613        let task_type = CachedTaskType {
614            native_fn,
615            this,
616            arg,
617        };
618        RawVc::TaskOutput(match persistence {
619            TaskPersistence::Transient => self.backend.get_or_create_transient_task(
620                task_type,
621                current_task_if_available("turbo_function calls"),
622                self,
623            ),
624            TaskPersistence::Persistent => self.backend.get_or_create_persistent_task(
625                task_type,
626                current_task_if_available("turbo_function calls"),
627                self,
628            ),
629        })
630    }
631
632    pub fn dynamic_call(
633        &self,
634        native_fn: &'static NativeFunction,
635        this: Option<RawVc>,
636        arg: Box<dyn MagicAny>,
637        persistence: TaskPersistence,
638    ) -> RawVc {
639        if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
640            return self.native_call(native_fn, this, arg, persistence);
641        }
642        let task_type = LocalTaskSpec {
643            task_type: LocalTaskType::ResolveNative { native_fn },
644            this,
645            arg,
646        };
647        self.schedule_local_task(task_type, persistence)
648    }
649
650    pub fn trait_call(
651        &self,
652        trait_method: &'static TraitMethod,
653        this: RawVc,
654        arg: Box<dyn MagicAny>,
655        persistence: TaskPersistence,
656    ) -> RawVc {
657        // avoid creating a wrapper task if self is already resolved
658        // for resolved cells we already know the value type so we can lookup the
659        // function
660        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
661            match registry::get_value_type(type_id).get_trait_method(trait_method) {
662                Some(native_fn) => {
663                    let arg = native_fn.arg_meta.filter_owned(arg);
664                    return self.dynamic_call(native_fn, Some(this), arg, persistence);
665                }
666                None => {
667                    // We are destined to fail at this point, but we just retry resolution in the
668                    // local task since we cannot report an error from here.
669                    // TODO: A panic seems appropriate since the immediate caller is to blame
670                }
671            }
672        }
673
674        // create a wrapper task to resolve all inputs
675        let task_type = LocalTaskSpec {
676            task_type: LocalTaskType::ResolveTrait { trait_method },
677            this: Some(this),
678            arg,
679        };
680
681        self.schedule_local_task(task_type, persistence)
682    }
683
684    #[track_caller]
685    pub(crate) fn schedule(&self, task_id: TaskId) {
686        self.begin_foreground_job();
687        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
688
689        let this = self.pin();
690        let future = async move {
691            let mut schedule_again = true;
692            while schedule_again {
693                // it's okay for execution ids to overflow and wrap, they're just used for an assert
694                let execution_id = this.execution_id_factory.wrapping_get();
695                let current_task_state =
696                    Arc::new(RwLock::new(CurrentTaskState::new(task_id, execution_id)));
697                let single_execution_future = async {
698                    if this.stopped.load(Ordering::Acquire) {
699                        this.backend.task_execution_canceled(task_id, &*this);
700                        return false;
701                    }
702
703                    let Some(TaskExecutionSpec { future, span }) =
704                        this.backend.try_start_task_execution(task_id, &*this)
705                    else {
706                        return false;
707                    };
708
709                    async {
710                        let result = CaptureFuture::new(future).await;
711
712                        // wait for all spawned local tasks using `local` to finish
713                        let ltt = CURRENT_TASK_STATE
714                            .with(|ts| ts.read().unwrap().local_task_tracker.clone());
715                        ltt.close();
716                        ltt.wait().await;
717
718                        let result = match result {
719                            Ok(Ok(raw_vc)) => Ok(raw_vc),
720                            Ok(Err(err)) => Err(err.into()),
721                            Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
722                        };
723
724                        let FinishedTaskState {
725                            stateful,
726                            has_invalidator,
727                        } = this.finish_current_task_state();
728                        let cell_counters = CURRENT_TASK_STATE
729                            .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
730                        this.backend.task_execution_completed(
731                            task_id,
732                            result,
733                            &cell_counters,
734                            stateful,
735                            has_invalidator,
736                            &*this,
737                        )
738                    }
739                    .instrument(span)
740                    .await
741                };
742                schedule_again = CURRENT_TASK_STATE
743                    .scope(current_task_state, single_execution_future)
744                    .await;
745            }
746            this.finish_foreground_job();
747            anyhow::Ok(())
748        };
749
750        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
751
752        #[cfg(feature = "tokio_tracing")]
753        {
754            let description = self.backend.get_task_description(task_id);
755            tokio::task::Builder::new()
756                .name(&description)
757                .spawn(future)
758                .unwrap();
759        }
760        #[cfg(not(feature = "tokio_tracing"))]
761        tokio::task::spawn(future);
762    }
763
764    fn schedule_local_task(
765        &self,
766        ty: LocalTaskSpec,
767        // if this is a `LocalTaskType::Resolve*`, we may spawn another task with this persistence,
768        persistence: TaskPersistence,
769    ) -> RawVc {
770        let task_type = ty.task_type;
771        let (global_task_state, parent_task_id, execution_id, local_task_id) = CURRENT_TASK_STATE
772            .with(|gts| {
773                let mut gts_write = gts.write().unwrap();
774                let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
775                    done_event: Event::new(move || {
776                        move || format!("LocalTask({task_type})::done_event")
777                    }),
778                });
779                (
780                    Arc::clone(gts),
781                    gts_write.task_id,
782                    gts_write.execution_id,
783                    local_task_id,
784                )
785            });
786
787        #[cfg(feature = "tokio_tracing")]
788        let description = format!(
789            "[local] (parent: {}) {}",
790            self.backend.get_task_description(parent_task_id),
791            ty.task_type,
792        );
793        #[cfg(not(feature = "tokio_tracing"))]
794        let _ = parent_task_id; // suppress unused variable warning
795
796        let this = self.pin();
797        let future = async move {
798            let span = match &ty.task_type {
799                LocalTaskType::ResolveNative { native_fn } => native_fn.resolve_span(),
800                LocalTaskType::ResolveTrait { trait_method } => trait_method.resolve_span(),
801            };
802            async move {
803                let result = match ty.task_type {
804                    LocalTaskType::ResolveNative { native_fn } => {
805                        LocalTaskType::run_resolve_native(
806                            native_fn,
807                            ty.this,
808                            &*ty.arg,
809                            persistence,
810                            this,
811                        )
812                        .await
813                    }
814                    LocalTaskType::ResolveTrait { trait_method } => {
815                        LocalTaskType::run_resolve_trait(
816                            trait_method,
817                            ty.this.unwrap(),
818                            &*ty.arg,
819                            persistence,
820                            this,
821                        )
822                        .await
823                    }
824                };
825
826                let output = match result {
827                    Ok(raw_vc) => OutputContent::Link(raw_vc),
828                    Err(err) => OutputContent::Error(
829                        TurboTasksExecutionError::from(err).with_task_context(task_type),
830                    ),
831                };
832
833                let local_task = LocalTask::Done { output };
834
835                let done_event = CURRENT_TASK_STATE.with(move |gts| {
836                    let mut gts_write = gts.write().unwrap();
837                    let scheduled_task =
838                        std::mem::replace(gts_write.get_mut_local_task(local_task_id), local_task);
839                    let LocalTask::Scheduled { done_event } = scheduled_task else {
840                        panic!("local task finished, but was not in the scheduled state?");
841                    };
842                    done_event
843                });
844                done_event.notify(usize::MAX)
845            }
846            .instrument(span)
847            .await
848        };
849        let future = global_task_state
850            .read()
851            .unwrap()
852            .local_task_tracker
853            .track_future(future);
854        let future = CURRENT_TASK_STATE.scope(global_task_state, future);
855        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
856
857        #[cfg(feature = "tokio_tracing")]
858        tokio::task::Builder::new()
859            .name(&description)
860            .spawn(future)
861            .unwrap();
862        #[cfg(not(feature = "tokio_tracing"))]
863        tokio::task::spawn(future);
864
865        RawVc::LocalOutput(execution_id, local_task_id, persistence)
866    }
867
868    fn begin_foreground_job(&self) {
869        if self
870            .currently_scheduled_foreground_jobs
871            .fetch_add(1, Ordering::AcqRel)
872            == 0
873        {
874            *self.start.lock().unwrap() = Some(Instant::now());
875            self.event_foreground_start.notify(usize::MAX);
876            self.backend.idle_end(self);
877        }
878    }
879
880    fn finish_foreground_job(&self) {
881        if self
882            .currently_scheduled_foreground_jobs
883            .fetch_sub(1, Ordering::AcqRel)
884            == 1
885        {
886            self.backend.idle_start(self);
887            // That's not super race-condition-safe, but it's only for
888            // statistical reasons
889            let total = self.scheduled_tasks.load(Ordering::Acquire);
890            self.scheduled_tasks.store(0, Ordering::Release);
891            if let Some(start) = *self.start.lock().unwrap() {
892                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
893                if let Some(update) = update.as_mut() {
894                    update.0 += start.elapsed();
895                    update.1 += total;
896                } else {
897                    *update = Some((start.elapsed(), total));
898                }
899            }
900            self.event_foreground_done.notify(usize::MAX);
901        }
902    }
903
904    fn begin_background_job(&self) {
905        self.currently_scheduled_background_jobs
906            .fetch_add(1, Ordering::Relaxed);
907    }
908
909    fn finish_background_job(&self) {
910        if self
911            .currently_scheduled_background_jobs
912            .fetch_sub(1, Ordering::Relaxed)
913            == 1
914        {
915            self.event_background_done.notify(usize::MAX);
916        }
917    }
918
919    pub fn get_in_progress_count(&self) -> usize {
920        self.currently_scheduled_foreground_jobs
921            .load(Ordering::Acquire)
922    }
923
924    /// Waits for the given task to finish executing. This works by performing an untracked read,
925    /// and discarding the value of the task output.
926    ///
927    /// [`ReadConsistency::Eventual`] means that this will return after the task executes, but
928    /// before all dependencies have completely settled.
929    ///
930    /// [`ReadConsistency::Strong`] means that this will also wait for the task and all dependencies
931    /// to fully settle before returning.
932    ///
933    /// As this function is typically called in top-level code that waits for results to be ready
934    /// for the user to access, most callers should use [`ReadConsistency::Strong`].
935    pub async fn wait_task_completion(
936        &self,
937        id: TaskId,
938        consistency: ReadConsistency,
939    ) -> Result<()> {
940        read_task_output(
941            self,
942            id,
943            ReadOutputOptions {
944                // INVALIDATION: This doesn't return a value, only waits for it to be ready.
945                tracking: ReadTracking::Untracked,
946                consistency,
947            },
948        )
949        .await?;
950        Ok(())
951    }
952
953    /// Returns [UpdateInfo] with all updates aggregated over a given duration
954    /// (`aggregation`). Will wait until an update happens.
955    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
956        self.aggregated_update_info(aggregation, Duration::MAX)
957            .await
958            .unwrap()
959    }
960
961    /// Returns [UpdateInfo] with all updates aggregated over a given duration
962    /// (`aggregation`). Will only return None when the timeout is reached while
963    /// waiting for the first update.
964    pub async fn aggregated_update_info(
965        &self,
966        aggregation: Duration,
967        timeout: Duration,
968    ) -> Option<UpdateInfo> {
969        let listener = self
970            .event_foreground_done
971            .listen_with_note(|| || "wait for update info".to_string());
972        let wait_for_finish = {
973            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
974            if aggregation.is_zero() {
975                if let Some((duration, tasks)) = update.take() {
976                    return Some(UpdateInfo {
977                        duration,
978                        tasks,
979                        reasons: take(reason_set),
980                        placeholder_for_future_fields: (),
981                    });
982                } else {
983                    true
984                }
985            } else {
986                update.is_none()
987            }
988        };
989        if wait_for_finish {
990            if timeout == Duration::MAX {
991                // wait for finish
992                listener.await;
993            } else {
994                // wait for start, then wait for finish or timeout
995                let start_listener = self
996                    .event_foreground_start
997                    .listen_with_note(|| || "wait for update info".to_string());
998                if self
999                    .currently_scheduled_foreground_jobs
1000                    .load(Ordering::Acquire)
1001                    == 0
1002                {
1003                    start_listener.await;
1004                } else {
1005                    drop(start_listener);
1006                }
1007                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1008                    // Timeout
1009                    return None;
1010                }
1011            }
1012        }
1013        if !aggregation.is_zero() {
1014            loop {
1015                select! {
1016                    () = tokio::time::sleep(aggregation) => {
1017                        break;
1018                    }
1019                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1020                        // Resets the sleep
1021                    }
1022                }
1023            }
1024        }
1025        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1026        if let Some((duration, tasks)) = update.take() {
1027            Some(UpdateInfo {
1028                duration,
1029                tasks,
1030                reasons: take(reason_set),
1031                placeholder_for_future_fields: (),
1032            })
1033        } else {
1034            panic!("aggregated_update_info must not called concurrently")
1035        }
1036    }
1037
1038    pub async fn wait_background_done(&self) {
1039        let listener = self.event_background_done.listen();
1040        if self
1041            .currently_scheduled_background_jobs
1042            .load(Ordering::Acquire)
1043            != 0
1044        {
1045            listener.await;
1046        }
1047    }
1048
1049    pub async fn stop_and_wait(&self) {
1050        turbo_tasks_future_scope(self.pin(), async move {
1051            self.backend.stopping(self);
1052            self.stopped.store(true, Ordering::Release);
1053            {
1054                let listener = self
1055                    .event_foreground_done
1056                    .listen_with_note(|| || "wait for stop".to_string());
1057                if self
1058                    .currently_scheduled_foreground_jobs
1059                    .load(Ordering::Acquire)
1060                    != 0
1061                {
1062                    listener.await;
1063                }
1064            }
1065            {
1066                let listener = self.event_background_done.listen();
1067                if self
1068                    .currently_scheduled_background_jobs
1069                    .load(Ordering::Acquire)
1070                    != 0
1071                {
1072                    listener.await;
1073                }
1074            }
1075            self.backend.stop(self);
1076        })
1077        .await;
1078    }
1079
1080    #[track_caller]
1081    pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1082    where
1083        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1084        T::CallOnceFuture: Send,
1085    {
1086        let mut this = self.pin();
1087        this.begin_foreground_job();
1088        tokio::spawn(
1089            TURBO_TASKS
1090                .scope(this.clone(), async move {
1091                    if !this.stopped.load(Ordering::Acquire) {
1092                        this = func(this.clone()).await;
1093                    }
1094                    this.finish_foreground_job();
1095                })
1096                .in_current_span(),
1097        );
1098    }
1099
1100    #[track_caller]
1101    pub(crate) fn schedule_background_job<T>(&self, func: T)
1102    where
1103        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1104        T::CallOnceFuture: Send,
1105    {
1106        let mut this = self.pin();
1107        self.begin_background_job();
1108        tokio::spawn(
1109            TURBO_TASKS
1110                .scope(this.clone(), async move {
1111                    if !this.stopped.load(Ordering::Acquire) {
1112                        this = func(this).await;
1113                    }
1114                    this.finish_background_job();
1115                })
1116                .in_current_span(),
1117        );
1118    }
1119
1120    fn finish_current_task_state(&self) -> FinishedTaskState {
1121        let (stateful, has_invalidator) = CURRENT_TASK_STATE.with(|cell| {
1122            let CurrentTaskState {
1123                stateful,
1124                has_invalidator,
1125                ..
1126            } = &mut *cell.write().unwrap();
1127            (*stateful, *has_invalidator)
1128        });
1129
1130        FinishedTaskState {
1131            stateful,
1132            has_invalidator,
1133        }
1134    }
1135
1136    pub fn backend(&self) -> &B {
1137        &self.backend
1138    }
1139}
1140
1141struct FinishedTaskState {
1142    /// True if the task has state in cells
1143    stateful: bool,
1144
1145    /// True if the task uses an external invalidator
1146    has_invalidator: bool,
1147}
1148
1149impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
1150    fn dynamic_call(
1151        &self,
1152        native_fn: &'static NativeFunction,
1153        this: Option<RawVc>,
1154        arg: Box<dyn MagicAny>,
1155        persistence: TaskPersistence,
1156    ) -> RawVc {
1157        self.dynamic_call(native_fn, this, arg, persistence)
1158    }
1159    fn native_call(
1160        &self,
1161        native_fn: &'static NativeFunction,
1162        this: Option<RawVc>,
1163        arg: Box<dyn MagicAny>,
1164        persistence: TaskPersistence,
1165    ) -> RawVc {
1166        self.native_call(native_fn, this, arg, persistence)
1167    }
1168    fn trait_call(
1169        &self,
1170        trait_method: &'static TraitMethod,
1171        this: RawVc,
1172        arg: Box<dyn MagicAny>,
1173        persistence: TaskPersistence,
1174    ) -> RawVc {
1175        self.trait_call(trait_method, this, arg, persistence)
1176    }
1177
1178    #[track_caller]
1179    fn run(
1180        &self,
1181        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1182    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
1183        let this = self.pin();
1184        Box::pin(async move { this.run(future).await })
1185    }
1186
1187    #[track_caller]
1188    fn run_once(
1189        &self,
1190        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1191    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1192        let this = self.pin();
1193        Box::pin(async move { this.run_once(future).await })
1194    }
1195
1196    #[track_caller]
1197    fn run_once_with_reason(
1198        &self,
1199        reason: StaticOrArc<dyn InvalidationReason>,
1200        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1201    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1202        {
1203            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1204            reason_set.insert(reason);
1205        }
1206        let this = self.pin();
1207        Box::pin(async move { this.run_once(future).await })
1208    }
1209
1210    #[track_caller]
1211    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1212        self.start_once_process(future)
1213    }
1214}
1215
1216impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1217    #[instrument(level = "info", skip_all, name = "invalidate")]
1218    fn invalidate(&self, task: TaskId) {
1219        self.backend.invalidate_task(task, self);
1220    }
1221
1222    #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1223    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1224        {
1225            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1226            reason_set.insert(reason);
1227        }
1228        self.backend.invalidate_task(task, self);
1229    }
1230
1231    fn invalidate_serialization(&self, task: TaskId) {
1232        self.backend.invalidate_serialization(task, self);
1233    }
1234
1235    fn try_read_task_output(
1236        &self,
1237        task: TaskId,
1238        options: ReadOutputOptions,
1239    ) -> Result<Result<RawVc, EventListener>> {
1240        self.backend.try_read_task_output(
1241            task,
1242            current_task_if_available("reading Vcs"),
1243            options,
1244            self,
1245        )
1246    }
1247
1248    fn try_read_task_cell(
1249        &self,
1250        task: TaskId,
1251        index: CellId,
1252        options: ReadCellOptions,
1253    ) -> Result<Result<TypedCellContent, EventListener>> {
1254        self.backend.try_read_task_cell(
1255            task,
1256            index,
1257            current_task_if_available("reading Vcs"),
1258            options,
1259            self,
1260        )
1261    }
1262
1263    fn try_read_own_task_cell(
1264        &self,
1265        current_task: TaskId,
1266        index: CellId,
1267        options: ReadCellOptions,
1268    ) -> Result<TypedCellContent> {
1269        self.backend
1270            .try_read_own_task_cell(current_task, index, options, self)
1271    }
1272
1273    fn try_read_local_output(
1274        &self,
1275        execution_id: ExecutionId,
1276        local_task_id: LocalTaskId,
1277    ) -> Result<Result<RawVc, EventListener>> {
1278        CURRENT_TASK_STATE.with(|gts| {
1279            let gts_read = gts.read().unwrap();
1280
1281            // Local Vcs are local to their parent task's current execution, and do not exist
1282            // outside of it. This is weakly enforced at compile time using the `NonLocalValue`
1283            // marker trait. This assertion exists to handle any potential escapes that the
1284            // compile-time checks cannot capture.
1285            gts_read.assert_execution_id(execution_id);
1286
1287            match gts_read.get_local_task(local_task_id) {
1288                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1289                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1290            }
1291        })
1292    }
1293
1294    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1295        self.backend.read_task_collectibles(
1296            task,
1297            trait_id,
1298            current_task_if_available("reading collectibles"),
1299            self,
1300        )
1301    }
1302
1303    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1304        self.backend.emit_collectible(
1305            trait_type,
1306            collectible,
1307            current_task("emitting collectible"),
1308            self,
1309        );
1310    }
1311
1312    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1313        self.backend.unemit_collectible(
1314            trait_type,
1315            collectible,
1316            count,
1317            current_task("emitting collectible"),
1318            self,
1319        );
1320    }
1321
1322    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1323        for (&collectible, &count) in collectibles {
1324            if count > 0 {
1325                self.backend.unemit_collectible(
1326                    trait_type,
1327                    collectible,
1328                    count as u32,
1329                    current_task("emitting collectible"),
1330                    self,
1331                );
1332            }
1333        }
1334    }
1335
1336    fn read_own_task_cell(
1337        &self,
1338        task: TaskId,
1339        index: CellId,
1340        options: ReadCellOptions,
1341    ) -> Result<TypedCellContent> {
1342        self.try_read_own_task_cell(task, index, options)
1343    }
1344
1345    fn update_own_task_cell(
1346        &self,
1347        task: TaskId,
1348        index: CellId,
1349        content: CellContent,
1350        verification_mode: VerificationMode,
1351    ) {
1352        self.backend
1353            .update_task_cell(task, index, content, verification_mode, self);
1354    }
1355
1356    fn connect_task(&self, task: TaskId) {
1357        self.backend
1358            .connect_task(task, current_task_if_available("connecting task"), self);
1359    }
1360
1361    fn mark_own_task_as_finished(&self, task: TaskId) {
1362        self.backend.mark_own_task_as_finished(task, self);
1363    }
1364
1365    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
1366        self.backend
1367            .set_own_task_aggregation_number(task, aggregation_number, self);
1368    }
1369
1370    fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1371        self.backend.mark_own_task_as_session_dependent(task, self);
1372    }
1373
1374    /// Creates a future that inherits the current task id and task state. The current global task
1375    /// will wait for this future to be dropped before exiting.
1376    fn detached_for_testing(
1377        &self,
1378        fut: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1379    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
1380        // this is similar to what happens for a local task, except that we keep the local task's
1381        // state as well.
1382        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1383        let tracked_fut = {
1384            let ts = global_task_state.read().unwrap();
1385            ts.local_task_tracker.track_future(fut)
1386        };
1387        Box::pin(TURBO_TASKS.scope(
1388            turbo_tasks(),
1389            CURRENT_TASK_STATE.scope(global_task_state, tracked_fut),
1390        ))
1391    }
1392
1393    fn task_statistics(&self) -> &TaskStatisticsApi {
1394        self.backend.task_statistics()
1395    }
1396
1397    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1398        let this = self.pin();
1399        Box::pin(async move {
1400            this.stop_and_wait().await;
1401        })
1402    }
1403
1404    fn subscribe_to_compilation_events(
1405        &self,
1406        event_types: Option<Vec<String>>,
1407    ) -> Receiver<Arc<dyn CompilationEvent>> {
1408        self.compilation_events.subscribe(event_types)
1409    }
1410
1411    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1412        if let Err(e) = self.compilation_events.send(event) {
1413            tracing::warn!("Failed to send compilation event: {e}");
1414        }
1415    }
1416
1417    fn is_tracking_dependencies(&self) -> bool {
1418        self.backend.is_tracking_dependencies()
1419    }
1420}
1421
1422impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1423    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1424        self.pin()
1425    }
1426    fn backend(&self) -> &B {
1427        &self.backend
1428    }
1429
1430    #[track_caller]
1431    fn schedule_backend_background_job(&self, job: B::BackendJob) {
1432        self.schedule_background_job(async move |this| {
1433            this.backend.run_backend_job(job, &*this).await;
1434            this
1435        })
1436    }
1437
1438    #[track_caller]
1439    fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1440        self.schedule_foreground_job(async move |this| {
1441            this.backend.run_backend_job(job, &*this).await;
1442            this
1443        })
1444    }
1445
1446    #[track_caller]
1447    fn schedule(&self, task: TaskId) {
1448        self.schedule(task)
1449    }
1450
1451    fn program_duration_until(&self, instant: Instant) -> Duration {
1452        instant - self.program_start
1453    }
1454
1455    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1456        // SAFETY: This is a fresh id from the factory
1457        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1458    }
1459
1460    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1461        // SAFETY: This is a fresh id from the factory
1462        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1463    }
1464
1465    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1466        unsafe { self.task_id_factory.reuse(id.into()) }
1467    }
1468
1469    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1470        unsafe { self.transient_task_id_factory.reuse(id.into()) }
1471    }
1472
1473    fn is_idle(&self) -> bool {
1474        self.currently_scheduled_foreground_jobs
1475            .load(Ordering::Acquire)
1476            == 0
1477    }
1478}
1479
1480pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1481    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1482        Ok(id) => id,
1483        Err(_) => panic!(
1484            "{from} can only be used in the context of a turbo_tasks task execution or \
1485             turbo_tasks run"
1486        ),
1487    }
1488}
1489
1490pub(crate) fn current_task(from: &str) -> TaskId {
1491    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1492        Ok(Some(id)) => id,
1493        Ok(None) | Err(_) => {
1494            panic!("{from} can only be used in the context of a turbo_tasks task execution")
1495        }
1496    }
1497}
1498
1499pub async fn run<T: Send + 'static>(
1500    tt: Arc<dyn TurboTasksApi>,
1501    future: impl Future<Output = Result<T>> + Send + 'static,
1502) -> Result<T> {
1503    let (tx, rx) = tokio::sync::oneshot::channel();
1504
1505    tt.run(Box::pin(async move {
1506        let result = future.await?;
1507        tx.send(result)
1508            .map_err(|_| anyhow!("unable to send result"))?;
1509        Ok(())
1510    }))
1511    .await?;
1512
1513    Ok(rx.await?)
1514}
1515
1516pub async fn run_once<T: Send + 'static>(
1517    tt: Arc<dyn TurboTasksApi>,
1518    future: impl Future<Output = Result<T>> + Send + 'static,
1519) -> Result<T> {
1520    let (tx, rx) = tokio::sync::oneshot::channel();
1521
1522    tt.run_once(Box::pin(async move {
1523        let result = future.await?;
1524        tx.send(result)
1525            .map_err(|_| anyhow!("unable to send result"))?;
1526        Ok(())
1527    }))
1528    .await?;
1529
1530    Ok(rx.await?)
1531}
1532
1533pub async fn run_once_with_reason<T: Send + 'static>(
1534    tt: Arc<dyn TurboTasksApi>,
1535    reason: impl InvalidationReason,
1536    future: impl Future<Output = Result<T>> + Send + 'static,
1537) -> Result<T> {
1538    let (tx, rx) = tokio::sync::oneshot::channel();
1539
1540    tt.run_once_with_reason(
1541        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1542        Box::pin(async move {
1543            let result = future.await?;
1544            tx.send(result)
1545                .map_err(|_| anyhow!("unable to send result"))?;
1546            Ok(())
1547        }),
1548    )
1549    .await?;
1550
1551    Ok(rx.await?)
1552}
1553
1554/// Calls [`TurboTasks::dynamic_call`] for the current turbo tasks instance.
1555pub fn dynamic_call(
1556    func: &'static NativeFunction,
1557    this: Option<RawVc>,
1558    arg: Box<dyn MagicAny>,
1559    persistence: TaskPersistence,
1560) -> RawVc {
1561    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1562}
1563
1564/// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance.
1565pub fn trait_call(
1566    trait_method: &'static TraitMethod,
1567    this: RawVc,
1568    arg: Box<dyn MagicAny>,
1569    persistence: TaskPersistence,
1570) -> RawVc {
1571    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1572}
1573
1574pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1575    TURBO_TASKS.with(|arc| arc.clone())
1576}
1577
1578pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1579    TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1580}
1581
1582pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1583    TURBO_TASKS.with(|arc| func(arc))
1584}
1585
1586pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1587    TURBO_TASKS.sync_scope(tt, f)
1588}
1589
1590pub fn turbo_tasks_future_scope<T>(
1591    tt: Arc<dyn TurboTasksApi>,
1592    f: impl Future<Output = T>,
1593) -> impl Future<Output = T> {
1594    TURBO_TASKS.scope(tt, f)
1595}
1596
1597pub fn with_turbo_tasks_for_testing<T>(
1598    tt: Arc<dyn TurboTasksApi>,
1599    current_task: TaskId,
1600    execution_id: ExecutionId,
1601    f: impl Future<Output = T>,
1602) -> impl Future<Output = T> {
1603    TURBO_TASKS.scope(
1604        tt,
1605        CURRENT_TASK_STATE.scope(
1606            Arc::new(RwLock::new(CurrentTaskState::new(
1607                current_task,
1608                execution_id,
1609            ))),
1610            f,
1611        ),
1612    )
1613}
1614
1615/// Spawns the given future within the context of the current task.
1616///
1617/// Beware: this method is not safe to use in production code. It is only
1618/// intended for use in tests and for debugging purposes.
1619pub fn spawn_detached_for_testing(f: impl Future<Output = Result<()>> + Send + 'static) {
1620    tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(f.in_current_span())));
1621}
1622
1623pub fn current_task_for_testing() -> Option<TaskId> {
1624    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1625}
1626
1627/// Marks the current task as dirty when restored from filesystem cache.
1628pub fn mark_session_dependent() {
1629    with_turbo_tasks(|tt| {
1630        tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1631    });
1632}
1633
1634/// Marks the current task as a root in the aggregation graph.  This means it starts with the
1635/// correct aggregation number instead of needing to recompute it after the fact.
1636pub fn mark_root() {
1637    with_turbo_tasks(|tt| {
1638        tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
1639    });
1640}
1641
1642/// Marks the current task as finished. This excludes it from waiting for
1643/// strongly consistency.
1644pub fn mark_finished() {
1645    with_turbo_tasks(|tt| {
1646        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1647    });
1648}
1649
1650/// Marks the current task as stateful. This prevents the tasks from being
1651/// dropped without persisting the state.
1652///
1653/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
1654/// serialization of the current task cells
1655pub fn mark_stateful() -> SerializationInvalidator {
1656    CURRENT_TASK_STATE.with(|cell| {
1657        let CurrentTaskState {
1658            stateful, task_id, ..
1659        } = &mut *cell.write().unwrap();
1660        *stateful = true;
1661        let Some(task_id) = *task_id else {
1662            panic!(
1663                "mark_stateful() can only be used in the context of a turbo_tasks task execution"
1664            );
1665        };
1666        SerializationInvalidator::new(task_id)
1667    })
1668}
1669
1670pub fn mark_invalidator() {
1671    CURRENT_TASK_STATE.with(|cell| {
1672        let CurrentTaskState {
1673            has_invalidator, ..
1674        } = &mut *cell.write().unwrap();
1675        *has_invalidator = true;
1676    })
1677}
1678
1679pub fn prevent_gc() {
1680    // There is a hack in UpdateCellOperation that need to be updated when this is changed.
1681    mark_stateful();
1682}
1683
1684pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1685    with_turbo_tasks(|tt| {
1686        let raw_vc = collectible.node.node;
1687        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1688    })
1689}
1690
1691pub(crate) async fn read_task_output(
1692    this: &dyn TurboTasksApi,
1693    id: TaskId,
1694    options: ReadOutputOptions,
1695) -> Result<RawVc> {
1696    loop {
1697        match this.try_read_task_output(id, options)? {
1698            Ok(result) => return Ok(result),
1699            Err(listener) => listener.await,
1700        }
1701    }
1702}
1703
1704pub(crate) async fn read_task_cell(
1705    this: &dyn TurboTasksApi,
1706    id: TaskId,
1707    index: CellId,
1708    options: ReadCellOptions,
1709) -> Result<TypedCellContent> {
1710    loop {
1711        match this.try_read_task_cell(id, index, options)? {
1712            Ok(result) => return Ok(result),
1713            Err(listener) => listener.await,
1714        }
1715    }
1716}
1717
1718/// A reference to a task's cell with methods that allow updating the contents
1719/// of the cell.
1720///
1721/// Mutations should not outside of the task that that owns this cell. Doing so
1722/// is a logic error, and may lead to incorrect caching behavior.
1723#[derive(Clone, Copy, Serialize, Deserialize)]
1724pub struct CurrentCellRef {
1725    current_task: TaskId,
1726    index: CellId,
1727}
1728
1729type VcReadRepr<T> = <<T as VcValueType>::Read as VcRead<T>>::Repr;
1730
1731impl CurrentCellRef {
1732    /// Updates the cell if the given `functor` returns a value.
1733    pub fn conditional_update<T>(&self, functor: impl FnOnce(Option<&T>) -> Option<T>)
1734    where
1735        T: VcValueType,
1736    {
1737        self.conditional_update_with_shared_reference(|old_shared_reference| {
1738            let old_ref = old_shared_reference
1739                .and_then(|sr| sr.0.downcast_ref::<VcReadRepr<T>>())
1740                .map(|content| <T::Read as VcRead<T>>::repr_to_value_ref(content));
1741            let new_value = functor(old_ref)?;
1742            Some(SharedReference::new(triomphe::Arc::new(
1743                <T::Read as VcRead<T>>::value_to_repr(new_value),
1744            )))
1745        })
1746    }
1747
1748    /// Updates the cell if the given `functor` returns a `SharedReference`.
1749    pub fn conditional_update_with_shared_reference(
1750        &self,
1751        functor: impl FnOnce(Option<&SharedReference>) -> Option<SharedReference>,
1752    ) {
1753        let tt = turbo_tasks();
1754        let cell_content = tt
1755            .read_own_task_cell(
1756                self.current_task,
1757                self.index,
1758                ReadCellOptions {
1759                    // INVALIDATION: Reading our own cell must be untracked
1760                    tracking: ReadTracking::Untracked,
1761                    ..Default::default()
1762                },
1763            )
1764            .ok();
1765        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1766        if let Some(update) = update {
1767            tt.update_own_task_cell(
1768                self.current_task,
1769                self.index,
1770                CellContent(Some(update)),
1771                VerificationMode::EqualityCheck,
1772            )
1773        }
1774    }
1775
1776    /// Replace the current cell's content with `new_value` if the current content is not equal by
1777    /// value with the existing content.
1778    ///
1779    /// The comparison happens using the value itself, not the [`VcRead::Target`] of that value.
1780    ///
1781    /// Take this example of a custom equality implementation on a transparent wrapper type:
1782    ///
1783    /// ```
1784    /// #[turbo_tasks::value(transparent, eq = "manual")]
1785    /// struct Wrapper(Vec<u32>);
1786    ///
1787    /// impl PartialEq for Wrapper {
1788    ///     fn eq(&self, other: Wrapper) {
1789    ///         // Example: order doesn't matter for equality
1790    ///         let (mut this, mut other) = (self.clone(), other.clone());
1791    ///         this.sort_unstable();
1792    ///         other.sort_unstable();
1793    ///         this == other
1794    ///     }
1795    /// }
1796    ///
1797    /// impl Eq for Wrapper {}
1798    /// ```
1799    ///
1800    /// Comparisons of [`Vc<Wrapper>`] used when updating the cell will use `Wrapper`'s custom
1801    /// equality implementation, rather than the one provided by the target ([`Vec<u32>`]) type.
1802    ///
1803    /// However, in most cases, the default derived implementation of [`PartialEq`] is used which
1804    /// just forwards to the inner value's [`PartialEq`].
1805    ///
1806    /// If you already have a `SharedReference`, consider calling
1807    /// [`Self::compare_and_update_with_shared_reference`] which can re-use the [`SharedReference`]
1808    /// object.
1809    pub fn compare_and_update<T>(&self, new_value: T)
1810    where
1811        T: PartialEq + VcValueType,
1812    {
1813        self.conditional_update(|old_value| {
1814            if let Some(old_value) = old_value
1815                && old_value == &new_value
1816            {
1817                return None;
1818            }
1819            Some(new_value)
1820        });
1821    }
1822
1823    /// Replace the current cell's content with `new_shared_reference` if the
1824    /// current content is not equal by value with the existing content.
1825    ///
1826    /// If you already have a `SharedReference`, this is a faster version of
1827    /// [`CurrentCellRef::compare_and_update`].
1828    ///
1829    /// The [`SharedReference`] is expected to use the `<T::Read as
1830    /// VcRead<T>>::Repr` type for its representation of the value.
1831    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
1832    where
1833        T: VcValueType + PartialEq,
1834    {
1835        fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
1836            <T::Read as VcRead<T>>::repr_to_value_ref(
1837                sr.0.downcast_ref::<VcReadRepr<T>>()
1838                    .expect("cannot update SharedReference of different type"),
1839            )
1840        }
1841        self.conditional_update_with_shared_reference(|old_sr| {
1842            if let Some(old_sr) = old_sr {
1843                let old_value: &T = extract_sr_value(old_sr);
1844                let new_value = extract_sr_value(&new_shared_reference);
1845                if old_value == new_value {
1846                    return None;
1847                }
1848            }
1849            Some(new_shared_reference)
1850        });
1851    }
1852
1853    /// Unconditionally updates the content of the cell.
1854    pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
1855    where
1856        T: VcValueType,
1857    {
1858        let tt = turbo_tasks();
1859        tt.update_own_task_cell(
1860            self.current_task,
1861            self.index,
1862            CellContent(Some(SharedReference::new(triomphe::Arc::new(
1863                <T::Read as VcRead<T>>::value_to_repr(new_value),
1864            )))),
1865            verification_mode,
1866        )
1867    }
1868
1869    /// A faster version of [`Self::update`] if you already have a
1870    /// [`SharedReference`].
1871    ///
1872    /// If the passed-in [`SharedReference`] is the same as the existing cell's
1873    /// by identity, no update is performed.
1874    ///
1875    /// The [`SharedReference`] is expected to use the `<T::Read as
1876    /// VcRead<T>>::Repr` type for its representation of the value.
1877    pub fn update_with_shared_reference(
1878        &self,
1879        shared_ref: SharedReference,
1880        verification_mode: VerificationMode,
1881    ) {
1882        let tt = turbo_tasks();
1883        let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
1884            let content = tt
1885                .read_own_task_cell(
1886                    self.current_task,
1887                    self.index,
1888                    ReadCellOptions {
1889                        // INVALIDATION: Reading our own cell must be untracked
1890                        tracking: ReadTracking::Untracked,
1891                        ..Default::default()
1892                    },
1893                )
1894                .ok();
1895            if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
1896                // pointer equality (not value equality)
1897                shared_ref_exp != shared_ref
1898            } else {
1899                true
1900            }
1901        } else {
1902            true
1903        };
1904        if update {
1905            tt.update_own_task_cell(
1906                self.current_task,
1907                self.index,
1908                CellContent(Some(shared_ref)),
1909                verification_mode,
1910            )
1911        }
1912    }
1913}
1914
1915impl From<CurrentCellRef> for RawVc {
1916    fn from(cell: CurrentCellRef) -> Self {
1917        RawVc::TaskCell(cell.current_task, cell.index)
1918    }
1919}
1920
1921pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef {
1922    CURRENT_TASK_STATE.with(|ts| {
1923        let current_task = current_task("celling turbo_tasks values");
1924        let mut ts = ts.write().unwrap();
1925        let map = ts.cell_counters.as_mut().unwrap();
1926        let current_index = map.entry(ty).or_default();
1927        let index = *current_index;
1928        *current_index += 1;
1929        CurrentCellRef {
1930            current_task,
1931            index: CellId { type_id: ty, index },
1932        }
1933    })
1934}
1935
1936pub(crate) async fn read_local_output(
1937    this: &dyn TurboTasksApi,
1938    execution_id: ExecutionId,
1939    local_task_id: LocalTaskId,
1940) -> Result<RawVc> {
1941    loop {
1942        match this.try_read_local_output(execution_id, local_task_id)? {
1943            Ok(raw_vc) => return Ok(raw_vc),
1944            Err(event_listener) => event_listener.await,
1945        }
1946    }
1947}