turbo_tasks/
manager.rs

1use std::{
2    fmt::Display,
3    future::Future,
4    hash::BuildHasherDefault,
5    mem::take,
6    pin::Pin,
7    sync::{
8        Arc, Mutex, RwLock, Weak,
9        atomic::{AtomicBool, AtomicUsize, Ordering},
10    },
11    time::{Duration, Instant},
12};
13
14use anyhow::{Result, anyhow};
15use auto_hash_map::AutoMap;
16use rustc_hash::FxHasher;
17use serde::{Deserialize, Serialize};
18use tokio::{select, sync::mpsc::Receiver, task_local};
19use tokio_util::task::TaskTracker;
20use tracing::{Instrument, instrument};
21
22use crate::{
23    Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
24    ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
25    VcValueTrait, VcValueType,
26    backend::{
27        Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
28        TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
29    },
30    capture_future::CaptureFuture,
31    event::{Event, EventListener},
32    id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
33    id_factory::IdFactoryWithReuse,
34    macro_helpers::NativeFunction,
35    magic_any::MagicAny,
36    message_queue::{CompilationEvent, CompilationEventQueue},
37    raw_vc::{CellId, RawVc},
38    registry,
39    serialization_invalidation::SerializationInvalidator,
40    task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
41    task_statistics::TaskStatisticsApi,
42    trace::TraceRawVcs,
43    util::{IdFactory, StaticOrArc},
44};
45
46/// Common base trait for [`TurboTasksApi`] and [`TurboTasksBackendApi`]. Provides APIs for creating
47/// tasks from function calls.
48pub trait TurboTasksCallApi: Sync + Send {
49    /// Calls a native function with arguments. Resolves arguments when needed
50    /// with a wrapper task.
51    fn dynamic_call(
52        &self,
53        native_fn: &'static NativeFunction,
54        this: Option<RawVc>,
55        arg: Box<dyn MagicAny>,
56        persistence: TaskPersistence,
57    ) -> RawVc;
58    /// Call a native function with arguments.
59    /// All inputs must be resolved.
60    fn native_call(
61        &self,
62        native_fn: &'static NativeFunction,
63        this: Option<RawVc>,
64        arg: Box<dyn MagicAny>,
65        persistence: TaskPersistence,
66    ) -> RawVc;
67    /// Calls a trait method with arguments. First input is the `self` object.
68    /// Uses a wrapper task to resolve
69    fn trait_call(
70        &self,
71        trait_method: &'static TraitMethod,
72        this: RawVc,
73        arg: Box<dyn MagicAny>,
74        persistence: TaskPersistence,
75    ) -> RawVc;
76
77    fn run(
78        &self,
79        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
80    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
81    fn run_once(
82        &self,
83        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
84    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
85    fn run_once_with_reason(
86        &self,
87        reason: StaticOrArc<dyn InvalidationReason>,
88        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
89    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
90    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
91}
92
93/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
94/// context. Returned by the [`turbo_tasks`] helper function.
95///
96/// This trait is needed because thread locals cannot contain an unresolved [`Backend`] type
97/// parameter.
98pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
99    fn invalidate(&self, task: TaskId);
100    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);
101
102    fn invalidate_serialization(&self, task: TaskId);
103
104    fn try_read_task_output(
105        &self,
106        task: TaskId,
107        options: ReadOutputOptions,
108    ) -> Result<Result<RawVc, EventListener>>;
109
110    fn try_read_task_cell(
111        &self,
112        task: TaskId,
113        index: CellId,
114        options: ReadCellOptions,
115    ) -> Result<Result<TypedCellContent, EventListener>>;
116
117    /// Reads a [`RawVc::LocalOutput`]. If the task has completed, returns the [`RawVc`] the local
118    /// task points to.
119    ///
120    /// The returned [`RawVc`] may also be a [`RawVc::LocalOutput`], so this may need to be called
121    /// recursively or in a loop.
122    ///
123    /// This does not accept a consistency argument, as you cannot control consistency of a read of
124    /// an operation owned by your own task. Strongly consistent reads are only allowed on
125    /// [`OperationVc`]s, which should never be local tasks.
126    ///
127    /// No dependency tracking will happen as a result of this function call, as it's a no-op for a
128    /// task to depend on itself.
129    ///
130    /// [`OperationVc`]: crate::OperationVc
131    fn try_read_local_output(
132        &self,
133        execution_id: ExecutionId,
134        local_task_id: LocalTaskId,
135    ) -> Result<Result<RawVc, EventListener>>;
136
137    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;
138
139    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
140    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
141    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);
142
143    /// INVALIDATION: Be careful with this, it will not track dependencies, so
144    /// using it could break cache invalidation.
145    fn try_read_own_task_cell(
146        &self,
147        current_task: TaskId,
148        index: CellId,
149        options: ReadCellOptions,
150    ) -> Result<TypedCellContent>;
151
152    fn read_own_task_cell(
153        &self,
154        task: TaskId,
155        index: CellId,
156        options: ReadCellOptions,
157    ) -> Result<TypedCellContent>;
158    fn update_own_task_cell(
159        &self,
160        task: TaskId,
161        index: CellId,
162        content: CellContent,
163        verification_mode: VerificationMode,
164    );
165    fn mark_own_task_as_finished(&self, task: TaskId);
166    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
167    fn mark_own_task_as_session_dependent(&self, task: TaskId);
168
169    fn connect_task(&self, task: TaskId);
170
171    /// Wraps the given future in the current task.
172    ///
173    /// Beware: this method is not safe to use in production code. It is only intended for use in
174    /// tests and for debugging purposes.
175    fn detached_for_testing(
176        &self,
177        f: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
178    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
179
180    fn task_statistics(&self) -> &TaskStatisticsApi;
181
182    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
183
184    fn subscribe_to_compilation_events(
185        &self,
186        event_types: Option<Vec<String>>,
187    ) -> Receiver<Arc<dyn CompilationEvent>>;
188    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);
189
190    // Returns true if TurboTasks is configured to track dependencies.
191    fn is_tracking_dependencies(&self) -> bool;
192}
193
194/// A wrapper around a value that is unused.
195pub struct Unused<T> {
196    inner: T,
197}
198
199impl<T> Unused<T> {
200    /// Creates a new unused value.
201    ///
202    /// # Safety
203    ///
204    /// The wrapped value must not be used.
205    pub unsafe fn new_unchecked(inner: T) -> Self {
206        Self { inner }
207    }
208
209    /// Get the inner value, without consuming the `Unused` wrapper.
210    ///
211    /// # Safety
212    ///
213    /// The user need to make sure that the value stays unused.
214    pub unsafe fn get_unchecked(&self) -> &T {
215        &self.inner
216    }
217
218    /// Unwraps the value, consuming the `Unused` wrapper.
219    pub fn into(self) -> T {
220        self.inner
221    }
222}
223
224/// A subset of the [`TurboTasks`] API that's exposed to [`Backend`] implementations.
225pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
226    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;
227
228    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
229    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
230    /// # Safety
231    ///
232    /// The caller must ensure that the task id is not used anymore.
233    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
234    /// # Safety
235    ///
236    /// The caller must ensure that the task id is not used anymore.
237    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);
238
239    /// Schedule a task for execution.
240    fn schedule(&self, task: TaskId);
241
242    /// Schedule a foreground backend job for execution.
243    fn schedule_backend_foreground_job(&self, job: B::BackendJob);
244
245    /// Schedule a background backend job for execution.
246    ///
247    /// Background jobs are not counted towards activeness of the system. The system is considered
248    /// idle even with active background jobs.
249    fn schedule_backend_background_job(&self, job: B::BackendJob);
250
251    /// Returns the duration from the start of the program to the given instant.
252    fn program_duration_until(&self, instant: Instant) -> Duration;
253
254    /// Returns true if the system is idle.
255    fn is_idle(&self) -> bool;
256
257    /// Returns a reference to the backend.
258    fn backend(&self) -> &B;
259}
260
261#[allow(clippy::manual_non_exhaustive)]
262pub struct UpdateInfo {
263    pub duration: Duration,
264    pub tasks: usize,
265    pub reasons: InvalidationReasonSet,
266    #[allow(dead_code)]
267    placeholder_for_future_fields: (),
268}
269
270#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
271pub enum TaskPersistence {
272    /// Tasks that may be persisted across sessions using serialization.
273    Persistent,
274
275    /// Tasks that will be persisted in memory for the life of this session, but won't persist
276    /// between sessions.
277    ///
278    /// This is used for [root tasks][TurboTasks::spawn_root_task] and tasks with an argument of
279    /// type [`TransientValue`][crate::value::TransientValue] or
280    /// [`TransientInstance`][crate::value::TransientInstance].
281    Transient,
282}
283
284impl Display for TaskPersistence {
285    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
286        match self {
287            TaskPersistence::Persistent => write!(f, "persistent"),
288            TaskPersistence::Transient => write!(f, "transient"),
289        }
290    }
291}
292
293#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
294pub enum ReadConsistency {
295    /// The default behavior for most APIs. Reads are faster, but may return stale values, which
296    /// may later trigger re-computation.
297    #[default]
298    Eventual,
299    /// Ensures all dependencies are fully resolved before returning the cell or output data, at
300    /// the cost of slower reads.
301    ///
302    /// Top-level code that returns data to the user should use strongly consistent reads.
303    Strong,
304}
305
306#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
307pub enum ReadTracking {
308    /// Reads are tracked as dependencies of the current task.
309    #[default]
310    Tracked,
311    /// The read is only tracked when there is an error, otherwise it is untracked.
312    ///
313    /// INVALIDATION: Be careful with this, it will not track dependencies, so
314    /// using it could break cache invalidation.
315    TrackOnlyError,
316    /// The read is not tracked as a dependency of the current task.
317    ///
318    /// INVALIDATION: Be careful with this, it will not track dependencies, so
319    /// using it could break cache invalidation.
320    Untracked,
321}
322
323impl ReadTracking {
324    pub fn should_track(&self, is_err: bool) -> bool {
325        match self {
326            ReadTracking::Tracked => true,
327            ReadTracking::TrackOnlyError => is_err,
328            ReadTracking::Untracked => false,
329        }
330    }
331}
332
333impl Display for ReadTracking {
334    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
335        match self {
336            ReadTracking::Tracked => write!(f, "tracked"),
337            ReadTracking::TrackOnlyError => write!(f, "track only error"),
338            ReadTracking::Untracked => write!(f, "untracked"),
339        }
340    }
341}
342
343pub struct TurboTasks<B: Backend + 'static> {
344    this: Weak<Self>,
345    backend: B,
346    task_id_factory: IdFactoryWithReuse<TaskId>,
347    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
348    execution_id_factory: IdFactory<ExecutionId>,
349    stopped: AtomicBool,
350    currently_scheduled_foreground_jobs: AtomicUsize,
351    currently_scheduled_background_jobs: AtomicUsize,
352    scheduled_tasks: AtomicUsize,
353    start: Mutex<Option<Instant>>,
354    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
355    /// Event that is triggered when currently_scheduled_foreground_jobs becomes non-zero
356    event_foreground_start: Event,
357    /// Event that is triggered when all foreground jobs are done
358    /// (currently_scheduled_foreground_jobs becomes zero)
359    event_foreground_done: Event,
360    /// Event that is triggered when all background jobs are done
361    event_background_done: Event,
362    program_start: Instant,
363    compilation_events: CompilationEventQueue,
364}
365
366/// Information about a non-local task. A non-local task can contain multiple "local" tasks, which
367/// all share the same non-local task state.
368///
369/// A non-local task is one that:
370///
371/// - Has a unique task id.
372/// - Is potentially cached.
373/// - The backend is aware of.
374struct CurrentTaskState {
375    task_id: Option<TaskId>,
376    execution_id: ExecutionId,
377
378    /// True if the current task has state in cells
379    stateful: bool,
380
381    /// True if the current task uses an external invalidator
382    has_invalidator: bool,
383
384    /// Tracks how many cells of each type has been allocated so far during this task execution.
385    /// When a task is re-executed, the cell count may not match the existing cell vec length.
386    ///
387    /// This is taken (and becomes `None`) during teardown of a task.
388    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,
389
390    /// Local tasks created while this global task has been running. Indexed by `LocalTaskId`.
391    local_tasks: Vec<LocalTask>,
392
393    /// Tracks currently running local tasks, and defers cleanup of the global task until those
394    /// complete. Also used by `detached_for_testing`.
395    local_task_tracker: TaskTracker,
396}
397
398impl CurrentTaskState {
399    fn new(task_id: TaskId, execution_id: ExecutionId) -> Self {
400        Self {
401            task_id: Some(task_id),
402            execution_id,
403            stateful: false,
404            has_invalidator: false,
405            cell_counters: Some(AutoMap::default()),
406            local_tasks: Vec::new(),
407            local_task_tracker: TaskTracker::new(),
408        }
409    }
410
411    fn new_temporary(execution_id: ExecutionId) -> Self {
412        Self {
413            task_id: None,
414            execution_id,
415            stateful: false,
416            has_invalidator: false,
417            cell_counters: None,
418            local_tasks: Vec::new(),
419            local_task_tracker: TaskTracker::new(),
420        }
421    }
422
423    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
424        if self.execution_id != expected_execution_id {
425            panic!(
426                "Local tasks can only be scheduled/awaited within the same execution of the \
427                 parent task that created them"
428            );
429        }
430    }
431
432    fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
433        self.local_tasks.push(local_task);
434        // generate a one-indexed id from len() -- we just pushed so len() is >= 1
435        if cfg!(debug_assertions) {
436            LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
437        } else {
438            unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
439        }
440    }
441
442    fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
443        // local task ids are one-indexed (they use NonZeroU32)
444        &self.local_tasks[(*local_task_id as usize) - 1]
445    }
446
447    fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
448        &mut self.local_tasks[(*local_task_id as usize) - 1]
449    }
450}
451
452// TODO implement our own thread pool and make these thread locals instead
453task_local! {
454    /// The current TurboTasks instance
455    static TURBO_TASKS: Arc<dyn TurboTasksApi>;
456
457    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
458}
459
460impl<B: Backend + 'static> TurboTasks<B> {
461    // TODO better lifetime management for turbo tasks
462    // consider using unsafe for the task_local turbo tasks
463    // that should be safe as long tasks can't outlife turbo task
464    // so we probably want to make sure that all tasks are joined
465    // when trying to drop turbo tasks
466    pub fn new(backend: B) -> Arc<Self> {
467        let task_id_factory = IdFactoryWithReuse::new(
468            TaskId::MIN,
469            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
470        );
471        let transient_task_id_factory =
472            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
473        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
474        let this = Arc::new_cyclic(|this| Self {
475            this: this.clone(),
476            backend,
477            task_id_factory,
478            transient_task_id_factory,
479            execution_id_factory,
480            stopped: AtomicBool::new(false),
481            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
482            currently_scheduled_background_jobs: AtomicUsize::new(0),
483            scheduled_tasks: AtomicUsize::new(0),
484            start: Default::default(),
485            aggregated_update: Default::default(),
486            event_foreground_done: Event::new(|| {
487                || "TurboTasks::event_foreground_done".to_string()
488            }),
489            event_foreground_start: Event::new(|| {
490                || "TurboTasks::event_foreground_start".to_string()
491            }),
492            event_background_done: Event::new(|| {
493                || "TurboTasks::event_background_done".to_string()
494            }),
495            program_start: Instant::now(),
496            compilation_events: CompilationEventQueue::default(),
497        });
498        this.backend.startup(&*this);
499        this
500    }
501
502    pub fn pin(&self) -> Arc<Self> {
503        self.this.upgrade().unwrap()
504    }
505
506    /// Creates a new root task
507    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
508    where
509        T: ?Sized,
510        F: Fn() -> Fut + Send + Sync + Clone + 'static,
511        Fut: Future<Output = Result<Vc<T>>> + Send,
512    {
513        let id = self.backend.create_transient_task(
514            TransientTaskType::Root(Box::new(move || {
515                let functor = functor.clone();
516                Box::pin(async move {
517                    let raw_vc = functor().await?.node;
518                    raw_vc.to_non_local().await
519                })
520            })),
521            self,
522        );
523        self.schedule(id);
524        id
525    }
526
527    pub fn dispose_root_task(&self, task_id: TaskId) {
528        self.backend.dispose_root_task(task_id, self);
529    }
530
531    // TODO make sure that all dependencies settle before reading them
532    /// Creates a new root task, that is only executed once.
533    /// Dependencies will not invalidate the task.
534    #[track_caller]
535    fn spawn_once_task<T, Fut>(&self, future: Fut)
536    where
537        T: ?Sized,
538        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
539    {
540        let id = self.backend.create_transient_task(
541            TransientTaskType::Once(Box::pin(async move {
542                let raw_vc = future.await?.node;
543                raw_vc.to_non_local().await
544            })),
545            self,
546        );
547        self.schedule(id);
548    }
549
550    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
551        &self,
552        future: impl Future<Output = Result<T>> + Send + 'static,
553    ) -> Result<T> {
554        let (tx, rx) = tokio::sync::oneshot::channel();
555        self.spawn_once_task(async move {
556            let result = future.await;
557            tx.send(result)
558                .map_err(|_| anyhow!("unable to send result"))?;
559            Ok(Completion::new())
560        });
561
562        rx.await?
563    }
564
565    #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
566    pub async fn run<T: TraceRawVcs + Send + 'static>(
567        &self,
568        future: impl Future<Output = Result<T>> + Send + 'static,
569    ) -> Result<T, TurboTasksExecutionError> {
570        self.begin_foreground_job();
571        // it's okay for execution ids to overflow and wrap, they're just used for an assert
572        let execution_id = self.execution_id_factory.wrapping_get();
573        let current_task_state =
574            Arc::new(RwLock::new(CurrentTaskState::new_temporary(execution_id)));
575
576        let result = TURBO_TASKS
577            .scope(
578                self.pin(),
579                CURRENT_TASK_STATE.scope(current_task_state, async {
580                    let result = CaptureFuture::new(future).await;
581
582                    // wait for all spawned local tasks using `local` to finish
583                    let ltt =
584                        CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_task_tracker.clone());
585                    ltt.close();
586                    ltt.wait().await;
587
588                    match result {
589                        Ok(Ok(raw_vc)) => Ok(raw_vc),
590                        Ok(Err(err)) => Err(err.into()),
591                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
592                    }
593                }),
594            )
595            .await;
596        self.finish_foreground_job();
597        result
598    }
599
600    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
601        let this = self.pin();
602        tokio::spawn(async move {
603            this.pin()
604                .run_once(async move {
605                    this.finish_foreground_job();
606                    future.await;
607                    this.begin_foreground_job();
608                    Ok(())
609                })
610                .await
611                .unwrap()
612        });
613    }
614
615    pub(crate) fn native_call(
616        &self,
617        native_fn: &'static NativeFunction,
618        this: Option<RawVc>,
619        arg: Box<dyn MagicAny>,
620        persistence: TaskPersistence,
621    ) -> RawVc {
622        let task_type = CachedTaskType {
623            native_fn,
624            this,
625            arg,
626        };
627        RawVc::TaskOutput(match persistence {
628            TaskPersistence::Transient => self.backend.get_or_create_transient_task(
629                task_type,
630                current_task_if_available("turbo_function calls"),
631                self,
632            ),
633            TaskPersistence::Persistent => self.backend.get_or_create_persistent_task(
634                task_type,
635                current_task_if_available("turbo_function calls"),
636                self,
637            ),
638        })
639    }
640
641    pub fn dynamic_call(
642        &self,
643        native_fn: &'static NativeFunction,
644        this: Option<RawVc>,
645        arg: Box<dyn MagicAny>,
646        persistence: TaskPersistence,
647    ) -> RawVc {
648        if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
649            return self.native_call(native_fn, this, arg, persistence);
650        }
651        let task_type = LocalTaskSpec {
652            task_type: LocalTaskType::ResolveNative { native_fn },
653            this,
654            arg,
655        };
656        self.schedule_local_task(task_type, persistence)
657    }
658
659    pub fn trait_call(
660        &self,
661        trait_method: &'static TraitMethod,
662        this: RawVc,
663        arg: Box<dyn MagicAny>,
664        persistence: TaskPersistence,
665    ) -> RawVc {
666        // avoid creating a wrapper task if self is already resolved
667        // for resolved cells we already know the value type so we can lookup the
668        // function
669        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
670            match registry::get_value_type(type_id).get_trait_method(trait_method) {
671                Some(native_fn) => {
672                    let arg = native_fn.arg_meta.filter_owned(arg);
673                    return self.dynamic_call(native_fn, Some(this), arg, persistence);
674                }
675                None => {
676                    // We are destined to fail at this point, but we just retry resolution in the
677                    // local task since we cannot report an error from here.
678                    // TODO: A panic seems appropriate since the immediate caller is to blame
679                }
680            }
681        }
682
683        // create a wrapper task to resolve all inputs
684        let task_type = LocalTaskSpec {
685            task_type: LocalTaskType::ResolveTrait { trait_method },
686            this: Some(this),
687            arg,
688        };
689
690        self.schedule_local_task(task_type, persistence)
691    }
692
693    #[track_caller]
694    pub(crate) fn schedule(&self, task_id: TaskId) {
695        self.begin_foreground_job();
696        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
697
698        let this = self.pin();
699        let future = async move {
700            let mut schedule_again = true;
701            while schedule_again {
702                // it's okay for execution ids to overflow and wrap, they're just used for an assert
703                let execution_id = this.execution_id_factory.wrapping_get();
704                let current_task_state =
705                    Arc::new(RwLock::new(CurrentTaskState::new(task_id, execution_id)));
706                let single_execution_future = async {
707                    if this.stopped.load(Ordering::Acquire) {
708                        this.backend.task_execution_canceled(task_id, &*this);
709                        return false;
710                    }
711
712                    let Some(TaskExecutionSpec { future, span }) =
713                        this.backend.try_start_task_execution(task_id, &*this)
714                    else {
715                        return false;
716                    };
717
718                    async {
719                        let result = CaptureFuture::new(future).await;
720
721                        // wait for all spawned local tasks using `local` to finish
722                        let ltt = CURRENT_TASK_STATE
723                            .with(|ts| ts.read().unwrap().local_task_tracker.clone());
724                        ltt.close();
725                        ltt.wait().await;
726
727                        let result = match result {
728                            Ok(Ok(raw_vc)) => Ok(raw_vc),
729                            Ok(Err(err)) => Err(err.into()),
730                            Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
731                        };
732
733                        let FinishedTaskState {
734                            stateful,
735                            has_invalidator,
736                        } = this.finish_current_task_state();
737                        let cell_counters = CURRENT_TASK_STATE
738                            .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
739                        this.backend.task_execution_completed(
740                            task_id,
741                            result,
742                            &cell_counters,
743                            stateful,
744                            has_invalidator,
745                            &*this,
746                        )
747                    }
748                    .instrument(span)
749                    .await
750                };
751                schedule_again = CURRENT_TASK_STATE
752                    .scope(current_task_state, single_execution_future)
753                    .await;
754            }
755            this.finish_foreground_job();
756            anyhow::Ok(())
757        };
758
759        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
760
761        #[cfg(feature = "tokio_tracing")]
762        {
763            let description = self.backend.get_task_description(task_id);
764            tokio::task::Builder::new()
765                .name(&description)
766                .spawn(future)
767                .unwrap();
768        }
769        #[cfg(not(feature = "tokio_tracing"))]
770        tokio::task::spawn(future);
771    }
772
773    fn schedule_local_task(
774        &self,
775        ty: LocalTaskSpec,
776        // if this is a `LocalTaskType::Resolve*`, we may spawn another task with this persistence,
777        persistence: TaskPersistence,
778    ) -> RawVc {
779        let task_type = ty.task_type;
780        let (global_task_state, parent_task_id, execution_id, local_task_id) = CURRENT_TASK_STATE
781            .with(|gts| {
782                let mut gts_write = gts.write().unwrap();
783                let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
784                    done_event: Event::new(move || {
785                        move || format!("LocalTask({task_type})::done_event")
786                    }),
787                });
788                (
789                    Arc::clone(gts),
790                    gts_write.task_id,
791                    gts_write.execution_id,
792                    local_task_id,
793                )
794            });
795
796        #[cfg(feature = "tokio_tracing")]
797        let description = format!(
798            "[local] (parent: {}) {}",
799            self.backend.get_task_description(parent_task_id),
800            ty.task_type,
801        );
802        #[cfg(not(feature = "tokio_tracing"))]
803        let _ = parent_task_id; // suppress unused variable warning
804
805        let this = self.pin();
806        let future = async move {
807            let span = match &ty.task_type {
808                LocalTaskType::ResolveNative { native_fn } => native_fn.resolve_span(),
809                LocalTaskType::ResolveTrait { trait_method } => trait_method.resolve_span(),
810            };
811            async move {
812                let result = match ty.task_type {
813                    LocalTaskType::ResolveNative { native_fn } => {
814                        LocalTaskType::run_resolve_native(
815                            native_fn,
816                            ty.this,
817                            &*ty.arg,
818                            persistence,
819                            this,
820                        )
821                        .await
822                    }
823                    LocalTaskType::ResolveTrait { trait_method } => {
824                        LocalTaskType::run_resolve_trait(
825                            trait_method,
826                            ty.this.unwrap(),
827                            &*ty.arg,
828                            persistence,
829                            this,
830                        )
831                        .await
832                    }
833                };
834
835                let output = match result {
836                    Ok(raw_vc) => OutputContent::Link(raw_vc),
837                    Err(err) => OutputContent::Error(
838                        TurboTasksExecutionError::from(err).with_task_context(task_type, None),
839                    ),
840                };
841
842                let local_task = LocalTask::Done { output };
843
844                let done_event = CURRENT_TASK_STATE.with(move |gts| {
845                    let mut gts_write = gts.write().unwrap();
846                    let scheduled_task =
847                        std::mem::replace(gts_write.get_mut_local_task(local_task_id), local_task);
848                    let LocalTask::Scheduled { done_event } = scheduled_task else {
849                        panic!("local task finished, but was not in the scheduled state?");
850                    };
851                    done_event
852                });
853                done_event.notify(usize::MAX)
854            }
855            .instrument(span)
856            .await
857        };
858        let future = global_task_state
859            .read()
860            .unwrap()
861            .local_task_tracker
862            .track_future(future);
863        let future = CURRENT_TASK_STATE.scope(global_task_state, future);
864        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
865
866        #[cfg(feature = "tokio_tracing")]
867        tokio::task::Builder::new()
868            .name(&description)
869            .spawn(future)
870            .unwrap();
871        #[cfg(not(feature = "tokio_tracing"))]
872        tokio::task::spawn(future);
873
874        RawVc::LocalOutput(execution_id, local_task_id, persistence)
875    }
876
877    fn begin_foreground_job(&self) {
878        if self
879            .currently_scheduled_foreground_jobs
880            .fetch_add(1, Ordering::AcqRel)
881            == 0
882        {
883            *self.start.lock().unwrap() = Some(Instant::now());
884            self.event_foreground_start.notify(usize::MAX);
885            self.backend.idle_end(self);
886        }
887    }
888
889    fn finish_foreground_job(&self) {
890        if self
891            .currently_scheduled_foreground_jobs
892            .fetch_sub(1, Ordering::AcqRel)
893            == 1
894        {
895            self.backend.idle_start(self);
896            // That's not super race-condition-safe, but it's only for
897            // statistical reasons
898            let total = self.scheduled_tasks.load(Ordering::Acquire);
899            self.scheduled_tasks.store(0, Ordering::Release);
900            if let Some(start) = *self.start.lock().unwrap() {
901                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
902                if let Some(update) = update.as_mut() {
903                    update.0 += start.elapsed();
904                    update.1 += total;
905                } else {
906                    *update = Some((start.elapsed(), total));
907                }
908            }
909            self.event_foreground_done.notify(usize::MAX);
910        }
911    }
912
913    fn begin_background_job(&self) {
914        self.currently_scheduled_background_jobs
915            .fetch_add(1, Ordering::Relaxed);
916    }
917
918    fn finish_background_job(&self) {
919        if self
920            .currently_scheduled_background_jobs
921            .fetch_sub(1, Ordering::Relaxed)
922            == 1
923        {
924            self.event_background_done.notify(usize::MAX);
925        }
926    }
927
928    pub fn get_in_progress_count(&self) -> usize {
929        self.currently_scheduled_foreground_jobs
930            .load(Ordering::Acquire)
931    }
932
933    /// Waits for the given task to finish executing. This works by performing an untracked read,
934    /// and discarding the value of the task output.
935    ///
936    /// [`ReadConsistency::Eventual`] means that this will return after the task executes, but
937    /// before all dependencies have completely settled.
938    ///
939    /// [`ReadConsistency::Strong`] means that this will also wait for the task and all dependencies
940    /// to fully settle before returning.
941    ///
942    /// As this function is typically called in top-level code that waits for results to be ready
943    /// for the user to access, most callers should use [`ReadConsistency::Strong`].
944    pub async fn wait_task_completion(
945        &self,
946        id: TaskId,
947        consistency: ReadConsistency,
948    ) -> Result<()> {
949        read_task_output(
950            self,
951            id,
952            ReadOutputOptions {
953                // INVALIDATION: This doesn't return a value, only waits for it to be ready.
954                tracking: ReadTracking::Untracked,
955                consistency,
956            },
957        )
958        .await?;
959        Ok(())
960    }
961
962    /// Returns [UpdateInfo] with all updates aggregated over a given duration
963    /// (`aggregation`). Will wait until an update happens.
964    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
965        self.aggregated_update_info(aggregation, Duration::MAX)
966            .await
967            .unwrap()
968    }
969
970    /// Returns [UpdateInfo] with all updates aggregated over a given duration
971    /// (`aggregation`). Will only return None when the timeout is reached while
972    /// waiting for the first update.
973    pub async fn aggregated_update_info(
974        &self,
975        aggregation: Duration,
976        timeout: Duration,
977    ) -> Option<UpdateInfo> {
978        let listener = self
979            .event_foreground_done
980            .listen_with_note(|| || "wait for update info".to_string());
981        let wait_for_finish = {
982            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
983            if aggregation.is_zero() {
984                if let Some((duration, tasks)) = update.take() {
985                    return Some(UpdateInfo {
986                        duration,
987                        tasks,
988                        reasons: take(reason_set),
989                        placeholder_for_future_fields: (),
990                    });
991                } else {
992                    true
993                }
994            } else {
995                update.is_none()
996            }
997        };
998        if wait_for_finish {
999            if timeout == Duration::MAX {
1000                // wait for finish
1001                listener.await;
1002            } else {
1003                // wait for start, then wait for finish or timeout
1004                let start_listener = self
1005                    .event_foreground_start
1006                    .listen_with_note(|| || "wait for update info".to_string());
1007                if self
1008                    .currently_scheduled_foreground_jobs
1009                    .load(Ordering::Acquire)
1010                    == 0
1011                {
1012                    start_listener.await;
1013                } else {
1014                    drop(start_listener);
1015                }
1016                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1017                    // Timeout
1018                    return None;
1019                }
1020            }
1021        }
1022        if !aggregation.is_zero() {
1023            loop {
1024                select! {
1025                    () = tokio::time::sleep(aggregation) => {
1026                        break;
1027                    }
1028                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1029                        // Resets the sleep
1030                    }
1031                }
1032            }
1033        }
1034        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1035        if let Some((duration, tasks)) = update.take() {
1036            Some(UpdateInfo {
1037                duration,
1038                tasks,
1039                reasons: take(reason_set),
1040                placeholder_for_future_fields: (),
1041            })
1042        } else {
1043            panic!("aggregated_update_info must not called concurrently")
1044        }
1045    }
1046
1047    pub async fn wait_background_done(&self) {
1048        let listener = self.event_background_done.listen();
1049        if self
1050            .currently_scheduled_background_jobs
1051            .load(Ordering::Acquire)
1052            != 0
1053        {
1054            listener.await;
1055        }
1056    }
1057
1058    pub async fn stop_and_wait(&self) {
1059        turbo_tasks_future_scope(self.pin(), async move {
1060            self.backend.stopping(self);
1061            self.stopped.store(true, Ordering::Release);
1062            {
1063                let listener = self
1064                    .event_foreground_done
1065                    .listen_with_note(|| || "wait for stop".to_string());
1066                if self
1067                    .currently_scheduled_foreground_jobs
1068                    .load(Ordering::Acquire)
1069                    != 0
1070                {
1071                    listener.await;
1072                }
1073            }
1074            {
1075                let listener = self.event_background_done.listen();
1076                if self
1077                    .currently_scheduled_background_jobs
1078                    .load(Ordering::Acquire)
1079                    != 0
1080                {
1081                    listener.await;
1082                }
1083            }
1084            self.backend.stop(self);
1085        })
1086        .await;
1087    }
1088
1089    #[track_caller]
1090    pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1091    where
1092        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1093        T::CallOnceFuture: Send,
1094    {
1095        let mut this = self.pin();
1096        this.begin_foreground_job();
1097        tokio::spawn(
1098            TURBO_TASKS
1099                .scope(this.clone(), async move {
1100                    if !this.stopped.load(Ordering::Acquire) {
1101                        this = func(this.clone()).await;
1102                    }
1103                    this.finish_foreground_job();
1104                })
1105                .in_current_span(),
1106        );
1107    }
1108
1109    #[track_caller]
1110    pub(crate) fn schedule_background_job<T>(&self, func: T)
1111    where
1112        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1113        T::CallOnceFuture: Send,
1114    {
1115        let mut this = self.pin();
1116        self.begin_background_job();
1117        tokio::spawn(
1118            TURBO_TASKS
1119                .scope(this.clone(), async move {
1120                    if !this.stopped.load(Ordering::Acquire) {
1121                        this = func(this).await;
1122                    }
1123                    this.finish_background_job();
1124                })
1125                .in_current_span(),
1126        );
1127    }
1128
1129    fn finish_current_task_state(&self) -> FinishedTaskState {
1130        let (stateful, has_invalidator) = CURRENT_TASK_STATE.with(|cell| {
1131            let CurrentTaskState {
1132                stateful,
1133                has_invalidator,
1134                ..
1135            } = &mut *cell.write().unwrap();
1136            (*stateful, *has_invalidator)
1137        });
1138
1139        FinishedTaskState {
1140            stateful,
1141            has_invalidator,
1142        }
1143    }
1144
1145    pub fn backend(&self) -> &B {
1146        &self.backend
1147    }
1148}
1149
1150struct FinishedTaskState {
1151    /// True if the task has state in cells
1152    stateful: bool,
1153
1154    /// True if the task uses an external invalidator
1155    has_invalidator: bool,
1156}
1157
1158impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
1159    fn dynamic_call(
1160        &self,
1161        native_fn: &'static NativeFunction,
1162        this: Option<RawVc>,
1163        arg: Box<dyn MagicAny>,
1164        persistence: TaskPersistence,
1165    ) -> RawVc {
1166        self.dynamic_call(native_fn, this, arg, persistence)
1167    }
1168    fn native_call(
1169        &self,
1170        native_fn: &'static NativeFunction,
1171        this: Option<RawVc>,
1172        arg: Box<dyn MagicAny>,
1173        persistence: TaskPersistence,
1174    ) -> RawVc {
1175        self.native_call(native_fn, this, arg, persistence)
1176    }
1177    fn trait_call(
1178        &self,
1179        trait_method: &'static TraitMethod,
1180        this: RawVc,
1181        arg: Box<dyn MagicAny>,
1182        persistence: TaskPersistence,
1183    ) -> RawVc {
1184        self.trait_call(trait_method, this, arg, persistence)
1185    }
1186
1187    #[track_caller]
1188    fn run(
1189        &self,
1190        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1191    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
1192        let this = self.pin();
1193        Box::pin(async move { this.run(future).await })
1194    }
1195
1196    #[track_caller]
1197    fn run_once(
1198        &self,
1199        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1200    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1201        let this = self.pin();
1202        Box::pin(async move { this.run_once(future).await })
1203    }
1204
1205    #[track_caller]
1206    fn run_once_with_reason(
1207        &self,
1208        reason: StaticOrArc<dyn InvalidationReason>,
1209        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1210    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1211        {
1212            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1213            reason_set.insert(reason);
1214        }
1215        let this = self.pin();
1216        Box::pin(async move { this.run_once(future).await })
1217    }
1218
1219    #[track_caller]
1220    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1221        self.start_once_process(future)
1222    }
1223}
1224
1225impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1226    #[instrument(level = "info", skip_all, name = "invalidate")]
1227    fn invalidate(&self, task: TaskId) {
1228        self.backend.invalidate_task(task, self);
1229    }
1230
1231    #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1232    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1233        {
1234            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1235            reason_set.insert(reason);
1236        }
1237        self.backend.invalidate_task(task, self);
1238    }
1239
1240    fn invalidate_serialization(&self, task: TaskId) {
1241        self.backend.invalidate_serialization(task, self);
1242    }
1243
1244    fn try_read_task_output(
1245        &self,
1246        task: TaskId,
1247        options: ReadOutputOptions,
1248    ) -> Result<Result<RawVc, EventListener>> {
1249        self.backend.try_read_task_output(
1250            task,
1251            current_task_if_available("reading Vcs"),
1252            options,
1253            self,
1254        )
1255    }
1256
1257    fn try_read_task_cell(
1258        &self,
1259        task: TaskId,
1260        index: CellId,
1261        options: ReadCellOptions,
1262    ) -> Result<Result<TypedCellContent, EventListener>> {
1263        self.backend.try_read_task_cell(
1264            task,
1265            index,
1266            current_task_if_available("reading Vcs"),
1267            options,
1268            self,
1269        )
1270    }
1271
1272    fn try_read_own_task_cell(
1273        &self,
1274        current_task: TaskId,
1275        index: CellId,
1276        options: ReadCellOptions,
1277    ) -> Result<TypedCellContent> {
1278        self.backend
1279            .try_read_own_task_cell(current_task, index, options, self)
1280    }
1281
1282    fn try_read_local_output(
1283        &self,
1284        execution_id: ExecutionId,
1285        local_task_id: LocalTaskId,
1286    ) -> Result<Result<RawVc, EventListener>> {
1287        CURRENT_TASK_STATE.with(|gts| {
1288            let gts_read = gts.read().unwrap();
1289
1290            // Local Vcs are local to their parent task's current execution, and do not exist
1291            // outside of it. This is weakly enforced at compile time using the `NonLocalValue`
1292            // marker trait. This assertion exists to handle any potential escapes that the
1293            // compile-time checks cannot capture.
1294            gts_read.assert_execution_id(execution_id);
1295
1296            match gts_read.get_local_task(local_task_id) {
1297                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1298                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1299            }
1300        })
1301    }
1302
1303    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1304        self.backend.read_task_collectibles(
1305            task,
1306            trait_id,
1307            current_task_if_available("reading collectibles"),
1308            self,
1309        )
1310    }
1311
1312    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1313        self.backend.emit_collectible(
1314            trait_type,
1315            collectible,
1316            current_task("emitting collectible"),
1317            self,
1318        );
1319    }
1320
1321    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1322        self.backend.unemit_collectible(
1323            trait_type,
1324            collectible,
1325            count,
1326            current_task("emitting collectible"),
1327            self,
1328        );
1329    }
1330
1331    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1332        for (&collectible, &count) in collectibles {
1333            if count > 0 {
1334                self.backend.unemit_collectible(
1335                    trait_type,
1336                    collectible,
1337                    count as u32,
1338                    current_task("emitting collectible"),
1339                    self,
1340                );
1341            }
1342        }
1343    }
1344
1345    fn read_own_task_cell(
1346        &self,
1347        task: TaskId,
1348        index: CellId,
1349        options: ReadCellOptions,
1350    ) -> Result<TypedCellContent> {
1351        self.try_read_own_task_cell(task, index, options)
1352    }
1353
1354    fn update_own_task_cell(
1355        &self,
1356        task: TaskId,
1357        index: CellId,
1358        content: CellContent,
1359        verification_mode: VerificationMode,
1360    ) {
1361        self.backend
1362            .update_task_cell(task, index, content, verification_mode, self);
1363    }
1364
1365    fn connect_task(&self, task: TaskId) {
1366        self.backend
1367            .connect_task(task, current_task_if_available("connecting task"), self);
1368    }
1369
1370    fn mark_own_task_as_finished(&self, task: TaskId) {
1371        self.backend.mark_own_task_as_finished(task, self);
1372    }
1373
1374    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
1375        self.backend
1376            .set_own_task_aggregation_number(task, aggregation_number, self);
1377    }
1378
1379    fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1380        self.backend.mark_own_task_as_session_dependent(task, self);
1381    }
1382
1383    /// Creates a future that inherits the current task id and task state. The current global task
1384    /// will wait for this future to be dropped before exiting.
1385    fn detached_for_testing(
1386        &self,
1387        fut: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1388    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
1389        // this is similar to what happens for a local task, except that we keep the local task's
1390        // state as well.
1391        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1392        let tracked_fut = {
1393            let ts = global_task_state.read().unwrap();
1394            ts.local_task_tracker.track_future(fut)
1395        };
1396        Box::pin(TURBO_TASKS.scope(
1397            turbo_tasks(),
1398            CURRENT_TASK_STATE.scope(global_task_state, tracked_fut),
1399        ))
1400    }
1401
1402    fn task_statistics(&self) -> &TaskStatisticsApi {
1403        self.backend.task_statistics()
1404    }
1405
1406    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1407        let this = self.pin();
1408        Box::pin(async move {
1409            this.stop_and_wait().await;
1410        })
1411    }
1412
1413    fn subscribe_to_compilation_events(
1414        &self,
1415        event_types: Option<Vec<String>>,
1416    ) -> Receiver<Arc<dyn CompilationEvent>> {
1417        self.compilation_events.subscribe(event_types)
1418    }
1419
1420    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1421        if let Err(e) = self.compilation_events.send(event) {
1422            tracing::warn!("Failed to send compilation event: {e}");
1423        }
1424    }
1425
1426    fn is_tracking_dependencies(&self) -> bool {
1427        self.backend.is_tracking_dependencies()
1428    }
1429}
1430
1431impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1432    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1433        self.pin()
1434    }
1435    fn backend(&self) -> &B {
1436        &self.backend
1437    }
1438
1439    #[track_caller]
1440    fn schedule_backend_background_job(&self, job: B::BackendJob) {
1441        self.schedule_background_job(async move |this| {
1442            this.backend.run_backend_job(job, &*this).await;
1443            this
1444        })
1445    }
1446
1447    #[track_caller]
1448    fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1449        self.schedule_foreground_job(async move |this| {
1450            this.backend.run_backend_job(job, &*this).await;
1451            this
1452        })
1453    }
1454
1455    #[track_caller]
1456    fn schedule(&self, task: TaskId) {
1457        self.schedule(task)
1458    }
1459
1460    fn program_duration_until(&self, instant: Instant) -> Duration {
1461        instant - self.program_start
1462    }
1463
1464    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1465        // SAFETY: This is a fresh id from the factory
1466        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1467    }
1468
1469    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1470        // SAFETY: This is a fresh id from the factory
1471        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1472    }
1473
1474    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1475        unsafe { self.task_id_factory.reuse(id.into()) }
1476    }
1477
1478    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1479        unsafe { self.transient_task_id_factory.reuse(id.into()) }
1480    }
1481
1482    fn is_idle(&self) -> bool {
1483        self.currently_scheduled_foreground_jobs
1484            .load(Ordering::Acquire)
1485            == 0
1486    }
1487}
1488
1489pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1490    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1491        Ok(id) => id,
1492        Err(_) => panic!(
1493            "{from} can only be used in the context of a turbo_tasks task execution or \
1494             turbo_tasks run"
1495        ),
1496    }
1497}
1498
1499pub(crate) fn current_task(from: &str) -> TaskId {
1500    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1501        Ok(Some(id)) => id,
1502        Ok(None) | Err(_) => {
1503            panic!("{from} can only be used in the context of a turbo_tasks task execution")
1504        }
1505    }
1506}
1507
1508pub async fn run<T: Send + 'static>(
1509    tt: Arc<dyn TurboTasksApi>,
1510    future: impl Future<Output = Result<T>> + Send + 'static,
1511) -> Result<T> {
1512    let (tx, rx) = tokio::sync::oneshot::channel();
1513
1514    tt.run(Box::pin(async move {
1515        let result = future.await?;
1516        tx.send(result)
1517            .map_err(|_| anyhow!("unable to send result"))?;
1518        Ok(())
1519    }))
1520    .await?;
1521
1522    Ok(rx.await?)
1523}
1524
1525pub async fn run_once<T: Send + 'static>(
1526    tt: Arc<dyn TurboTasksApi>,
1527    future: impl Future<Output = Result<T>> + Send + 'static,
1528) -> Result<T> {
1529    let (tx, rx) = tokio::sync::oneshot::channel();
1530
1531    tt.run_once(Box::pin(async move {
1532        let result = future.await?;
1533        tx.send(result)
1534            .map_err(|_| anyhow!("unable to send result"))?;
1535        Ok(())
1536    }))
1537    .await?;
1538
1539    Ok(rx.await?)
1540}
1541
1542pub async fn run_once_with_reason<T: Send + 'static>(
1543    tt: Arc<dyn TurboTasksApi>,
1544    reason: impl InvalidationReason,
1545    future: impl Future<Output = Result<T>> + Send + 'static,
1546) -> Result<T> {
1547    let (tx, rx) = tokio::sync::oneshot::channel();
1548
1549    tt.run_once_with_reason(
1550        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1551        Box::pin(async move {
1552            let result = future.await?;
1553            tx.send(result)
1554                .map_err(|_| anyhow!("unable to send result"))?;
1555            Ok(())
1556        }),
1557    )
1558    .await?;
1559
1560    Ok(rx.await?)
1561}
1562
1563/// Calls [`TurboTasks::dynamic_call`] for the current turbo tasks instance.
1564pub fn dynamic_call(
1565    func: &'static NativeFunction,
1566    this: Option<RawVc>,
1567    arg: Box<dyn MagicAny>,
1568    persistence: TaskPersistence,
1569) -> RawVc {
1570    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1571}
1572
1573/// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance.
1574pub fn trait_call(
1575    trait_method: &'static TraitMethod,
1576    this: RawVc,
1577    arg: Box<dyn MagicAny>,
1578    persistence: TaskPersistence,
1579) -> RawVc {
1580    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1581}
1582
1583pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1584    TURBO_TASKS.with(|arc| arc.clone())
1585}
1586
1587pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1588    TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1589}
1590
1591pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1592    TURBO_TASKS.with(|arc| func(arc))
1593}
1594
1595pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1596    TURBO_TASKS.sync_scope(tt, f)
1597}
1598
1599pub fn turbo_tasks_future_scope<T>(
1600    tt: Arc<dyn TurboTasksApi>,
1601    f: impl Future<Output = T>,
1602) -> impl Future<Output = T> {
1603    TURBO_TASKS.scope(tt, f)
1604}
1605
1606pub fn with_turbo_tasks_for_testing<T>(
1607    tt: Arc<dyn TurboTasksApi>,
1608    current_task: TaskId,
1609    execution_id: ExecutionId,
1610    f: impl Future<Output = T>,
1611) -> impl Future<Output = T> {
1612    TURBO_TASKS.scope(
1613        tt,
1614        CURRENT_TASK_STATE.scope(
1615            Arc::new(RwLock::new(CurrentTaskState::new(
1616                current_task,
1617                execution_id,
1618            ))),
1619            f,
1620        ),
1621    )
1622}
1623
1624/// Spawns the given future within the context of the current task.
1625///
1626/// Beware: this method is not safe to use in production code. It is only
1627/// intended for use in tests and for debugging purposes.
1628pub fn spawn_detached_for_testing(f: impl Future<Output = Result<()>> + Send + 'static) {
1629    tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(f.in_current_span())));
1630}
1631
1632pub fn current_task_for_testing() -> Option<TaskId> {
1633    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1634}
1635
1636/// Marks the current task as dirty when restored from filesystem cache.
1637pub fn mark_session_dependent() {
1638    with_turbo_tasks(|tt| {
1639        tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1640    });
1641}
1642
1643/// Marks the current task as a root in the aggregation graph.  This means it starts with the
1644/// correct aggregation number instead of needing to recompute it after the fact.
1645pub fn mark_root() {
1646    with_turbo_tasks(|tt| {
1647        tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
1648    });
1649}
1650
1651/// Marks the current task as finished. This excludes it from waiting for
1652/// strongly consistency.
1653pub fn mark_finished() {
1654    with_turbo_tasks(|tt| {
1655        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1656    });
1657}
1658
1659/// Marks the current task as stateful. This prevents the tasks from being
1660/// dropped without persisting the state.
1661///
1662/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
1663/// serialization of the current task cells
1664pub fn mark_stateful() -> SerializationInvalidator {
1665    CURRENT_TASK_STATE.with(|cell| {
1666        let CurrentTaskState {
1667            stateful, task_id, ..
1668        } = &mut *cell.write().unwrap();
1669        *stateful = true;
1670        let Some(task_id) = *task_id else {
1671            panic!(
1672                "mark_stateful() can only be used in the context of a turbo_tasks task execution"
1673            );
1674        };
1675        SerializationInvalidator::new(task_id)
1676    })
1677}
1678
1679pub fn mark_invalidator() {
1680    CURRENT_TASK_STATE.with(|cell| {
1681        let CurrentTaskState {
1682            has_invalidator, ..
1683        } = &mut *cell.write().unwrap();
1684        *has_invalidator = true;
1685    })
1686}
1687
1688pub fn prevent_gc() {
1689    // There is a hack in UpdateCellOperation that need to be updated when this is changed.
1690    mark_stateful();
1691}
1692
1693pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1694    with_turbo_tasks(|tt| {
1695        let raw_vc = collectible.node.node;
1696        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1697    })
1698}
1699
1700pub(crate) async fn read_task_output(
1701    this: &dyn TurboTasksApi,
1702    id: TaskId,
1703    options: ReadOutputOptions,
1704) -> Result<RawVc> {
1705    loop {
1706        match this.try_read_task_output(id, options)? {
1707            Ok(result) => return Ok(result),
1708            Err(listener) => listener.await,
1709        }
1710    }
1711}
1712
1713pub(crate) async fn read_task_cell(
1714    this: &dyn TurboTasksApi,
1715    id: TaskId,
1716    index: CellId,
1717    options: ReadCellOptions,
1718) -> Result<TypedCellContent> {
1719    loop {
1720        match this.try_read_task_cell(id, index, options)? {
1721            Ok(result) => return Ok(result),
1722            Err(listener) => listener.await,
1723        }
1724    }
1725}
1726
1727/// A reference to a task's cell with methods that allow updating the contents
1728/// of the cell.
1729///
1730/// Mutations should not outside of the task that that owns this cell. Doing so
1731/// is a logic error, and may lead to incorrect caching behavior.
1732#[derive(Clone, Copy, Serialize, Deserialize)]
1733pub struct CurrentCellRef {
1734    current_task: TaskId,
1735    index: CellId,
1736}
1737
1738type VcReadRepr<T> = <<T as VcValueType>::Read as VcRead<T>>::Repr;
1739
1740impl CurrentCellRef {
1741    /// Updates the cell if the given `functor` returns a value.
1742    pub fn conditional_update<T>(&self, functor: impl FnOnce(Option<&T>) -> Option<T>)
1743    where
1744        T: VcValueType,
1745    {
1746        self.conditional_update_with_shared_reference(|old_shared_reference| {
1747            let old_ref = old_shared_reference
1748                .and_then(|sr| sr.0.downcast_ref::<VcReadRepr<T>>())
1749                .map(|content| <T::Read as VcRead<T>>::repr_to_value_ref(content));
1750            let new_value = functor(old_ref)?;
1751            Some(SharedReference::new(triomphe::Arc::new(
1752                <T::Read as VcRead<T>>::value_to_repr(new_value),
1753            )))
1754        })
1755    }
1756
1757    /// Updates the cell if the given `functor` returns a `SharedReference`.
1758    pub fn conditional_update_with_shared_reference(
1759        &self,
1760        functor: impl FnOnce(Option<&SharedReference>) -> Option<SharedReference>,
1761    ) {
1762        let tt = turbo_tasks();
1763        let cell_content = tt
1764            .read_own_task_cell(
1765                self.current_task,
1766                self.index,
1767                ReadCellOptions {
1768                    // INVALIDATION: Reading our own cell must be untracked
1769                    tracking: ReadTracking::Untracked,
1770                    ..Default::default()
1771                },
1772            )
1773            .ok();
1774        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1775        if let Some(update) = update {
1776            tt.update_own_task_cell(
1777                self.current_task,
1778                self.index,
1779                CellContent(Some(update)),
1780                VerificationMode::EqualityCheck,
1781            )
1782        }
1783    }
1784
1785    /// Replace the current cell's content with `new_value` if the current content is not equal by
1786    /// value with the existing content.
1787    ///
1788    /// The comparison happens using the value itself, not the [`VcRead::Target`] of that value.
1789    ///
1790    /// Take this example of a custom equality implementation on a transparent wrapper type:
1791    ///
1792    /// ```
1793    /// #[turbo_tasks::value(transparent, eq = "manual")]
1794    /// struct Wrapper(Vec<u32>);
1795    ///
1796    /// impl PartialEq for Wrapper {
1797    ///     fn eq(&self, other: Wrapper) {
1798    ///         // Example: order doesn't matter for equality
1799    ///         let (mut this, mut other) = (self.clone(), other.clone());
1800    ///         this.sort_unstable();
1801    ///         other.sort_unstable();
1802    ///         this == other
1803    ///     }
1804    /// }
1805    ///
1806    /// impl Eq for Wrapper {}
1807    /// ```
1808    ///
1809    /// Comparisons of [`Vc<Wrapper>`] used when updating the cell will use `Wrapper`'s custom
1810    /// equality implementation, rather than the one provided by the target ([`Vec<u32>`]) type.
1811    ///
1812    /// However, in most cases, the default derived implementation of [`PartialEq`] is used which
1813    /// just forwards to the inner value's [`PartialEq`].
1814    ///
1815    /// If you already have a `SharedReference`, consider calling
1816    /// [`Self::compare_and_update_with_shared_reference`] which can re-use the [`SharedReference`]
1817    /// object.
1818    pub fn compare_and_update<T>(&self, new_value: T)
1819    where
1820        T: PartialEq + VcValueType,
1821    {
1822        self.conditional_update(|old_value| {
1823            if let Some(old_value) = old_value
1824                && old_value == &new_value
1825            {
1826                return None;
1827            }
1828            Some(new_value)
1829        });
1830    }
1831
1832    /// Replace the current cell's content with `new_shared_reference` if the
1833    /// current content is not equal by value with the existing content.
1834    ///
1835    /// If you already have a `SharedReference`, this is a faster version of
1836    /// [`CurrentCellRef::compare_and_update`].
1837    ///
1838    /// The [`SharedReference`] is expected to use the `<T::Read as
1839    /// VcRead<T>>::Repr` type for its representation of the value.
1840    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
1841    where
1842        T: VcValueType + PartialEq,
1843    {
1844        fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
1845            <T::Read as VcRead<T>>::repr_to_value_ref(
1846                sr.0.downcast_ref::<VcReadRepr<T>>()
1847                    .expect("cannot update SharedReference of different type"),
1848            )
1849        }
1850        self.conditional_update_with_shared_reference(|old_sr| {
1851            if let Some(old_sr) = old_sr {
1852                let old_value: &T = extract_sr_value(old_sr);
1853                let new_value = extract_sr_value(&new_shared_reference);
1854                if old_value == new_value {
1855                    return None;
1856                }
1857            }
1858            Some(new_shared_reference)
1859        });
1860    }
1861
1862    /// Unconditionally updates the content of the cell.
1863    pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
1864    where
1865        T: VcValueType,
1866    {
1867        let tt = turbo_tasks();
1868        tt.update_own_task_cell(
1869            self.current_task,
1870            self.index,
1871            CellContent(Some(SharedReference::new(triomphe::Arc::new(
1872                <T::Read as VcRead<T>>::value_to_repr(new_value),
1873            )))),
1874            verification_mode,
1875        )
1876    }
1877
1878    /// A faster version of [`Self::update`] if you already have a
1879    /// [`SharedReference`].
1880    ///
1881    /// If the passed-in [`SharedReference`] is the same as the existing cell's
1882    /// by identity, no update is performed.
1883    ///
1884    /// The [`SharedReference`] is expected to use the `<T::Read as
1885    /// VcRead<T>>::Repr` type for its representation of the value.
1886    pub fn update_with_shared_reference(
1887        &self,
1888        shared_ref: SharedReference,
1889        verification_mode: VerificationMode,
1890    ) {
1891        let tt = turbo_tasks();
1892        let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
1893            let content = tt
1894                .read_own_task_cell(
1895                    self.current_task,
1896                    self.index,
1897                    ReadCellOptions {
1898                        // INVALIDATION: Reading our own cell must be untracked
1899                        tracking: ReadTracking::Untracked,
1900                        ..Default::default()
1901                    },
1902                )
1903                .ok();
1904            if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
1905                // pointer equality (not value equality)
1906                shared_ref_exp != shared_ref
1907            } else {
1908                true
1909            }
1910        } else {
1911            true
1912        };
1913        if update {
1914            tt.update_own_task_cell(
1915                self.current_task,
1916                self.index,
1917                CellContent(Some(shared_ref)),
1918                verification_mode,
1919            )
1920        }
1921    }
1922}
1923
1924impl From<CurrentCellRef> for RawVc {
1925    fn from(cell: CurrentCellRef) -> Self {
1926        RawVc::TaskCell(cell.current_task, cell.index)
1927    }
1928}
1929
1930pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef {
1931    CURRENT_TASK_STATE.with(|ts| {
1932        let current_task = current_task("celling turbo_tasks values");
1933        let mut ts = ts.write().unwrap();
1934        let map = ts.cell_counters.as_mut().unwrap();
1935        let current_index = map.entry(ty).or_default();
1936        let index = *current_index;
1937        *current_index += 1;
1938        CurrentCellRef {
1939            current_task,
1940            index: CellId { type_id: ty, index },
1941        }
1942    })
1943}
1944
1945pub(crate) async fn read_local_output(
1946    this: &dyn TurboTasksApi,
1947    execution_id: ExecutionId,
1948    local_task_id: LocalTaskId,
1949) -> Result<RawVc> {
1950    loop {
1951        match this.try_read_local_output(execution_id, local_task_id)? {
1952            Ok(raw_vc) => return Ok(raw_vc),
1953            Err(event_listener) => event_listener.await,
1954        }
1955    }
1956}