turbo_tasks/
manager.rs

1use std::{
2    fmt::Display,
3    future::Future,
4    hash::BuildHasherDefault,
5    mem::take,
6    pin::Pin,
7    sync::{
8        Arc, Mutex, RwLock, Weak,
9        atomic::{AtomicBool, AtomicUsize, Ordering},
10    },
11    time::{Duration, Instant},
12};
13
14use anyhow::{Result, anyhow};
15use auto_hash_map::AutoMap;
16use rustc_hash::FxHasher;
17use serde::{Deserialize, Serialize};
18use tokio::{select, sync::mpsc::Receiver, task_local};
19use tokio_util::task::TaskTracker;
20use tracing::{Instrument, instrument};
21
22use crate::{
23    Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
24    ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
25    VcValueTrait, VcValueType,
26    backend::{
27        Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
28        TransientTaskType, TurboTasksExecutionError, TypedCellContent,
29    },
30    capture_future::CaptureFuture,
31    event::{Event, EventListener},
32    id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
33    id_factory::IdFactoryWithReuse,
34    macro_helpers::NativeFunction,
35    magic_any::MagicAny,
36    message_queue::{CompilationEvent, CompilationEventQueue},
37    raw_vc::{CellId, RawVc},
38    registry,
39    serialization_invalidation::SerializationInvalidator,
40    task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
41    task_statistics::TaskStatisticsApi,
42    trace::TraceRawVcs,
43    util::{IdFactory, StaticOrArc},
44};
45
46/// Common base trait for [`TurboTasksApi`] and [`TurboTasksBackendApi`]. Provides APIs for creating
47/// tasks from function calls.
48pub trait TurboTasksCallApi: Sync + Send {
49    /// Calls a native function with arguments. Resolves arguments when needed
50    /// with a wrapper task.
51    fn dynamic_call(
52        &self,
53        native_fn: &'static NativeFunction,
54        this: Option<RawVc>,
55        arg: Box<dyn MagicAny>,
56        persistence: TaskPersistence,
57    ) -> RawVc;
58    /// Calls a native function with arguments.
59    /// All inputs must be resolved.
60    fn native_call(
61        &self,
62        native_fn: &'static NativeFunction,
63        this: Option<RawVc>,
64        arg: Box<dyn MagicAny>,
65        persistence: TaskPersistence,
66    ) -> RawVc;
67    /// Calls a trait method with arguments. The first input is the `self` object.
68    /// Uses a wrapper task to resolve `self` when needed.
69    fn trait_call(
70        &self,
71        trait_method: &'static TraitMethod,
72        this: RawVc,
73        arg: Box<dyn MagicAny>,
74        persistence: TaskPersistence,
75    ) -> RawVc;
76
77    fn run(
78        &self,
79        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
80    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
81    fn run_once(
82        &self,
83        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
84    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
85    fn run_once_with_reason(
86        &self,
87        reason: StaticOrArc<dyn InvalidationReason>,
88        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
89    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
90    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
91}
92
93/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
94/// context. Returned by the [`turbo_tasks`] helper function.
95///
96/// This trait is needed because thread locals cannot contain an unresolved [`Backend`] type
97/// parameter.
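///
/// A hedged usage sketch from inside a running task (`some_task_id` is a hypothetical
/// [`TaskId`], not defined here):
///
/// ```ignore
/// let tt: Arc<dyn TurboTasksApi> = turbo_tasks();
/// tt.invalidate(some_task_id);
/// ```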
98pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
99    fn invalidate(&self, task: TaskId);
100    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);
101
102    fn invalidate_serialization(&self, task: TaskId);
103
104    fn try_read_task_output(
105        &self,
106        task: TaskId,
107        options: ReadOutputOptions,
108    ) -> Result<Result<RawVc, EventListener>>;
109
110    fn try_read_task_cell(
111        &self,
112        task: TaskId,
113        index: CellId,
114        options: ReadCellOptions,
115    ) -> Result<Result<TypedCellContent, EventListener>>;
116
117    /// Reads a [`RawVc::LocalOutput`]. If the task has completed, returns the [`RawVc`] the local
118    /// task points to.
119    ///
120    /// The returned [`RawVc`] may also be a [`RawVc::LocalOutput`], so this may need to be called
121    /// recursively or in a loop.
122    ///
123    /// This does not accept a consistency argument, as you cannot control consistency of a read of
124    /// an operation owned by your own task. Strongly consistent reads are only allowed on
125    /// [`OperationVc`]s, which should never be local tasks.
126    ///
127    /// No dependency tracking will happen as a result of this function call, as it's a no-op for a
128    /// task to depend on itself.
129    ///
130    /// [`OperationVc`]: crate::OperationVc
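    ///
    /// A hedged resolution sketch (`initial` is a hypothetical [`RawVc`], `tt` a
    /// [`TurboTasksApi`] handle; error handling is elided):
    ///
    /// ```ignore
    /// let mut vc = initial;
    /// while let RawVc::LocalOutput(execution_id, local_task_id, _) = vc {
    ///     match tt.try_read_local_output(execution_id, local_task_id)? {
    ///         // the local task finished; keep following links until it's no longer local
    ///         Ok(next) => vc = next,
    ///         // the local task is still running; wait for its done event and retry
    ///         Err(listener) => listener.await,
    ///     }
    /// }
    /// ```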
131    fn try_read_local_output(
132        &self,
133        execution_id: ExecutionId,
134        local_task_id: LocalTaskId,
135    ) -> Result<Result<RawVc, EventListener>>;
136
137    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;
138
139    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
140    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
141    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);
142
143    /// INVALIDATION: Be careful with this; it will not track dependencies, so
144    /// using it could break cache invalidation.
145    fn try_read_own_task_cell(
146        &self,
147        current_task: TaskId,
148        index: CellId,
149        options: ReadCellOptions,
150    ) -> Result<TypedCellContent>;
151
152    fn read_own_task_cell(
153        &self,
154        task: TaskId,
155        index: CellId,
156        options: ReadCellOptions,
157    ) -> Result<TypedCellContent>;
158    fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent);
159    fn mark_own_task_as_finished(&self, task: TaskId);
160    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
161    fn mark_own_task_as_session_dependent(&self, task: TaskId);
162
163    fn connect_task(&self, task: TaskId);
164
165    /// Wraps the given future in the current task.
166    ///
167    /// Beware: this method is not safe to use in production code. It is only intended for use in
168    /// tests and for debugging purposes.
169    fn detached_for_testing(
170        &self,
171        f: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
172    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
173
174    fn task_statistics(&self) -> &TaskStatisticsApi;
175
176    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
177
178    fn subscribe_to_compilation_events(
179        &self,
180        event_types: Option<Vec<String>>,
181    ) -> Receiver<Arc<dyn CompilationEvent>>;
182    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);
183
184    /// Returns true if TurboTasks is configured to track dependencies.
185    fn is_tracking_dependencies(&self) -> bool;
186}
187
188/// A wrapper around a value that is unused.
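///
/// A minimal usage sketch; the safety contract is that nothing observes the wrapped
/// value between `new_unchecked` and `into`:
///
/// ```ignore
/// // SAFETY: the wrapped value is not used until it is unwrapped again below.
/// let unused = unsafe { Unused::new_unchecked(42) };
/// let value = unused.into();
/// ```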
189pub struct Unused<T> {
190    inner: T,
191}
192
193impl<T> Unused<T> {
194    /// Creates a new unused value.
195    ///
196    /// # Safety
197    ///
198    /// The wrapped value must not be used.
199    pub unsafe fn new_unchecked(inner: T) -> Self {
200        Self { inner }
201    }
202
203    /// Gets the inner value without consuming the `Unused` wrapper.
204    ///
205    /// # Safety
206    ///
207    /// The caller must ensure that the value stays unused.
208    pub unsafe fn get_unchecked(&self) -> &T {
209        &self.inner
210    }
211
212    /// Unwraps the value, consuming the `Unused` wrapper.
213    pub fn into(self) -> T {
214        self.inner
215    }
216}
217
218/// A subset of the [`TurboTasks`] API that's exposed to [`Backend`] implementations.
219pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
220    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;
221
222    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
223    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
224    /// # Safety
225    ///
226    /// The caller must ensure that the task id is not used anymore.
227    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
228    /// # Safety
229    ///
230    /// The caller must ensure that the task id is not used anymore.
231    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);
232
233    /// Schedule a task for execution.
234    fn schedule(&self, task: TaskId);
235
236    /// Schedule a foreground backend job for execution.
237    fn schedule_backend_foreground_job(&self, job: B::BackendJob);
238
239    /// Schedule a background backend job for execution.
240    ///
241    /// Background jobs are not counted towards activeness of the system. The system is considered
242    /// idle even with active background jobs.
243    fn schedule_backend_background_job(&self, job: B::BackendJob);
244
245    /// Returns the duration from the start of the program to the given instant.
246    fn program_duration_until(&self, instant: Instant) -> Duration;
247
248    /// Returns true if the system is idle.
249    fn is_idle(&self) -> bool;
250
251    /// Returns a reference to the backend.
252    fn backend(&self) -> &B;
253}
254
255#[allow(clippy::manual_non_exhaustive)]
256pub struct UpdateInfo {
257    pub duration: Duration,
258    pub tasks: usize,
259    pub reasons: InvalidationReasonSet,
260    #[allow(dead_code)]
261    placeholder_for_future_fields: (),
262}
263
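/// Controls whether a task created from a function call is persisted across sessions,
/// kept only for the current session, or run as a local task without its own task id.
///
/// A hedged sketch of passing a persistence through [`TurboTasksCallApi::dynamic_call`]
/// (`MY_FUNCTION` and `arg` are hypothetical):
///
/// ```ignore
/// let vc = turbo_tasks().dynamic_call(&MY_FUNCTION, None, arg, TaskPersistence::Persistent);
/// ```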
264#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
265pub enum TaskPersistence {
266    /// Tasks that may be persisted across sessions using serialization.
267    Persistent,
268
269    /// Tasks that will be persisted in memory for the life of this session, but won't persist
270    /// between sessions.
271    ///
272    /// This is used for [root tasks][TurboTasks::spawn_root_task] and tasks with an argument of
273    /// type [`TransientValue`][crate::value::TransientValue] or
274    /// [`TransientInstance`][crate::value::TransientInstance].
275    Transient,
276
277    /// Tasks that are persisted only for the lifetime of the nearest non-`Local` parent caller.
278    ///
279    /// This task does not have a unique task id, and is not shared with the backend. Instead it
280    /// uses the parent task's id.
281    ///
282    /// This is useful for functions that have a low cache hit rate. Those functions could be
283    /// converted to non-task functions, but that would break their function signature. This
284    /// provides a mechanism for skipping caching without changing the function signature.
285    Local,
286}
287
288#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
289pub enum ReadConsistency {
290    /// The default behavior for most APIs. Reads are faster, but may return stale values, which
291    /// may later trigger re-computation.
292    #[default]
293    Eventual,
294    /// Ensures all dependencies are fully resolved before returning the cell or output data, at
295    /// the cost of slower reads.
296    ///
297    /// Top-level code that returns data to the user should use strongly consistent reads.
298    Strong,
299}
300
301#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
302pub enum ReadTracking {
303    /// Reads are tracked as dependencies of the current task.
304    #[default]
305    Tracked,
306    /// The read is only tracked when there is an error, otherwise it is untracked.
307    ///
308    /// INVALIDATION: Be careful with this; it will not track dependencies, so
309    /// using it could break cache invalidation.
310    TrackOnlyError,
311    /// The read is not tracked as a dependency of the current task.
312    ///
313    /// INVALIDATION: Be careful with this; it will not track dependencies, so
314    /// using it could break cache invalidation.
315    Untracked,
316}
317
318impl ReadTracking {
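    /// Returns whether a read with this tracking mode should be recorded as a dependency,
    /// given whether the read produced an error. (The assertions follow directly from the
    /// match below.)
    ///
    /// ```ignore
    /// assert!(ReadTracking::Tracked.should_track(false));
    /// assert!(ReadTracking::TrackOnlyError.should_track(true));
    /// assert!(!ReadTracking::TrackOnlyError.should_track(false));
    /// ```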
319    pub fn should_track(&self, is_err: bool) -> bool {
320        match self {
321            ReadTracking::Tracked => true,
322            ReadTracking::TrackOnlyError => is_err,
323            ReadTracking::Untracked => false,
324        }
325    }
326}
327
328impl Display for ReadTracking {
329    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330        match self {
331            ReadTracking::Tracked => write!(f, "tracked"),
332            ReadTracking::TrackOnlyError => write!(f, "track only error"),
333            ReadTracking::Untracked => write!(f, "untracked"),
334        }
335    }
336}
337
338pub struct TurboTasks<B: Backend + 'static> {
339    this: Weak<Self>,
340    backend: B,
341    task_id_factory: IdFactoryWithReuse<TaskId>,
342    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
343    execution_id_factory: IdFactory<ExecutionId>,
344    stopped: AtomicBool,
345    currently_scheduled_foreground_jobs: AtomicUsize,
346    currently_scheduled_background_jobs: AtomicUsize,
347    scheduled_tasks: AtomicUsize,
348    start: Mutex<Option<Instant>>,
349    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
350    /// Event that is triggered when currently_scheduled_foreground_jobs becomes non-zero
351    event_foreground_start: Event,
352    /// Event that is triggered when all foreground jobs are done
353    /// (currently_scheduled_foreground_jobs becomes zero)
354    event_foreground_done: Event,
355    /// Event that is triggered when all background jobs are done
356    event_background_done: Event,
357    program_start: Instant,
358    compilation_events: CompilationEventQueue,
359}
360
361/// Information about a non-local task. A non-local task can contain multiple "local" tasks, which
362/// all share the same non-local task state.
363///
364/// A non-local task is one that:
365///
366/// - Has a unique task id.
367/// - Is potentially cached.
368/// - Is known to the backend.
369struct CurrentTaskState {
370    task_id: Option<TaskId>,
371    execution_id: ExecutionId,
372
373    /// True if the current task has state in cells
374    stateful: bool,
375
376    /// True if the current task uses an external invalidator
377    has_invalidator: bool,
378
379    /// Tracks how many cells of each type have been allocated so far during this task execution.
380    /// When a task is re-executed, the cell count may not match the existing cell vec length.
381    ///
382    /// This is taken (and becomes `None`) during teardown of a task.
383    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,
384
385    /// Local tasks created while this global task has been running. Indexed by `LocalTaskId`.
386    local_tasks: Vec<LocalTask>,
387
388    /// Tracks currently running local tasks, and defers cleanup of the global task until those
389    /// complete. Also used by `detached_for_testing`.
390    local_task_tracker: TaskTracker,
391}
392
393impl CurrentTaskState {
394    fn new(task_id: TaskId, execution_id: ExecutionId) -> Self {
395        Self {
396            task_id: Some(task_id),
397            execution_id,
398            stateful: false,
399            has_invalidator: false,
400            cell_counters: Some(AutoMap::default()),
401            local_tasks: Vec::new(),
402            local_task_tracker: TaskTracker::new(),
403        }
404    }
405
406    fn new_temporary(execution_id: ExecutionId) -> Self {
407        Self {
408            task_id: None,
409            execution_id,
410            stateful: false,
411            has_invalidator: false,
412            cell_counters: None,
413            local_tasks: Vec::new(),
414            local_task_tracker: TaskTracker::new(),
415        }
416    }
417
418    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
419        if self.execution_id != expected_execution_id {
420            panic!(
421                "Local tasks can only be scheduled/awaited within the same execution of the \
422                 parent task that created them"
423            );
424        }
425    }
426
427    fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
428        self.local_tasks.push(local_task);
429        // generate a one-indexed id from len() -- we just pushed so len() is >= 1
430        if cfg!(debug_assertions) {
431            LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
432        } else {
433            unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
434        }
435    }
436
437    fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
438        // local task ids are one-indexed (they use NonZeroU32)
439        &self.local_tasks[(*local_task_id as usize) - 1]
440    }
441
442    fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
443        &mut self.local_tasks[(*local_task_id as usize) - 1]
444    }
445}
446
447// TODO implement our own thread pool and make these thread locals instead
448task_local! {
449    /// The current TurboTasks instance
450    static TURBO_TASKS: Arc<dyn TurboTasksApi>;
451
452    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
453}
454
455impl<B: Backend + 'static> TurboTasks<B> {
456    // TODO better lifetime management for turbo tasks
457    // consider using unsafe for the task_local turbo tasks
458    // that should be safe as long as tasks can't outlive turbo tasks,
459    // so we probably want to make sure that all tasks are joined
460    // when trying to drop turbo tasks
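    //
    // A hedged construction sketch (`MyBackend` stands in for any `Backend`
    // implementation; it is not defined in this file):
    //
    //     let tt = TurboTasks::new(MyBackend::default());
    //     let answer = tt.run_once(async { Ok(1 + 1) }).await?;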
461    pub fn new(backend: B) -> Arc<Self> {
462        let task_id_factory = IdFactoryWithReuse::new(
463            TaskId::MIN,
464            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
465        );
466        let transient_task_id_factory =
467            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
468        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
469        let this = Arc::new_cyclic(|this| Self {
470            this: this.clone(),
471            backend,
472            task_id_factory,
473            transient_task_id_factory,
474            execution_id_factory,
475            stopped: AtomicBool::new(false),
476            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
477            currently_scheduled_background_jobs: AtomicUsize::new(0),
478            scheduled_tasks: AtomicUsize::new(0),
479            start: Default::default(),
480            aggregated_update: Default::default(),
481            event_foreground_done: Event::new(|| {
482                || "TurboTasks::event_foreground_done".to_string()
483            }),
484            event_foreground_start: Event::new(|| {
485                || "TurboTasks::event_foreground_start".to_string()
486            }),
487            event_background_done: Event::new(|| {
488                || "TurboTasks::event_background_done".to_string()
489            }),
490            program_start: Instant::now(),
491            compilation_events: CompilationEventQueue::default(),
492        });
493        this.backend.startup(&*this);
494        this
495    }
496
497    pub fn pin(&self) -> Arc<Self> {
498        self.this.upgrade().unwrap()
499    }
500
501    /// Creates a new root task
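    ///
    /// A hedged usage sketch (`tt` is an `Arc<TurboTasks<B>>`; the closure body is
    /// arbitrary):
    ///
    /// ```ignore
    /// let task_id = tt.spawn_root_task(|| async { Ok(Completion::new()) });
    /// // ... later, once the root task is no longer needed:
    /// tt.dispose_root_task(task_id);
    /// ```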
502    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
503    where
504        T: ?Sized,
505        F: Fn() -> Fut + Send + Sync + Clone + 'static,
506        Fut: Future<Output = Result<Vc<T>>> + Send,
507    {
508        let id = self.backend.create_transient_task(
509            TransientTaskType::Root(Box::new(move || {
510                let functor = functor.clone();
511                Box::pin(async move {
512                    let raw_vc = functor().await?.node;
513                    raw_vc.to_non_local().await
514                })
515            })),
516            self,
517        );
518        self.schedule(id);
519        id
520    }
521
522    pub fn dispose_root_task(&self, task_id: TaskId) {
523        self.backend.dispose_root_task(task_id, self);
524    }
525
526    // TODO make sure that all dependencies settle before reading them
527    /// Creates a new root task that is only executed once.
528    /// Dependencies will not invalidate the task.
529    #[track_caller]
530    fn spawn_once_task<T, Fut>(&self, future: Fut)
531    where
532        T: ?Sized,
533        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
534    {
535        let id = self.backend.create_transient_task(
536            TransientTaskType::Once(Box::pin(async move {
537                let raw_vc = future.await?.node;
538                raw_vc.to_non_local().await
539            })),
540            self,
541        );
542        self.schedule(id);
543    }
544
545    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
546        &self,
547        future: impl Future<Output = Result<T>> + Send + 'static,
548    ) -> Result<T> {
549        let (tx, rx) = tokio::sync::oneshot::channel();
550        self.spawn_once_task(async move {
551            let result = future.await;
552            tx.send(result)
553                .map_err(|_| anyhow!("unable to send result"))?;
554            Ok(Completion::new())
555        });
556
557        rx.await?
558    }
559
560    #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
561    pub async fn run<T: TraceRawVcs + Send + 'static>(
562        &self,
563        future: impl Future<Output = Result<T>> + Send + 'static,
564    ) -> Result<T, TurboTasksExecutionError> {
565        self.begin_foreground_job();
566        // it's okay for execution ids to overflow and wrap, they're just used for an assert
567        let execution_id = self.execution_id_factory.wrapping_get();
568        let current_task_state =
569            Arc::new(RwLock::new(CurrentTaskState::new_temporary(execution_id)));
570
571        let result = TURBO_TASKS
572            .scope(
573                self.pin(),
574                CURRENT_TASK_STATE.scope(current_task_state, async {
575                    let (result, _duration, _alloc_info) = CaptureFuture::new(future).await;
576
577                    // wait for all spawned local tasks using `local` to finish
578                    let ltt =
579                        CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_task_tracker.clone());
580                    ltt.close();
581                    ltt.wait().await;
582
583                    match result {
584                        Ok(Ok(raw_vc)) => Ok(raw_vc),
585                        Ok(Err(err)) => Err(err.into()),
586                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
587                    }
588                }),
589            )
590            .await;
591        self.finish_foreground_job();
592        result
593    }
594
595    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
596        let this = self.pin();
597        tokio::spawn(async move {
598            this.pin()
599                .run_once(async move {
600                    this.finish_foreground_job();
601                    future.await;
602                    this.begin_foreground_job();
603                    Ok(())
604                })
605                .await
606                .unwrap()
607        });
608    }
609
610    pub(crate) fn native_call(
611        &self,
612        native_fn: &'static NativeFunction,
613        this: Option<RawVc>,
614        arg: Box<dyn MagicAny>,
615        persistence: TaskPersistence,
616    ) -> RawVc {
617        match persistence {
618            TaskPersistence::Local => {
619                let task_type = LocalTaskSpec {
620                    task_type: LocalTaskType::Native { native_fn },
621                    this,
622                    arg,
623                };
624                self.schedule_local_task(task_type, persistence)
625            }
626            TaskPersistence::Transient => {
627                let task_type = CachedTaskType {
628                    native_fn,
629                    this,
630                    arg,
631                };
632
633                RawVc::TaskOutput(self.backend.get_or_create_transient_task(
634                    task_type,
635                    current_task_if_available("turbo_function calls"),
636                    self,
637                ))
638            }
639            TaskPersistence::Persistent => {
640                let task_type = CachedTaskType {
641                    native_fn,
642                    this,
643                    arg,
644                };
645
646                RawVc::TaskOutput(self.backend.get_or_create_persistent_task(
647                    task_type,
648                    current_task_if_available("turbo_function calls"),
649                    self,
650                ))
651            }
652        }
653    }
654
655    pub fn dynamic_call(
656        &self,
657        native_fn: &'static NativeFunction,
658        this: Option<RawVc>,
659        arg: Box<dyn MagicAny>,
660        persistence: TaskPersistence,
661    ) -> RawVc {
662        if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
663            return self.native_call(native_fn, this, arg, persistence);
664        }
665        let task_type = LocalTaskSpec {
666            task_type: LocalTaskType::ResolveNative { native_fn },
667            this,
668            arg,
669        };
670        self.schedule_local_task(task_type, persistence)
671    }
672
673    pub fn trait_call(
674        &self,
675        trait_method: &'static TraitMethod,
676        this: RawVc,
677        arg: Box<dyn MagicAny>,
678        persistence: TaskPersistence,
679    ) -> RawVc {
680        // avoid creating a wrapper task if self is already resolved
681        // for resolved cells we already know the value type, so we can look up the
682        // function
683        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
684            match registry::get_value_type(type_id).get_trait_method(trait_method) {
685                Some(native_fn) => {
686                    let arg = native_fn.arg_meta.filter_owned(arg);
687                    return self.dynamic_call(native_fn, Some(this), arg, persistence);
688                }
689                None => {
690                    // We are destined to fail at this point, but we just retry resolution in the
691                    // local task since we cannot report an error from here.
692                    // TODO: A panic seems appropriate since the immediate caller is to blame
693                }
694            }
695        }
696
697        // create a wrapper task to resolve all inputs
698        let task_type = LocalTaskSpec {
699            task_type: LocalTaskType::ResolveTrait { trait_method },
700            this: Some(this),
701            arg,
702        };
703
704        self.schedule_local_task(task_type, persistence)
705    }
706
707    #[track_caller]
708    pub(crate) fn schedule(&self, task_id: TaskId) {
709        self.begin_foreground_job();
710        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
711
712        let this = self.pin();
713        let future = async move {
714            let mut schedule_again = true;
715            while schedule_again {
716                // it's okay for execution ids to overflow and wrap, they're just used for an assert
717                let execution_id = this.execution_id_factory.wrapping_get();
718                let current_task_state =
719                    Arc::new(RwLock::new(CurrentTaskState::new(task_id, execution_id)));
720                let single_execution_future = async {
721                    if this.stopped.load(Ordering::Acquire) {
722                        this.backend.task_execution_canceled(task_id, &*this);
723                        return false;
724                    }
725
726                    let Some(TaskExecutionSpec { future, span }) =
727                        this.backend.try_start_task_execution(task_id, &*this)
728                    else {
729                        return false;
730                    };
731
732                    async {
733                        let (result, duration, alloc_info) = CaptureFuture::new(future).await;
734
735                        // wait for all spawned local tasks using `local` to finish
736                        let ltt = CURRENT_TASK_STATE
737                            .with(|ts| ts.read().unwrap().local_task_tracker.clone());
738                        ltt.close();
739                        ltt.wait().await;
740
741                        let result = match result {
742                            Ok(Ok(raw_vc)) => Ok(raw_vc),
743                            Ok(Err(err)) => Err(err.into()),
744                            Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
745                        };
746
747                        let FinishedTaskState {
748                            stateful,
749                            has_invalidator,
750                        } = this.finish_current_task_state();
751                        let cell_counters = CURRENT_TASK_STATE
752                            .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
753                        this.backend.task_execution_completed(
754                            task_id,
755                            duration,
756                            alloc_info.memory_usage(),
757                            result,
758                            &cell_counters,
759                            stateful,
760                            has_invalidator,
761                            &*this,
762                        )
763                    }
764                    .instrument(span)
765                    .await
766                };
767                schedule_again = CURRENT_TASK_STATE
768                    .scope(current_task_state, single_execution_future)
769                    .await;
770            }
771            this.finish_foreground_job();
772            anyhow::Ok(())
773        };
774
775        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
776
777        #[cfg(feature = "tokio_tracing")]
778        {
779            let description = self.backend.get_task_description(task_id);
780            tokio::task::Builder::new()
781                .name(&description)
782                .spawn(future)
783                .unwrap();
784        }
785        #[cfg(not(feature = "tokio_tracing"))]
786        tokio::task::spawn(future);
787    }
788
789    fn schedule_local_task(
790        &self,
791        ty: LocalTaskSpec,
792        // if this is a `LocalTaskType::Resolve*`, we may spawn another task with this persistence;
793        // if this is a `LocalTaskType::Native`, persistence is unused.
794        //
795        // TODO: In the rare case that we're crossing a transient->persistent boundary, we should
796        // force `LocalTaskType::Native` to be spawned as real tasks, so that any cells they create
797        // have the correct persistence. This is not an issue for resolution stub tasks, as they
798        // don't end up owning any cells.
799        persistence: TaskPersistence,
800    ) -> RawVc {
801        let task_type = ty.task_type;
802        let (global_task_state, parent_task_id, execution_id, local_task_id) = CURRENT_TASK_STATE
803            .with(|gts| {
804                let mut gts_write = gts.write().unwrap();
805                let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
806                    done_event: Event::new(move || {
807                        move || format!("LocalTask({task_type})::done_event")
808                    }),
809                });
810                (
811                    Arc::clone(gts),
812                    gts_write.task_id,
813                    gts_write.execution_id,
814                    local_task_id,
815                )
816            });
817
818        #[cfg(feature = "tokio_tracing")]
819        let description = format!(
820            "[local] (parent: {}) {}",
821            self.backend.get_task_description(parent_task_id),
822            ty.task_type,
823        );
824        #[cfg(not(feature = "tokio_tracing"))]
825        let _ = parent_task_id; // suppress unused variable warning
826
827        let this = self.pin();
828        let future = async move {
829            let TaskExecutionSpec { future, span } =
830                crate::task::local_task::get_local_task_execution_spec(&*this, &ty, persistence);
831            async move {
832                let (result, _duration, _memory_usage) = CaptureFuture::new(future).await;
833
834                let result = match result {
835                    Ok(Ok(raw_vc)) => Ok(raw_vc),
836                    Ok(Err(err)) => Err(err.into()),
837                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
838                };
839
840                let local_task = LocalTask::Done {
841                    output: match result {
842                        Ok(raw_vc) => OutputContent::Link(raw_vc),
843                        Err(err) => OutputContent::Error(err.with_task_context(task_type)),
844                    },
845                };
846
847                let done_event = CURRENT_TASK_STATE.with(move |gts| {
848                    let mut gts_write = gts.write().unwrap();
849                    let scheduled_task =
850                        std::mem::replace(gts_write.get_mut_local_task(local_task_id), local_task);
851                    let LocalTask::Scheduled { done_event } = scheduled_task else {
852                        panic!("local task finished, but was not in the scheduled state?");
853                    };
854                    done_event
855                });
856                done_event.notify(usize::MAX)
857            }
858            .instrument(span)
859            .await
860        };
861        let future = global_task_state
862            .read()
863            .unwrap()
864            .local_task_tracker
865            .track_future(future);
866        let future = CURRENT_TASK_STATE.scope(global_task_state, future);
867        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
868
869        #[cfg(feature = "tokio_tracing")]
870        tokio::task::Builder::new()
871            .name(&description)
872            .spawn(future)
873            .unwrap();
874        #[cfg(not(feature = "tokio_tracing"))]
875        tokio::task::spawn(future);
876
877        RawVc::LocalOutput(execution_id, local_task_id, persistence)
878    }
879
880    fn begin_foreground_job(&self) {
881        if self
882            .currently_scheduled_foreground_jobs
883            .fetch_add(1, Ordering::AcqRel)
884            == 0
885        {
886            *self.start.lock().unwrap() = Some(Instant::now());
887            self.event_foreground_start.notify(usize::MAX);
888            self.backend.idle_end(self);
889        }
890    }
891
892    fn finish_foreground_job(&self) {
893        if self
894            .currently_scheduled_foreground_jobs
895            .fetch_sub(1, Ordering::AcqRel)
896            == 1
897        {
898            self.backend.idle_start(self);
899            // This is not fully race-condition-safe, but the counter is only
900            // used for statistics.
901            let total = self.scheduled_tasks.load(Ordering::Acquire);
902            self.scheduled_tasks.store(0, Ordering::Release);
903            if let Some(start) = *self.start.lock().unwrap() {
904                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
905                if let Some(update) = update.as_mut() {
906                    update.0 += start.elapsed();
907                    update.1 += total;
908                } else {
909                    *update = Some((start.elapsed(), total));
910                }
911            }
912            self.event_foreground_done.notify(usize::MAX);
913        }
914    }
915
916    fn begin_background_job(&self) {
917        self.currently_scheduled_background_jobs
918            .fetch_add(1, Ordering::Relaxed);
919    }
920
921    fn finish_background_job(&self) {
922        if self
923            .currently_scheduled_background_jobs
924            .fetch_sub(1, Ordering::Relaxed)
925            == 1
926        {
927            self.event_background_done.notify(usize::MAX);
928        }
929    }
930
931    pub fn get_in_progress_count(&self) -> usize {
932        self.currently_scheduled_foreground_jobs
933            .load(Ordering::Acquire)
934    }
935
936    /// Waits for the given task to finish executing. This works by performing an untracked read,
937    /// and discarding the value of the task output.
938    ///
939    /// [`ReadConsistency::Eventual`] means that this will return after the task executes, but
940    /// before all dependencies have completely settled.
941    ///
942    /// [`ReadConsistency::Strong`] means that this will also wait for the task and all dependencies
943    /// to fully settle before returning.
944    ///
945    /// As this function is typically called in top-level code that waits for results to be ready
946    /// for the user to access, most callers should use [`ReadConsistency::Strong`].
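    ///
    /// A hedged sketch (`task_id` is hypothetical):
    ///
    /// ```ignore
    /// tt.wait_task_completion(task_id, ReadConsistency::Strong).await?;
    /// ```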
947    pub async fn wait_task_completion(
948        &self,
949        id: TaskId,
950        consistency: ReadConsistency,
951    ) -> Result<()> {
952        read_task_output(
953            self,
954            id,
955            ReadOutputOptions {
956                // INVALIDATION: This doesn't return a value, only waits for it to be ready.
957                tracking: ReadTracking::Untracked,
958                consistency,
959            },
960        )
961        .await?;
962        Ok(())
963    }
964
965    /// Returns [`UpdateInfo`] with all updates aggregated over a given duration
966    /// (`aggregation`). Waits until an update happens.
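    ///
    /// A hedged polling sketch that batches updates landing within 100ms of each other:
    ///
    /// ```ignore
    /// loop {
    ///     let update = tt.get_or_wait_aggregated_update_info(Duration::from_millis(100)).await;
    ///     println!("updated {} tasks in {:?}", update.tasks, update.duration);
    /// }
    /// ```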
967    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
968        self.aggregated_update_info(aggregation, Duration::MAX)
969            .await
970            .unwrap()
971    }
972
973    /// Returns [`UpdateInfo`] with all updates aggregated over a given duration
974    /// (`aggregation`). Returns `None` only when the timeout is reached while
975    /// waiting for the first update.
976    pub async fn aggregated_update_info(
977        &self,
978        aggregation: Duration,
979        timeout: Duration,
980    ) -> Option<UpdateInfo> {
981        let listener = self
982            .event_foreground_done
983            .listen_with_note(|| || "wait for update info".to_string());
984        let wait_for_finish = {
985            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
986            if aggregation.is_zero() {
987                if let Some((duration, tasks)) = update.take() {
988                    return Some(UpdateInfo {
989                        duration,
990                        tasks,
991                        reasons: take(reason_set),
992                        placeholder_for_future_fields: (),
993                    });
994                } else {
995                    true
996                }
997            } else {
998                update.is_none()
999            }
1000        };
1001        if wait_for_finish {
1002            if timeout == Duration::MAX {
1003                // wait for finish
1004                listener.await;
1005            } else {
1006                // wait for start, then wait for finish or timeout
1007                let start_listener = self
1008                    .event_foreground_start
1009                    .listen_with_note(|| || "wait for update info".to_string());
1010                if self
1011                    .currently_scheduled_foreground_jobs
1012                    .load(Ordering::Acquire)
1013                    == 0
1014                {
1015                    start_listener.await;
1016                } else {
1017                    drop(start_listener);
1018                }
1019                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1020                    // Timeout
1021                    return None;
1022                }
1023            }
1024        }
1025        if !aggregation.is_zero() {
1026            loop {
1027                select! {
1028                    () = tokio::time::sleep(aggregation) => {
1029                        break;
1030                    }
1031                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1032                        // Resets the sleep
1033                    }
1034                }
1035            }
1036        }
1037        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1038        if let Some((duration, tasks)) = update.take() {
1039            Some(UpdateInfo {
1040                duration,
1041                tasks,
1042                reasons: take(reason_set),
1043                placeholder_for_future_fields: (),
1044            })
1045        } else {
1046            panic!("aggregated_update_info must not be called concurrently")
1047        }
1048    }
1049
1050    pub async fn wait_background_done(&self) {
1051        let listener = self.event_background_done.listen();
1052        if self
1053            .currently_scheduled_background_jobs
1054            .load(Ordering::Acquire)
1055            != 0
1056        {
1057            listener.await;
1058        }
1059    }
1060
1061    pub async fn stop_and_wait(&self) {
1062        turbo_tasks_future_scope(self.pin(), async move {
1063            self.backend.stopping(self);
1064            self.stopped.store(true, Ordering::Release);
1065            {
1066                let listener = self
1067                    .event_foreground_done
1068                    .listen_with_note(|| || "wait for stop".to_string());
1069                if self
1070                    .currently_scheduled_foreground_jobs
1071                    .load(Ordering::Acquire)
1072                    != 0
1073                {
1074                    listener.await;
1075                }
1076            }
1077            {
1078                let listener = self.event_background_done.listen();
1079                if self
1080                    .currently_scheduled_background_jobs
1081                    .load(Ordering::Acquire)
1082                    != 0
1083                {
1084                    listener.await;
1085                }
1086            }
1087            self.backend.stop(self);
1088        })
1089        .await;
1090    }
1091
1092    #[track_caller]
1093    pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1094    where
1095        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1096        T::CallOnceFuture: Send,
1097    {
1098        let mut this = self.pin();
1099        this.begin_foreground_job();
1100        tokio::spawn(
1101            TURBO_TASKS
1102                .scope(this.clone(), async move {
1103                    if !this.stopped.load(Ordering::Acquire) {
1104                        this = func(this.clone()).await;
1105                    }
1106                    this.finish_foreground_job();
1107                })
1108                .in_current_span(),
1109        );
1110    }
1111
1112    #[track_caller]
1113    pub(crate) fn schedule_background_job<T>(&self, func: T)
1114    where
1115        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1116        T::CallOnceFuture: Send,
1117    {
1118        let mut this = self.pin();
1119        self.begin_background_job();
1120        tokio::spawn(
1121            TURBO_TASKS
1122                .scope(this.clone(), async move {
1123                    if !this.stopped.load(Ordering::Acquire) {
1124                        this = func(this).await;
1125                    }
1126                    this.finish_background_job();
1127                })
1128                .in_current_span(),
1129        );
1130    }
1131
1132    fn finish_current_task_state(&self) -> FinishedTaskState {
1133        let (stateful, has_invalidator) = CURRENT_TASK_STATE.with(|cell| {
1134            let CurrentTaskState {
1135                stateful,
1136                has_invalidator,
1137                ..
1138            } = &mut *cell.write().unwrap();
1139            (*stateful, *has_invalidator)
1140        });
1141
1142        FinishedTaskState {
1143            stateful,
1144            has_invalidator,
1145        }
1146    }
1147
1148    pub fn backend(&self) -> &B {
1149        &self.backend
1150    }
1151}
1152
1153struct FinishedTaskState {
1154    /// True if the task has state in cells
1155    stateful: bool,
1156
1157    /// True if the task uses an external invalidator
1158    has_invalidator: bool,
1159}
1160
1161impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
1162    fn dynamic_call(
1163        &self,
1164        native_fn: &'static NativeFunction,
1165        this: Option<RawVc>,
1166        arg: Box<dyn MagicAny>,
1167        persistence: TaskPersistence,
1168    ) -> RawVc {
1169        self.dynamic_call(native_fn, this, arg, persistence)
1170    }
1171    fn native_call(
1172        &self,
1173        native_fn: &'static NativeFunction,
1174        this: Option<RawVc>,
1175        arg: Box<dyn MagicAny>,
1176        persistence: TaskPersistence,
1177    ) -> RawVc {
1178        self.native_call(native_fn, this, arg, persistence)
1179    }
1180    fn trait_call(
1181        &self,
1182        trait_method: &'static TraitMethod,
1183        this: RawVc,
1184        arg: Box<dyn MagicAny>,
1185        persistence: TaskPersistence,
1186    ) -> RawVc {
1187        self.trait_call(trait_method, this, arg, persistence)
1188    }
1189
1190    #[track_caller]
1191    fn run(
1192        &self,
1193        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1194    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
1195        let this = self.pin();
1196        Box::pin(async move { this.run(future).await })
1197    }
1198
1199    #[track_caller]
1200    fn run_once(
1201        &self,
1202        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1203    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1204        let this = self.pin();
1205        Box::pin(async move { this.run_once(future).await })
1206    }
1207
1208    #[track_caller]
1209    fn run_once_with_reason(
1210        &self,
1211        reason: StaticOrArc<dyn InvalidationReason>,
1212        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1213    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1214        {
1215            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1216            reason_set.insert(reason);
1217        }
1218        let this = self.pin();
1219        Box::pin(async move { this.run_once(future).await })
1220    }
1221
1222    #[track_caller]
1223    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1224        self.start_once_process(future)
1225    }
1226}
1227
1228impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1229    #[instrument(level = "info", skip_all, name = "invalidate")]
1230    fn invalidate(&self, task: TaskId) {
1231        self.backend.invalidate_task(task, self);
1232    }
1233
1234    #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1235    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1236        {
1237            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1238            reason_set.insert(reason);
1239        }
1240        self.backend.invalidate_task(task, self);
1241    }
1242
1243    fn invalidate_serialization(&self, task: TaskId) {
1244        self.backend.invalidate_serialization(task, self);
1245    }
1246
1247    fn try_read_task_output(
1248        &self,
1249        task: TaskId,
1250        options: ReadOutputOptions,
1251    ) -> Result<Result<RawVc, EventListener>> {
1252        self.backend.try_read_task_output(
1253            task,
1254            current_task_if_available("reading Vcs"),
1255            options,
1256            self,
1257        )
1258    }
1259
1260    fn try_read_task_cell(
1261        &self,
1262        task: TaskId,
1263        index: CellId,
1264        options: ReadCellOptions,
1265    ) -> Result<Result<TypedCellContent, EventListener>> {
1266        self.backend.try_read_task_cell(
1267            task,
1268            index,
1269            current_task_if_available("reading Vcs"),
1270            options,
1271            self,
1272        )
1273    }
1274
1275    fn try_read_own_task_cell(
1276        &self,
1277        current_task: TaskId,
1278        index: CellId,
1279        options: ReadCellOptions,
1280    ) -> Result<TypedCellContent> {
1281        self.backend
1282            .try_read_own_task_cell(current_task, index, options, self)
1283    }
1284
1285    fn try_read_local_output(
1286        &self,
1287        execution_id: ExecutionId,
1288        local_task_id: LocalTaskId,
1289    ) -> Result<Result<RawVc, EventListener>> {
1290        CURRENT_TASK_STATE.with(|gts| {
1291            let gts_read = gts.read().unwrap();
1292
1293            // Local Vcs are local to their parent task's current execution, and do not exist
1294            // outside of it. This is weakly enforced at compile time using the `NonLocalValue`
1295            // marker trait. This assertion exists to handle any potential escapes that the
1296            // compile-time checks cannot capture.
1297            gts_read.assert_execution_id(execution_id);
1298
1299            match gts_read.get_local_task(local_task_id) {
1300                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1301                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1302            }
1303        })
1304    }
1305
1306    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1307        self.backend.read_task_collectibles(
1308            task,
1309            trait_id,
1310            current_task_if_available("reading collectibles"),
1311            self,
1312        )
1313    }
1314
1315    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1316        self.backend.emit_collectible(
1317            trait_type,
1318            collectible,
1319            current_task("emitting collectible"),
1320            self,
1321        );
1322    }
1323
1324    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1325        self.backend.unemit_collectible(
1326            trait_type,
1327            collectible,
1328            count,
1329            current_task("unemitting collectible"),
1330            self,
1331        );
1332    }
1333
1334    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1335        for (&collectible, &count) in collectibles {
1336            if count > 0 {
1337                self.backend.unemit_collectible(
1338                    trait_type,
1339                    collectible,
1340                    count as u32,
1341                    current_task("unemitting collectible"),
1342                    self,
1343                );
1344            }
1345        }
1346    }
1347
1348    fn read_own_task_cell(
1349        &self,
1350        task: TaskId,
1351        index: CellId,
1352        options: ReadCellOptions,
1353    ) -> Result<TypedCellContent> {
1354        self.try_read_own_task_cell(task, index, options)
1355    }
1356
1357    fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent) {
1358        self.backend.update_task_cell(task, index, content, self);
1359    }
1360
1361    fn connect_task(&self, task: TaskId) {
1362        self.backend
1363            .connect_task(task, current_task_if_available("connecting task"), self);
1364    }
1365
1366    fn mark_own_task_as_finished(&self, task: TaskId) {
1367        self.backend.mark_own_task_as_finished(task, self);
1368    }
1369
1370    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
1371        self.backend
1372            .set_own_task_aggregation_number(task, aggregation_number, self);
1373    }
1374
1375    fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1376        self.backend.mark_own_task_as_session_dependent(task, self);
1377    }
1378
1379    /// Creates a future that inherits the current task id and task state. The current global task
1380    /// will wait for this future to be dropped before exiting.
1381    fn detached_for_testing(
1382        &self,
1383        fut: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1384    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
1385        // this is similar to what happens for a local task, except that we keep the local task's
1386        // state as well.
1387        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1388        let tracked_fut = {
1389            let ts = global_task_state.read().unwrap();
1390            ts.local_task_tracker.track_future(fut)
1391        };
1392        Box::pin(TURBO_TASKS.scope(
1393            turbo_tasks(),
1394            CURRENT_TASK_STATE.scope(global_task_state, tracked_fut),
1395        ))
1396    }
1397
1398    fn task_statistics(&self) -> &TaskStatisticsApi {
1399        self.backend.task_statistics()
1400    }
1401
1402    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1403        let this = self.pin();
1404        Box::pin(async move {
1405            this.stop_and_wait().await;
1406        })
1407    }
1408
1409    fn subscribe_to_compilation_events(
1410        &self,
1411        event_types: Option<Vec<String>>,
1412    ) -> Receiver<Arc<dyn CompilationEvent>> {
1413        self.compilation_events.subscribe(event_types)
1414    }
1415
1416    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1417        if let Err(e) = self.compilation_events.send(event) {
1418            tracing::warn!("Failed to send compilation event: {e}");
1419        }
1420    }
1421
1422    fn is_tracking_dependencies(&self) -> bool {
1423        self.backend.is_tracking_dependencies()
1424    }
1425}
1426
1427impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1428    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1429        self.pin()
1430    }
1431    fn backend(&self) -> &B {
1432        &self.backend
1433    }
1434
1435    #[track_caller]
1436    fn schedule_backend_background_job(&self, job: B::BackendJob) {
1437        self.schedule_background_job(async move |this| {
1438            this.backend.run_backend_job(job, &*this).await;
1439            this
1440        })
1441    }
1442
1443    #[track_caller]
1444    fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1445        self.schedule_foreground_job(async move |this| {
1446            this.backend.run_backend_job(job, &*this).await;
1447            this
1448        })
1449    }
1450
1451    #[track_caller]
1452    fn schedule(&self, task: TaskId) {
1453        self.schedule(task)
1454    }
1455
1456    fn program_duration_until(&self, instant: Instant) -> Duration {
1457        instant - self.program_start
1458    }
1459
1460    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1461        // SAFETY: This is a fresh id from the factory
1462        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1463    }
1464
1465    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1466        // SAFETY: This is a fresh id from the factory
1467        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1468    }
1469
1470    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1471        unsafe { self.task_id_factory.reuse(id.into()) }
1472    }
1473
1474    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1475        unsafe { self.transient_task_id_factory.reuse(id.into()) }
1476    }
1477
1478    fn is_idle(&self) -> bool {
1479        self.currently_scheduled_foreground_jobs
1480            .load(Ordering::Acquire)
1481            == 0
1482    }
1483}
1484
1485pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1486    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1487        Ok(id) => id,
1488        Err(_) => panic!(
1489            "{from} can only be used in the context of a turbo_tasks task execution or \
1490             turbo_tasks run"
1491        ),
1492    }
1493}
1494
1495pub(crate) fn current_task(from: &str) -> TaskId {
1496    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1497        Ok(Some(id)) => id,
1498        Ok(None) | Err(_) => {
1499            panic!("{from} can only be used in the context of a turbo_tasks task execution")
1500        }
1501    }
1502}
1503
1504pub async fn run<T: Send + 'static>(
1505    tt: Arc<dyn TurboTasksApi>,
1506    future: impl Future<Output = Result<T>> + Send + 'static,
1507) -> Result<T> {
1508    let (tx, rx) = tokio::sync::oneshot::channel();
1509
1510    tt.run(Box::pin(async move {
1511        let result = future.await?;
1512        tx.send(result)
1513            .map_err(|_| anyhow!("unable to send result"))?;
1514        Ok(())
1515    }))
1516    .await?;
1517
1518    Ok(rx.await?)
1519}
1520
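/// Runs `future` as a one-off operation on the given turbo-tasks instance and returns its value.
/// This is a thin convenience wrapper around the instance's `run_once`: the future's result is
/// forwarded back to the caller through a oneshot channel.
///
/// Illustrative sketch (not from the original sources; `tt` is assumed to be an existing
/// `Arc<dyn TurboTasksApi>` and `compute_value` is a placeholder async function returning
/// `Result<u32>`):
///
/// ```ignore
/// let value: u32 = run_once(tt.clone(), async move {
///     let value = compute_value().await?;
///     Ok(value)
/// })
/// .await?;
/// ```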
1521pub async fn run_once<T: Send + 'static>(
1522    tt: Arc<dyn TurboTasksApi>,
1523    future: impl Future<Output = Result<T>> + Send + 'static,
1524) -> Result<T> {
1525    let (tx, rx) = tokio::sync::oneshot::channel();
1526
1527    tt.run_once(Box::pin(async move {
1528        let result = future.await?;
1529        tx.send(result)
1530            .map_err(|_| anyhow!("unable to send result"))?;
1531        Ok(())
1532    }))
1533    .await?;
1534
1535    Ok(rx.await?)
1536}
1537
1538pub async fn run_once_with_reason<T: Send + 'static>(
1539    tt: Arc<dyn TurboTasksApi>,
1540    reason: impl InvalidationReason,
1541    future: impl Future<Output = Result<T>> + Send + 'static,
1542) -> Result<T> {
1543    let (tx, rx) = tokio::sync::oneshot::channel();
1544
1545    tt.run_once_with_reason(
1546        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1547        Box::pin(async move {
1548            let result = future.await?;
1549            tx.send(result)
1550                .map_err(|_| anyhow!("unable to send result"))?;
1551            Ok(())
1552        }),
1553    )
1554    .await?;
1555
1556    Ok(rx.await?)
1557}
1558
1559/// Calls [`TurboTasks::dynamic_call`] for the current turbo tasks instance.
1560pub fn dynamic_call(
1561    func: &'static NativeFunction,
1562    this: Option<RawVc>,
1563    arg: Box<dyn MagicAny>,
1564    persistence: TaskPersistence,
1565) -> RawVc {
1566    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1567}
1568
1569/// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance.
1570pub fn trait_call(
1571    trait_method: &'static TraitMethod,
1572    this: RawVc,
1573    arg: Box<dyn MagicAny>,
1574    persistence: TaskPersistence,
1575) -> RawVc {
1576    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1577}
1578
1579pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1580    TURBO_TASKS.with(|arc| arc.clone())
1581}
1582
1583pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1584    TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1585}
1586
1587pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1588    TURBO_TASKS.with(|arc| func(arc))
1589}
1590
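/// Runs `f` synchronously with `tt` registered as the current turbo-tasks instance (via the
/// `TURBO_TASKS` task-local) for the duration of the call.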
1591pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1592    TURBO_TASKS.sync_scope(tt, f)
1593}
1594
1595pub fn turbo_tasks_future_scope<T>(
1596    tt: Arc<dyn TurboTasksApi>,
1597    f: impl Future<Output = T>,
1598) -> impl Future<Output = T> {
1599    TURBO_TASKS.scope(tt, f)
1600}
1601
1602pub fn with_turbo_tasks_for_testing<T>(
1603    tt: Arc<dyn TurboTasksApi>,
1604    current_task: TaskId,
1605    execution_id: ExecutionId,
1606    f: impl Future<Output = T>,
1607) -> impl Future<Output = T> {
1608    TURBO_TASKS.scope(
1609        tt,
1610        CURRENT_TASK_STATE.scope(
1611            Arc::new(RwLock::new(CurrentTaskState::new(
1612                current_task,
1613                execution_id,
1614            ))),
1615            f,
1616        ),
1617    )
1618}
1619
1620/// Spawns the given future within the context of the current task.
1621///
1622/// Beware: this method is not safe to use in production code. It is only
1623/// intended for use in tests and for debugging purposes.
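///
/// Illustrative sketch (must be called from within a running turbo-tasks task; the awaited work
/// is a placeholder):
///
/// ```ignore
/// spawn_detached_for_testing(async move {
///     // Inherits the spawning task's id and state; that task waits for this future
///     // to complete (or be dropped) before it finishes.
///     run_some_assertion().await?;
///     Ok(())
/// });
/// ```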
1624pub fn spawn_detached_for_testing(f: impl Future<Output = Result<()>> + Send + 'static) {
1625    tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(f.in_current_span())));
1626}
1627
1628pub fn current_task_for_testing() -> Option<TaskId> {
1629    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1630}
1631
1632/// Marks the current task as session dependent: it becomes dirty again when it is restored from the filesystem cache in a later session.
1633pub fn mark_session_dependent() {
1634    with_turbo_tasks(|tt| {
1635        tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1636    });
1637}
1638
1639/// Marks the current task as a root in the aggregation graph. This means it starts with the
1640/// correct aggregation number instead of needing to recompute it after the fact.
1641pub fn mark_root() {
1642    with_turbo_tasks(|tt| {
1643        tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
1644    });
1645}
1646
1647/// Marks the current task as finished. This excludes it from waiting for
1648/// strong consistency.
1649pub fn mark_finished() {
1650    with_turbo_tasks(|tt| {
1651        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1652    });
1653}
1654
1655/// Marks the current task as stateful. This prevents the task from being
1656/// dropped without persisting its state.
1657///
1658/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
1659/// serialization of the current task's cells.
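///
/// Illustrative sketch (placeholder logic; must be called from within a task execution):
///
/// ```ignore
/// let serialization_invalidator = mark_stateful();
/// // Keep the invalidator alongside the task's mutable state and use it to trigger
/// // re-serialization of this task's cells whenever that state changes.
/// ```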
1660pub fn mark_stateful() -> SerializationInvalidator {
1661    CURRENT_TASK_STATE.with(|cell| {
1662        let CurrentTaskState {
1663            stateful, task_id, ..
1664        } = &mut *cell.write().unwrap();
1665        *stateful = true;
1666        let Some(task_id) = *task_id else {
1667            panic!(
1668                "mark_stateful() can only be used in the context of a turbo_tasks task execution"
1669            );
1670        };
1671        SerializationInvalidator::new(task_id)
1672    })
1673}
1674
1675pub fn mark_invalidator() {
1676    CURRENT_TASK_STATE.with(|cell| {
1677        let CurrentTaskState {
1678            has_invalidator, ..
1679        } = &mut *cell.write().unwrap();
1680        *has_invalidator = true;
1681    })
1682}
1683
1684pub fn prevent_gc() {
1685    // There is a hack in UpdateCellOperation that needs to be updated when this is changed.
1686    mark_stateful();
1687}
1688
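/// Emits `collectible` from the current task so that it becomes visible as a collectible of the
/// value trait `T` to tasks that read collectibles from this task's subgraph.
///
/// Illustrative sketch (assumes a value trait `MyCollectible` declared with
/// `#[turbo_tasks::value_trait]` and a resolved `item` whose type implements it):
///
/// ```ignore
/// emit(ResolvedVc::upcast::<Box<dyn MyCollectible>>(item));
/// ```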
1689pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1690    with_turbo_tasks(|tt| {
1691        let raw_vc = collectible.node.node;
1692        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1693    })
1694}
1695
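/// Reads the output of task `id`, looping until it is available: each failed attempt yields an
/// event listener that resolves once the task makes progress, at which point the read is retried.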
1696pub(crate) async fn read_task_output(
1697    this: &dyn TurboTasksApi,
1698    id: TaskId,
1699    options: ReadOutputOptions,
1700) -> Result<RawVc> {
1701    loop {
1702        match this.try_read_task_output(id, options)? {
1703            Ok(result) => return Ok(result),
1704            Err(listener) => listener.await,
1705        }
1706    }
1707}
1708
1709pub(crate) async fn read_task_cell(
1710    this: &dyn TurboTasksApi,
1711    id: TaskId,
1712    index: CellId,
1713    options: ReadCellOptions,
1714) -> Result<TypedCellContent> {
1715    loop {
1716        match this.try_read_task_cell(id, index, options)? {
1717            Ok(result) => return Ok(result),
1718            Err(listener) => listener.await,
1719        }
1720    }
1721}
1722
1723/// A reference to a task's cell with methods that allow updating the contents
1724/// of the cell.
1725///
1726/// Mutations should not happen outside of the task that owns this cell. Doing so
1727/// is a logic error and may lead to incorrect caching behavior.
1728#[derive(Clone, Copy, Serialize, Deserialize)]
1729pub struct CurrentCellRef {
1730    current_task: TaskId,
1731    index: CellId,
1732}
1733
1734type VcReadRepr<T> = <<T as VcValueType>::Read as VcRead<T>>::Repr;
1735
1736impl CurrentCellRef {
1737    /// Updates the cell if the given `functor` returns a value.
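    ///
    /// Illustrative sketch (placeholder types; `cell_ref` is a `CurrentCellRef` whose cell holds
    /// a `MyValue`, and `compute_value` is a placeholder):
    ///
    /// ```ignore
    /// cell_ref.conditional_update(|old: Option<&MyValue>| {
    ///     let new = compute_value();
    ///     (old != Some(&new)).then_some(new)
    /// });
    /// ```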
1738    pub fn conditional_update<T>(&self, functor: impl FnOnce(Option<&T>) -> Option<T>)
1739    where
1740        T: VcValueType,
1741    {
1742        self.conditional_update_with_shared_reference(|old_shared_reference| {
1743            let old_ref = old_shared_reference
1744                .and_then(|sr| sr.0.downcast_ref::<VcReadRepr<T>>())
1745                .map(|content| <T::Read as VcRead<T>>::repr_to_value_ref(content));
1746            let new_value = functor(old_ref)?;
1747            Some(SharedReference::new(triomphe::Arc::new(
1748                <T::Read as VcRead<T>>::value_to_repr(new_value),
1749            )))
1750        })
1751    }
1752
1753    /// Updates the cell if the given `functor` returns a `SharedReference`.
1754    pub fn conditional_update_with_shared_reference(
1755        &self,
1756        functor: impl FnOnce(Option<&SharedReference>) -> Option<SharedReference>,
1757    ) {
1758        let tt = turbo_tasks();
1759        let cell_content = tt
1760            .read_own_task_cell(
1761                self.current_task,
1762                self.index,
1763                ReadCellOptions {
1764                    tracking: ReadTracking::Untracked,
1765                    ..Default::default()
1766                },
1767            )
1768            .ok();
1769        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1770        if let Some(update) = update {
1771            tt.update_own_task_cell(self.current_task, self.index, CellContent(Some(update)))
1772        }
1773    }
1774
1775    /// Replaces the current cell's content with `new_value` if `new_value` is not equal by
1776    /// value to the existing content.
1777    ///
1778    /// The comparison happens using the value itself, not the [`VcRead::Target`] of that value.
1779    ///
1780    /// Take this example of a custom equality implementation on a transparent wrapper type:
1781    ///
1782    /// ```
1783    /// #[turbo_tasks::value(transparent, eq = "manual")]
1784    /// struct Wrapper(Vec<u32>);
1785    ///
1786    /// impl PartialEq for Wrapper {
1787    ///     fn eq(&self, other: &Self) -> bool {
1788    ///         // Example: order doesn't matter for equality
1789    ///         let (mut this, mut other) = (self.0.clone(), other.0.clone());
1790    ///         this.sort_unstable();
1791    ///         other.sort_unstable();
1792    ///         this == other
1793    ///     }
1794    /// }
1795    ///
1796    /// impl Eq for Wrapper {}
1797    /// ```
1798    ///
1799    /// Comparisons of [`Vc<Wrapper>`] used when updating the cell will use `Wrapper`'s custom
1800    /// equality implementation, rather than the one provided by the target ([`Vec<u32>`]) type.
1801    ///
1802    /// However, in most cases, the default derived implementation of [`PartialEq`] is used,
1803    /// which simply forwards to the inner value's [`PartialEq`].
1804    ///
1805    /// If you already have a `SharedReference`, consider calling
1806    /// [`Self::compare_and_update_with_shared_reference`] which can re-use the [`SharedReference`]
1807    /// object.
1808    pub fn compare_and_update<T>(&self, new_value: T)
1809    where
1810        T: PartialEq + VcValueType,
1811    {
1812        self.conditional_update(|old_value| {
1813            if let Some(old_value) = old_value
1814                && old_value == &new_value
1815            {
1816                return None;
1817            }
1818            Some(new_value)
1819        });
1820    }
1821
1822    /// Replaces the current cell's content with `new_shared_reference` if it is not equal
1823    /// by value to the existing content.
1824    ///
1825    /// If you already have a `SharedReference`, this is a faster version of
1826    /// [`CurrentCellRef::compare_and_update`].
1827    ///
1828    /// The [`SharedReference`] is expected to use the `<T::Read as
1829    /// VcRead<T>>::Repr` type for its representation of the value.
1830    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
1831    where
1832        T: VcValueType + PartialEq,
1833    {
1834        fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
1835            <T::Read as VcRead<T>>::repr_to_value_ref(
1836                sr.0.downcast_ref::<VcReadRepr<T>>()
1837                    .expect("cannot update SharedReference of different type"),
1838            )
1839        }
1840        self.conditional_update_with_shared_reference(|old_sr| {
1841            if let Some(old_sr) = old_sr {
1842                let old_value: &T = extract_sr_value(old_sr);
1843                let new_value = extract_sr_value(&new_shared_reference);
1844                if old_value == new_value {
1845                    return None;
1846                }
1847            }
1848            Some(new_shared_reference)
1849        });
1850    }
1851
1852    /// Unconditionally updates the content of the cell.
1853    pub fn update<T>(&self, new_value: T)
1854    where
1855        T: VcValueType,
1856    {
1857        let tt = turbo_tasks();
1858        tt.update_own_task_cell(
1859            self.current_task,
1860            self.index,
1861            CellContent(Some(SharedReference::new(triomphe::Arc::new(
1862                <T::Read as VcRead<T>>::value_to_repr(new_value),
1863            )))),
1864        )
1865    }
1866
1867    /// A faster version of [`Self::update`] if you already have a
1868    /// [`SharedReference`].
1869    ///
1870    /// If the passed-in [`SharedReference`] is the same as the existing cell's
1871    /// by identity, no update is performed.
1872    ///
1873    /// The [`SharedReference`] is expected to use the `<T::Read as
1874    /// VcRead<T>>::Repr` type for its representation of the value.
1875    pub fn update_with_shared_reference(&self, shared_ref: SharedReference) {
1876        let tt = turbo_tasks();
1877        let content = tt
1878            .read_own_task_cell(
1879                self.current_task,
1880                self.index,
1881                ReadCellOptions {
1882                    tracking: ReadTracking::Untracked,
1883                    ..Default::default()
1884                },
1885            )
1886            .ok();
1887        let update = if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
1888            // pointer equality (not value equality)
1889            shared_ref_exp != shared_ref
1890        } else {
1891            true
1892        };
1893        if update {
1894            tt.update_own_task_cell(self.current_task, self.index, CellContent(Some(shared_ref)))
1895        }
1896    }
1897}
1898
1899impl From<CurrentCellRef> for RawVc {
1900    fn from(cell: CurrentCellRef) -> Self {
1901        RawVc::TaskCell(cell.current_task, cell.index)
1902    }
1903}
1904
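/// Returns a [`CurrentCellRef`] for the next unused cell of type `ty` in the current task.
///
/// Cell indices are assigned per value type: each call bumps a per-type counter in the current
/// task state, so repeated calls during one execution hand out distinct, sequential cell ids.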
1905pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef {
1906    CURRENT_TASK_STATE.with(|ts| {
1907        let current_task = current_task("celling turbo_tasks values");
1908        let mut ts = ts.write().unwrap();
1909        let map = ts.cell_counters.as_mut().unwrap();
1910        let current_index = map.entry(ty).or_default();
1911        let index = *current_index;
1912        *current_index += 1;
1913        CurrentCellRef {
1914            current_task,
1915            index: CellId { type_id: ty, index },
1916        }
1917    })
1918}
1919
1920pub(crate) async fn read_local_output(
1921    this: &dyn TurboTasksApi,
1922    execution_id: ExecutionId,
1923    local_task_id: LocalTaskId,
1924) -> Result<RawVc> {
1925    loop {
1926        match this.try_read_local_output(execution_id, local_task_id)? {
1927            Ok(raw_vc) => return Ok(raw_vc),
1928            Err(event_listener) => event_listener.await,
1929        }
1930    }
1931}