turbo_tasks/manager.rs

1use std::{
2    fmt::Display,
3    future::Future,
4    hash::BuildHasherDefault,
5    mem::take,
6    pin::Pin,
7    sync::{
8        Arc, Mutex, RwLock, Weak,
9        atomic::{AtomicBool, AtomicUsize, Ordering},
10    },
11    time::{Duration, Instant},
12};
13
14use anyhow::{Result, anyhow};
15use auto_hash_map::AutoMap;
16use bincode::{Decode, Encode};
17use rustc_hash::FxHasher;
18use serde::{Deserialize, Serialize};
19use tokio::{select, sync::mpsc::Receiver, task_local};
20use tokio_util::task::TaskTracker;
21use tracing::{Instrument, instrument};
22
23use crate::{
24    Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
25    ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
26    VcValueTrait, VcValueType,
27    backend::{
28        Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
29        TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
30    },
31    capture_future::CaptureFuture,
32    event::{Event, EventListener},
33    id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
34    id_factory::IdFactoryWithReuse,
35    macro_helpers::NativeFunction,
36    magic_any::MagicAny,
37    message_queue::{CompilationEvent, CompilationEventQueue},
38    raw_vc::{CellId, RawVc},
39    registry,
40    serialization_invalidation::SerializationInvalidator,
41    task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
42    task_statistics::TaskStatisticsApi,
43    trace::TraceRawVcs,
44    util::{IdFactory, StaticOrArc},
45};
46
47/// Common base trait for [`TurboTasksApi`] and [`TurboTasksBackendApi`]. Provides APIs for creating
48/// tasks from function calls.
49pub trait TurboTasksCallApi: Sync + Send {
50    /// Calls a native function with arguments. Resolves arguments when needed
51    /// with a wrapper task.
52    fn dynamic_call(
53        &self,
54        native_fn: &'static NativeFunction,
55        this: Option<RawVc>,
56        arg: Box<dyn MagicAny>,
57        persistence: TaskPersistence,
58    ) -> RawVc;
59    /// Calls a native function with arguments.
60    /// All inputs must already be resolved.
61    fn native_call(
62        &self,
63        native_fn: &'static NativeFunction,
64        this: Option<RawVc>,
65        arg: Box<dyn MagicAny>,
66        persistence: TaskPersistence,
67    ) -> RawVc;
68    /// Calls a trait method with arguments. First input is the `self` object.
69    /// Uses a wrapper task to resolve `self` and the arguments when needed.
70    fn trait_call(
71        &self,
72        trait_method: &'static TraitMethod,
73        this: RawVc,
74        arg: Box<dyn MagicAny>,
75        persistence: TaskPersistence,
76    ) -> RawVc;
77
78    fn run(
79        &self,
80        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
81    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
82    fn run_once(
83        &self,
84        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
85    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
86    fn run_once_with_reason(
87        &self,
88        reason: StaticOrArc<dyn InvalidationReason>,
89        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
90    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
91    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
92}
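// Hedged sketch (not part of this module): how macro-generated call glue might
// dispatch through `TurboTasksCallApi`. `MY_FUNCTION` and `MyArgs` are
// hypothetical names; `dynamic_call` resolves unresolved inputs with a wrapper
// task, while `native_call` requires already-resolved inputs.
//
//     let raw: RawVc = turbo_tasks().dynamic_call(
//         &MY_FUNCTION,                    // &'static NativeFunction
//         None,                            // free function: no `this`
//         Box::new(MyArgs { value: 42 }),  // Box<dyn MagicAny>
//         TaskPersistence::Persistent,
//     );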
93
94/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
95/// context. Returned by the [`turbo_tasks`] helper function.
96///
97/// This trait is needed because thread locals cannot contain an unresolved [`Backend`] type
98/// parameter.
99pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
100    fn invalidate(&self, task: TaskId);
101    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);
102
103    fn invalidate_serialization(&self, task: TaskId);
104
105    fn try_read_task_output(
106        &self,
107        task: TaskId,
108        options: ReadOutputOptions,
109    ) -> Result<Result<RawVc, EventListener>>;
110
111    fn try_read_task_cell(
112        &self,
113        task: TaskId,
114        index: CellId,
115        options: ReadCellOptions,
116    ) -> Result<Result<TypedCellContent, EventListener>>;
117
118    /// Reads a [`RawVc::LocalOutput`]. If the task has completed, returns the [`RawVc`] the local
119    /// task points to.
120    ///
121    /// The returned [`RawVc`] may also be a [`RawVc::LocalOutput`], so this may need to be called
122    /// recursively or in a loop.
123    ///
124    /// This does not accept a consistency argument, as you cannot control consistency of a read of
125    /// an operation owned by your own task. Strongly consistent reads are only allowed on
126    /// [`OperationVc`]s, which should never be local tasks.
127    ///
128    /// No dependency tracking will happen as a result of this function call, as it's a no-op for a
129    /// task to depend on itself.
130    ///
131    /// [`OperationVc`]: crate::OperationVc
132    fn try_read_local_output(
133        &self,
134        execution_id: ExecutionId,
135        local_task_id: LocalTaskId,
136    ) -> Result<Result<RawVc, EventListener>>;
137
138    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;
139
140    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
141    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
142    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);
143
144    /// INVALIDATION: Be careful with this, it will not track dependencies, so
145    /// using it could break cache invalidation.
146    fn try_read_own_task_cell(
147        &self,
148        current_task: TaskId,
149        index: CellId,
150        options: ReadCellOptions,
151    ) -> Result<TypedCellContent>;
152
153    fn read_own_task_cell(
154        &self,
155        task: TaskId,
156        index: CellId,
157        options: ReadCellOptions,
158    ) -> Result<TypedCellContent>;
159    fn update_own_task_cell(
160        &self,
161        task: TaskId,
162        index: CellId,
163        is_serializable_cell_content: bool,
164        content: CellContent,
165        verification_mode: VerificationMode,
166    );
167    fn mark_own_task_as_finished(&self, task: TaskId);
168    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
169    fn mark_own_task_as_session_dependent(&self, task: TaskId);
170
171    fn connect_task(&self, task: TaskId);
172
173    /// Wraps the given future in the current task.
174    ///
175    /// Beware: this method is not safe to use in production code. It is only intended for use in
176    /// tests and for debugging purposes.
177    fn detached_for_testing(
178        &self,
179        f: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
180    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
181
182    fn task_statistics(&self) -> &TaskStatisticsApi;
183
184    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
185
186    fn subscribe_to_compilation_events(
187        &self,
188        event_types: Option<Vec<String>>,
189    ) -> Receiver<Arc<dyn CompilationEvent>>;
190    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);
191
192    /// Returns true if TurboTasks is configured to track dependencies.
193    fn is_tracking_dependencies(&self) -> bool;
194}
195
196/// A wrapper around a value that is unused.
197pub struct Unused<T> {
198    inner: T,
199}
200
201impl<T> Unused<T> {
202    /// Creates a new unused value.
203    ///
204    /// # Safety
205    ///
206    /// The wrapped value must not be used.
207    pub unsafe fn new_unchecked(inner: T) -> Self {
208        Self { inner }
209    }
210
211    /// Gets the inner value without consuming the `Unused` wrapper.
212    ///
213    /// # Safety
214    ///
215    /// The user needs to make sure that the value stays unused.
216    pub unsafe fn get_unchecked(&self) -> &T {
217        &self.inner
218    }
219
220    /// Unwraps the value, consuming the `Unused` wrapper.
221    pub fn into(self) -> T {
222        self.inner
223    }
224}
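// Minimal usage sketch for `Unused` (illustrative only): wrap a fresh id that
// must not be observed until it is handed back.
//
//     // SAFETY: `raw_task_id` has not been used anywhere yet
//     let id = unsafe { Unused::new_unchecked(raw_task_id) };
//     // ... pass `id` along without touching the wrapped value ...
//     let raw_task_id = id.into(); // consume the wrapper once it may be used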
225
226/// A subset of the [`TurboTasks`] API that's exposed to [`Backend`] implementations.
227pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
228    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;
229
230    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
231    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
232    /// # Safety
233    ///
234    /// The caller must ensure that the task id is not used anymore.
235    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
236    /// # Safety
237    ///
238    /// The caller must ensure that the task id is not used anymore.
239    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);
240
241    /// Schedule a task for execution.
242    fn schedule(&self, task: TaskId);
243
244    /// Schedule a foreground backend job for execution.
245    fn schedule_backend_foreground_job(&self, job: B::BackendJob);
246
247    /// Schedule a background backend job for execution.
248    ///
249    /// Background jobs are not counted towards activeness of the system. The system is considered
250    /// idle even with active background jobs.
251    fn schedule_backend_background_job(&self, job: B::BackendJob);
252
253    /// Returns the duration from the start of the program to the given instant.
254    fn program_duration_until(&self, instant: Instant) -> Duration;
255
256    /// Returns true if the system is idle.
257    fn is_idle(&self) -> bool;
258
259    /// Returns a reference to the backend.
260    fn backend(&self) -> &B;
261}
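// Hedged sketch of how a `Backend` implementation might use this API; the job
// variant `MyJob::Flush` is hypothetical. Background jobs do not count towards
// activeness, so `is_idle()` can return true while they are still running.
//
//     fn flush_later(&self, tt: &dyn TurboTasksBackendApi<Self>) {
//         tt.schedule_backend_background_job(MyJob::Flush);
//     }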
262
263#[allow(clippy::manual_non_exhaustive)]
264pub struct UpdateInfo {
265    pub duration: Duration,
266    pub tasks: usize,
267    pub reasons: InvalidationReasonSet,
268    #[allow(dead_code)]
269    placeholder_for_future_fields: (),
270}
271
272#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
273pub enum TaskPersistence {
274    /// Tasks that may be persisted across sessions using serialization.
275    Persistent,
276
277    /// Tasks that will be persisted in memory for the life of this session, but won't persist
278    /// between sessions.
279    ///
280    /// This is used for [root tasks][TurboTasks::spawn_root_task] and tasks with an argument of
281    /// type [`TransientValue`][crate::value::TransientValue] or
282    /// [`TransientInstance`][crate::value::TransientInstance].
283    Transient,
284}
285
286impl Display for TaskPersistence {
287    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288        match self {
289            TaskPersistence::Persistent => write!(f, "persistent"),
290            TaskPersistence::Transient => write!(f, "transient"),
291        }
292    }
293}
294
295#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
296pub enum ReadConsistency {
297    /// The default behavior for most APIs. Reads are faster, but may return stale values, which
298    /// may later trigger re-computation.
299    #[default]
300    Eventual,
301    /// Ensures all dependencies are fully resolved before returning the cell or output data, at
302    /// the cost of slower reads.
303    ///
304    /// Top-level code that returns data to the user should use strongly consistent reads.
305    Strong,
306}
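// Sketch of the intended split: task-to-task reads default to `Eventual`,
// while top-level reads that surface data to the user opt into `Strong` (this
// mirrors the `ReadOutputOptions` construction further down in this file).
//
//     let options = ReadOutputOptions {
//         tracking: ReadTracking::Tracked,
//         consistency: ReadConsistency::Strong,
//     };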
307
308#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
309pub enum ReadTracking {
310    /// Reads are tracked as dependencies of the current task.
311    #[default]
312    Tracked,
313    /// The read is only tracked when there is an error, otherwise it is untracked.
314    ///
315    /// INVALIDATION: Be careful with this, it will not track dependencies, so
316    /// using it could break cache invalidation.
317    TrackOnlyError,
318    /// The read is not tracked as a dependency of the current task.
319    ///
320    /// INVALIDATION: Be careful with this, it will not track dependencies, so
321    /// using it could break cache invalidation.
322    Untracked,
323}
324
325impl ReadTracking {
326    pub fn should_track(&self, is_err: bool) -> bool {
327        match self {
328            ReadTracking::Tracked => true,
329            ReadTracking::TrackOnlyError => is_err,
330            ReadTracking::Untracked => false,
331        }
332    }
333}
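// A quick illustration of `should_track`, derived from the match above:
//
//     assert!(ReadTracking::Tracked.should_track(false));
//     assert!(ReadTracking::TrackOnlyError.should_track(true));
//     assert!(!ReadTracking::TrackOnlyError.should_track(false));
//     assert!(!ReadTracking::Untracked.should_track(true));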
334
335impl Display for ReadTracking {
336    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
337        match self {
338            ReadTracking::Tracked => write!(f, "tracked"),
339            ReadTracking::TrackOnlyError => write!(f, "track only error"),
340            ReadTracking::Untracked => write!(f, "untracked"),
341        }
342    }
343}
344
345pub struct TurboTasks<B: Backend + 'static> {
346    this: Weak<Self>,
347    backend: B,
348    task_id_factory: IdFactoryWithReuse<TaskId>,
349    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
350    execution_id_factory: IdFactory<ExecutionId>,
351    stopped: AtomicBool,
352    currently_scheduled_foreground_jobs: AtomicUsize,
353    currently_scheduled_background_jobs: AtomicUsize,
354    scheduled_tasks: AtomicUsize,
355    start: Mutex<Option<Instant>>,
356    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
357    /// Event that is triggered when currently_scheduled_foreground_jobs becomes non-zero
358    event_foreground_start: Event,
359    /// Event that is triggered when all foreground jobs are done
360    /// (currently_scheduled_foreground_jobs becomes zero)
361    event_foreground_done: Event,
362    /// Event that is triggered when all background jobs are done
363    event_background_done: Event,
364    program_start: Instant,
365    compilation_events: CompilationEventQueue,
366}
367
368/// Information about a non-local task. A non-local task can contain multiple "local" tasks, which
369/// all share the same non-local task state.
370///
371/// A non-local task is one that:
372///
373/// - Has a unique task id.
374/// - Is potentially cached.
375/// - Is known to the backend.
376struct CurrentTaskState {
377    task_id: Option<TaskId>,
378    execution_id: ExecutionId,
379
380    /// True if the current task has state in cells
381    stateful: bool,
382
383    /// True if the current task uses an external invalidator
384    has_invalidator: bool,
385
386    /// Tracks how many cells of each type have been allocated so far during this task execution.
387    /// When a task is re-executed, the cell count may not match the existing cell vec length.
388    ///
389    /// This is taken (and becomes `None`) during teardown of a task.
390    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,
391
392    /// Local tasks created while this global task has been running. Indexed by `LocalTaskId`.
393    local_tasks: Vec<LocalTask>,
394
395    /// Tracks currently running local tasks, and defers cleanup of the global task until those
396    /// complete. Also used by `detached_for_testing`.
397    local_task_tracker: TaskTracker,
398}
399
400impl CurrentTaskState {
401    fn new(task_id: TaskId, execution_id: ExecutionId) -> Self {
402        Self {
403            task_id: Some(task_id),
404            execution_id,
405            stateful: false,
406            has_invalidator: false,
407            cell_counters: Some(AutoMap::default()),
408            local_tasks: Vec::new(),
409            local_task_tracker: TaskTracker::new(),
410        }
411    }
412
413    fn new_temporary(execution_id: ExecutionId) -> Self {
414        Self {
415            task_id: None,
416            execution_id,
417            stateful: false,
418            has_invalidator: false,
419            cell_counters: None,
420            local_tasks: Vec::new(),
421            local_task_tracker: TaskTracker::new(),
422        }
423    }
424
425    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
426        if self.execution_id != expected_execution_id {
427            panic!(
428                "Local tasks can only be scheduled/awaited within the same execution of the \
429                 parent task that created them"
430            );
431        }
432    }
433
434    fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
435        self.local_tasks.push(local_task);
436        // generate a one-indexed id from len() -- we just pushed so len() is >= 1
437        if cfg!(debug_assertions) {
438            LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
439        } else {
440            unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
441        }
442    }
443
444    fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
445        // local task ids are one-indexed (they use NonZeroU32)
446        &self.local_tasks[(*local_task_id as usize) - 1]
447    }
448
449    fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
450        &mut self.local_tasks[(*local_task_id as usize) - 1]
451    }
452}
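// Illustration of the one-indexed local task id scheme used above: ids are
// `NonZeroU32` values derived from the Vec length after the push, so id N is
// stored at `local_tasks[N - 1]`.
//
//     // 1st create_local_task() -> LocalTaskId 1 -> local_tasks[0]
//     // 2nd create_local_task() -> LocalTaskId 2 -> local_tasks[1]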
453
454// TODO implement our own thread pool and make these thread locals instead
455task_local! {
456    /// The current TurboTasks instance
457    static TURBO_TASKS: Arc<dyn TurboTasksApi>;
458
459    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
460}
461
462impl<B: Backend + 'static> TurboTasks<B> {
463    // TODO better lifetime management for turbo tasks
464    // consider using unsafe for the task_local turbo tasks
465    // that should be safe as long as tasks can't outlive turbo tasks
466    // so we probably want to make sure that all tasks are joined
467    // when trying to drop turbo tasks
468    pub fn new(backend: B) -> Arc<Self> {
469        let task_id_factory = IdFactoryWithReuse::new(
470            TaskId::MIN,
471            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
472        );
473        let transient_task_id_factory =
474            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
475        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
476        let this = Arc::new_cyclic(|this| Self {
477            this: this.clone(),
478            backend,
479            task_id_factory,
480            transient_task_id_factory,
481            execution_id_factory,
482            stopped: AtomicBool::new(false),
483            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
484            currently_scheduled_background_jobs: AtomicUsize::new(0),
485            scheduled_tasks: AtomicUsize::new(0),
486            start: Default::default(),
487            aggregated_update: Default::default(),
488            event_foreground_done: Event::new(|| {
489                || "TurboTasks::event_foreground_done".to_string()
490            }),
491            event_foreground_start: Event::new(|| {
492                || "TurboTasks::event_foreground_start".to_string()
493            }),
494            event_background_done: Event::new(|| {
495                || "TurboTasks::event_background_done".to_string()
496            }),
497            program_start: Instant::now(),
498            compilation_events: CompilationEventQueue::default(),
499        });
500        this.backend.startup(&*this);
501        this
502    }
503
504    pub fn pin(&self) -> Arc<Self> {
505        self.this.upgrade().unwrap()
506    }
507
508    /// Creates a new root task
509    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
510    where
511        T: ?Sized,
512        F: Fn() -> Fut + Send + Sync + Clone + 'static,
513        Fut: Future<Output = Result<Vc<T>>> + Send,
514    {
515        let id = self.backend.create_transient_task(
516            TransientTaskType::Root(Box::new(move || {
517                let functor = functor.clone();
518                Box::pin(async move {
519                    let raw_vc = functor().await?.node;
520                    raw_vc.to_non_local().await
521                })
522            })),
523            self,
524        );
525        self.schedule(id);
526        id
527    }
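    // Hedged usage sketch for `spawn_root_task`; `do_build` stands in for a
    // hypothetical turbo-tasks function returning a `Vc`. Root tasks are
    // transient and re-execute when their dependencies are invalidated.
    //
    //     let task_id = tt.spawn_root_task(|| async {
    //         let done: Vc<Completion> = do_build();
    //         Ok(done)
    //     });
    //     // ... later, when the root is no longer needed ...
    //     tt.dispose_root_task(task_id);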
528
529    pub fn dispose_root_task(&self, task_id: TaskId) {
530        self.backend.dispose_root_task(task_id, self);
531    }
532
533    // TODO make sure that all dependencies settle before reading them
534    /// Creates a new root task, that is only executed once.
535    /// Dependencies will not invalidate the task.
536    #[track_caller]
537    fn spawn_once_task<T, Fut>(&self, future: Fut)
538    where
539        T: ?Sized,
540        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
541    {
542        let id = self.backend.create_transient_task(
543            TransientTaskType::Once(Box::pin(async move {
544                let raw_vc = future.await?.node;
545                raw_vc.to_non_local().await
546            })),
547            self,
548        );
549        self.schedule(id);
550    }
551
552    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
553        &self,
554        future: impl Future<Output = Result<T>> + Send + 'static,
555    ) -> Result<T> {
556        let (tx, rx) = tokio::sync::oneshot::channel();
557        self.spawn_once_task(async move {
558            let result = future.await;
559            tx.send(result)
560                .map_err(|_| anyhow!("unable to send result"))?;
561            Ok(Completion::new())
562        });
563
564        rx.await?
565    }
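    // Hedged usage sketch: `run_once` drives an ad-hoc future inside a
    // transient once-task and channels the value back over a oneshot channel.
    //
    //     let answer = tt.run_once(async move { Ok(6 * 7) }).await?;
    //     assert_eq!(answer, 42);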
566
567    #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
568    pub async fn run<T: TraceRawVcs + Send + 'static>(
569        &self,
570        future: impl Future<Output = Result<T>> + Send + 'static,
571    ) -> Result<T, TurboTasksExecutionError> {
572        self.begin_foreground_job();
573        // it's okay for execution ids to overflow and wrap, they're just used for an assert
574        let execution_id = self.execution_id_factory.wrapping_get();
575        let current_task_state =
576            Arc::new(RwLock::new(CurrentTaskState::new_temporary(execution_id)));
577
578        let result = TURBO_TASKS
579            .scope(
580                self.pin(),
581                CURRENT_TASK_STATE.scope(current_task_state, async {
582                    let result = CaptureFuture::new(future).await;
583
584                    // wait for all spawned local tasks using `local` to finish
585                    let ltt =
586                        CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_task_tracker.clone());
587                    ltt.close();
588                    ltt.wait().await;
589
590                    match result {
591                        Ok(Ok(raw_vc)) => Ok(raw_vc),
592                        Ok(Err(err)) => Err(err.into()),
593                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
594                    }
595                }),
596            )
597            .await;
598        self.finish_foreground_job();
599        result
600    }
601
602    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
603        let this = self.pin();
604        tokio::spawn(async move {
605            this.pin()
606                .run_once(async move {
607                    this.finish_foreground_job();
608                    future.await;
609                    this.begin_foreground_job();
610                    Ok(())
611                })
612                .await
613                .unwrap()
614        });
615    }
616
617    pub(crate) fn native_call(
618        &self,
619        native_fn: &'static NativeFunction,
620        this: Option<RawVc>,
621        arg: Box<dyn MagicAny>,
622        persistence: TaskPersistence,
623    ) -> RawVc {
624        let task_type = CachedTaskType {
625            native_fn,
626            this,
627            arg,
628        };
629        RawVc::TaskOutput(match persistence {
630            TaskPersistence::Transient => self.backend.get_or_create_transient_task(
631                task_type,
632                current_task_if_available("turbo_function calls"),
633                self,
634            ),
635            TaskPersistence::Persistent => self.backend.get_or_create_persistent_task(
636                task_type,
637                current_task_if_available("turbo_function calls"),
638                self,
639            ),
640        })
641    }
642
643    pub fn dynamic_call(
644        &self,
645        native_fn: &'static NativeFunction,
646        this: Option<RawVc>,
647        arg: Box<dyn MagicAny>,
648        persistence: TaskPersistence,
649    ) -> RawVc {
650        if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
651            return self.native_call(native_fn, this, arg, persistence);
652        }
653        let task_type = LocalTaskSpec {
654            task_type: LocalTaskType::ResolveNative { native_fn },
655            this,
656            arg,
657        };
658        self.schedule_local_task(task_type, persistence)
659    }
660
661    pub fn trait_call(
662        &self,
663        trait_method: &'static TraitMethod,
664        this: RawVc,
665        arg: Box<dyn MagicAny>,
666        persistence: TaskPersistence,
667    ) -> RawVc {
668        // avoid creating a wrapper task if self is already resolved
669        // for resolved cells we already know the value type so we can lookup the
670        // function
671        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
672            match registry::get_value_type(type_id).get_trait_method(trait_method) {
673                Some(native_fn) => {
674                    let arg = native_fn.arg_meta.filter_owned(arg);
675                    return self.dynamic_call(native_fn, Some(this), arg, persistence);
676                }
677                None => {
678                    // We are destined to fail at this point, but we just retry resolution in the
679                    // local task since we cannot report an error from here.
680                    // TODO: A panic seems appropriate since the immediate caller is to blame
681                }
682            }
683        }
684
685        // create a wrapper task to resolve all inputs
686        let task_type = LocalTaskSpec {
687            task_type: LocalTaskType::ResolveTrait { trait_method },
688            this: Some(this),
689            arg,
690        };
691
692        self.schedule_local_task(task_type, persistence)
693    }
694
695    #[track_caller]
696    pub(crate) fn schedule(&self, task_id: TaskId) {
697        self.begin_foreground_job();
698        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
699
700        let this = self.pin();
701        let future = async move {
702            let mut schedule_again = true;
703            while schedule_again {
704                // it's okay for execution ids to overflow and wrap, they're just used for an assert
705                let execution_id = this.execution_id_factory.wrapping_get();
706                let current_task_state =
707                    Arc::new(RwLock::new(CurrentTaskState::new(task_id, execution_id)));
708                let single_execution_future = async {
709                    if this.stopped.load(Ordering::Acquire) {
710                        this.backend.task_execution_canceled(task_id, &*this);
711                        return false;
712                    }
713
714                    let Some(TaskExecutionSpec { future, span }) =
715                        this.backend.try_start_task_execution(task_id, &*this)
716                    else {
717                        return false;
718                    };
719
720                    async {
721                        let result = CaptureFuture::new(future).await;
722
723                        // wait for all spawned local tasks using `local` to finish
724                        let ltt = CURRENT_TASK_STATE
725                            .with(|ts| ts.read().unwrap().local_task_tracker.clone());
726                        ltt.close();
727                        ltt.wait().await;
728
729                        let result = match result {
730                            Ok(Ok(raw_vc)) => Ok(raw_vc),
731                            Ok(Err(err)) => Err(err.into()),
732                            Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
733                        };
734
735                        let FinishedTaskState {
736                            stateful,
737                            has_invalidator,
738                        } = this.finish_current_task_state();
739                        let cell_counters = CURRENT_TASK_STATE
740                            .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
741                        this.backend.task_execution_completed(
742                            task_id,
743                            result,
744                            &cell_counters,
745                            stateful,
746                            has_invalidator,
747                            &*this,
748                        )
749                    }
750                    .instrument(span)
751                    .await
752                };
753                schedule_again = CURRENT_TASK_STATE
754                    .scope(current_task_state, single_execution_future)
755                    .await;
756            }
757            this.finish_foreground_job();
758            anyhow::Ok(())
759        };
760
761        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
762
763        #[cfg(feature = "tokio_tracing")]
764        {
765            let description = self.backend.get_task_description(task_id);
766            tokio::task::Builder::new()
767                .name(&description)
768                .spawn(future)
769                .unwrap();
770        }
771        #[cfg(not(feature = "tokio_tracing"))]
772        tokio::task::spawn(future);
773    }
774
775    fn schedule_local_task(
776        &self,
777        ty: LocalTaskSpec,
778        // if this is a `LocalTaskType::Resolve*`, we may spawn another task with this persistence,
779        persistence: TaskPersistence,
780    ) -> RawVc {
781        let task_type = ty.task_type;
782        let (global_task_state, parent_task_id, execution_id, local_task_id) = CURRENT_TASK_STATE
783            .with(|gts| {
784                let mut gts_write = gts.write().unwrap();
785                let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
786                    done_event: Event::new(move || {
787                        move || format!("LocalTask({task_type})::done_event")
788                    }),
789                });
790                (
791                    Arc::clone(gts),
792                    gts_write.task_id,
793                    gts_write.execution_id,
794                    local_task_id,
795                )
796            });
797
798        #[cfg(feature = "tokio_tracing")]
799        let description = format!(
800            "[local] (parent: {}) {}",
801            self.backend.get_task_description(parent_task_id),
802            ty.task_type,
803        );
804        #[cfg(not(feature = "tokio_tracing"))]
805        let _ = parent_task_id; // suppress unused variable warning
806
807        let this = self.pin();
808        let future = async move {
809            let span = match &ty.task_type {
810                LocalTaskType::ResolveNative { native_fn } => native_fn.resolve_span(),
811                LocalTaskType::ResolveTrait { trait_method } => trait_method.resolve_span(),
812            };
813            async move {
814                let result = match ty.task_type {
815                    LocalTaskType::ResolveNative { native_fn } => {
816                        LocalTaskType::run_resolve_native(
817                            native_fn,
818                            ty.this,
819                            &*ty.arg,
820                            persistence,
821                            this,
822                        )
823                        .await
824                    }
825                    LocalTaskType::ResolveTrait { trait_method } => {
826                        LocalTaskType::run_resolve_trait(
827                            trait_method,
828                            ty.this.unwrap(),
829                            &*ty.arg,
830                            persistence,
831                            this,
832                        )
833                        .await
834                    }
835                };
836
837                let output = match result {
838                    Ok(raw_vc) => OutputContent::Link(raw_vc),
839                    Err(err) => OutputContent::Error(
840                        TurboTasksExecutionError::from(err).with_task_context(task_type, None),
841                    ),
842                };
843
844                let local_task = LocalTask::Done { output };
845
846                let done_event = CURRENT_TASK_STATE.with(move |gts| {
847                    let mut gts_write = gts.write().unwrap();
848                    let scheduled_task =
849                        std::mem::replace(gts_write.get_mut_local_task(local_task_id), local_task);
850                    let LocalTask::Scheduled { done_event } = scheduled_task else {
851                        panic!("local task finished, but was not in the scheduled state?");
852                    };
853                    done_event
854                });
855                done_event.notify(usize::MAX)
856            }
857            .instrument(span)
858            .await
859        };
860        let future = global_task_state
861            .read()
862            .unwrap()
863            .local_task_tracker
864            .track_future(future);
865        let future = CURRENT_TASK_STATE.scope(global_task_state, future);
866        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
867
868        #[cfg(feature = "tokio_tracing")]
869        tokio::task::Builder::new()
870            .name(&description)
871            .spawn(future)
872            .unwrap();
873        #[cfg(not(feature = "tokio_tracing"))]
874        tokio::task::spawn(future);
875
876        RawVc::LocalOutput(execution_id, local_task_id, persistence)
877    }
878
879    fn begin_foreground_job(&self) {
880        if self
881            .currently_scheduled_foreground_jobs
882            .fetch_add(1, Ordering::AcqRel)
883            == 0
884        {
885            *self.start.lock().unwrap() = Some(Instant::now());
886            self.event_foreground_start.notify(usize::MAX);
887            self.backend.idle_end(self);
888        }
889    }
890
891    fn finish_foreground_job(&self) {
892        if self
893            .currently_scheduled_foreground_jobs
894            .fetch_sub(1, Ordering::AcqRel)
895            == 1
896        {
897            self.backend.idle_start(self);
898            // That's not super race-condition-safe, but it's only for
899            // statistical reasons
900            let total = self.scheduled_tasks.load(Ordering::Acquire);
901            self.scheduled_tasks.store(0, Ordering::Release);
902            if let Some(start) = *self.start.lock().unwrap() {
903                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
904                if let Some(update) = update.as_mut() {
905                    update.0 += start.elapsed();
906                    update.1 += total;
907                } else {
908                    *update = Some((start.elapsed(), total));
909                }
910            }
911            self.event_foreground_done.notify(usize::MAX);
912        }
913    }
914
915    fn begin_background_job(&self) {
916        self.currently_scheduled_background_jobs
917            .fetch_add(1, Ordering::Relaxed);
918    }
919
920    fn finish_background_job(&self) {
921        if self
922            .currently_scheduled_background_jobs
923            .fetch_sub(1, Ordering::Relaxed)
924            == 1
925        {
926            self.event_background_done.notify(usize::MAX);
927        }
928    }
929
930    pub fn get_in_progress_count(&self) -> usize {
931        self.currently_scheduled_foreground_jobs
932            .load(Ordering::Acquire)
933    }
934
935    /// Waits for the given task to finish executing. This works by performing an untracked read,
936    /// and discarding the value of the task output.
937    ///
938    /// [`ReadConsistency::Eventual`] means that this will return after the task executes, but
939    /// before all dependencies have completely settled.
940    ///
941    /// [`ReadConsistency::Strong`] means that this will also wait for the task and all dependencies
942    /// to fully settle before returning.
943    ///
944    /// As this function is typically called in top-level code that waits for results to be ready
945    /// for the user to access, most callers should use [`ReadConsistency::Strong`].
946    pub async fn wait_task_completion(
947        &self,
948        id: TaskId,
949        consistency: ReadConsistency,
950    ) -> Result<()> {
951        read_task_output(
952            self,
953            id,
954            ReadOutputOptions {
955                // INVALIDATION: This doesn't return a value, only waits for it to be ready.
956                tracking: ReadTracking::Untracked,
957                consistency,
958            },
959        )
960        .await?;
961        Ok(())
962    }
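    // Hedged sketch: wait for a root task to settle before presenting results.
    // Top-level callers that hand data to the user should prefer `Strong`.
    //
    //     let id = tt.spawn_root_task(|| async { Ok(Completion::new()) });
    //     tt.wait_task_completion(id, ReadConsistency::Strong).await?;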
963
964    /// Returns [UpdateInfo] with all updates aggregated over a given duration
965    /// (`aggregation`). Will wait until an update happens.
966    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
967        self.aggregated_update_info(aggregation, Duration::MAX)
968            .await
969            .unwrap()
970    }
971
972    /// Returns [UpdateInfo] with all updates aggregated over a given duration
973    /// (`aggregation`). Will only return None when the timeout is reached while
974    /// waiting for the first update.
975    pub async fn aggregated_update_info(
976        &self,
977        aggregation: Duration,
978        timeout: Duration,
979    ) -> Option<UpdateInfo> {
980        let listener = self
981            .event_foreground_done
982            .listen_with_note(|| || "wait for update info".to_string());
983        let wait_for_finish = {
984            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
985            if aggregation.is_zero() {
986                if let Some((duration, tasks)) = update.take() {
987                    return Some(UpdateInfo {
988                        duration,
989                        tasks,
990                        reasons: take(reason_set),
991                        placeholder_for_future_fields: (),
992                    });
993                } else {
994                    true
995                }
996            } else {
997                update.is_none()
998            }
999        };
1000        if wait_for_finish {
1001            if timeout == Duration::MAX {
1002                // wait for finish
1003                listener.await;
1004            } else {
1005                // wait for start, then wait for finish or timeout
1006                let start_listener = self
1007                    .event_foreground_start
1008                    .listen_with_note(|| || "wait for update info".to_string());
1009                if self
1010                    .currently_scheduled_foreground_jobs
1011                    .load(Ordering::Acquire)
1012                    == 0
1013                {
1014                    start_listener.await;
1015                } else {
1016                    drop(start_listener);
1017                }
1018                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1019                    // Timeout
1020                    return None;
1021                }
1022            }
1023        }
1024        if !aggregation.is_zero() {
1025            loop {
1026                select! {
1027                    () = tokio::time::sleep(aggregation) => {
1028                        break;
1029                    }
1030                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1031                        // Resets the sleep
1032                    }
1033                }
1034            }
1035        }
1036        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1037        if let Some((duration, tasks)) = update.take() {
1038            Some(UpdateInfo {
1039                duration,
1040                tasks,
1041                reasons: take(reason_set),
1042                placeholder_for_future_fields: (),
1043            })
1044        } else {
1045            panic!("aggregated_update_info must not be called concurrently")
1046        }
1047    }
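    // Hedged usage sketch: collect aggregated update statistics once activity
    // has settled for the given aggregation window.
    //
    //     let info = tt
    //         .get_or_wait_aggregated_update_info(Duration::from_millis(100))
    //         .await;
    //     println!("updated {} tasks in {:?}", info.tasks, info.duration);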
1048
1049    pub async fn wait_background_done(&self) {
1050        let listener = self.event_background_done.listen();
1051        if self
1052            .currently_scheduled_background_jobs
1053            .load(Ordering::Acquire)
1054            != 0
1055        {
1056            listener.await;
1057        }
1058    }
1059
1060    pub async fn stop_and_wait(&self) {
1061        turbo_tasks_future_scope(self.pin(), async move {
1062            self.backend.stopping(self);
1063            self.stopped.store(true, Ordering::Release);
1064            {
1065                let listener = self
1066                    .event_foreground_done
1067                    .listen_with_note(|| || "wait for stop".to_string());
1068                if self
1069                    .currently_scheduled_foreground_jobs
1070                    .load(Ordering::Acquire)
1071                    != 0
1072                {
1073                    listener.await;
1074                }
1075            }
1076            {
1077                let listener = self.event_background_done.listen();
1078                if self
1079                    .currently_scheduled_background_jobs
1080                    .load(Ordering::Acquire)
1081                    != 0
1082                {
1083                    listener.await;
1084                }
1085            }
1086            self.backend.stop(self);
1087        })
1088        .await;
1089    }
1090
1091    #[track_caller]
1092    pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1093    where
1094        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1095        T::CallOnceFuture: Send,
1096    {
1097        let mut this = self.pin();
1098        this.begin_foreground_job();
1099        tokio::spawn(
1100            TURBO_TASKS
1101                .scope(this.clone(), async move {
1102                    if !this.stopped.load(Ordering::Acquire) {
1103                        this = func(this.clone()).await;
1104                    }
1105                    this.finish_foreground_job();
1106                })
1107                .in_current_span(),
1108        );
1109    }
1110
1111    #[track_caller]
1112    pub(crate) fn schedule_background_job<T>(&self, func: T)
1113    where
1114        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1115        T::CallOnceFuture: Send,
1116    {
1117        let mut this = self.pin();
1118        self.begin_background_job();
1119        tokio::spawn(
1120            TURBO_TASKS
1121                .scope(this.clone(), async move {
1122                    if !this.stopped.load(Ordering::Acquire) {
1123                        this = func(this).await;
1124                    }
1125                    this.finish_background_job();
1126                })
1127                .in_current_span(),
1128        );
1129    }
1130
1131    fn finish_current_task_state(&self) -> FinishedTaskState {
1132        let (stateful, has_invalidator) = CURRENT_TASK_STATE.with(|cell| {
1133            let CurrentTaskState {
1134                stateful,
1135                has_invalidator,
1136                ..
1137            } = &mut *cell.write().unwrap();
1138            (*stateful, *has_invalidator)
1139        });
1140
1141        FinishedTaskState {
1142            stateful,
1143            has_invalidator,
1144        }
1145    }
1146
1147    pub fn backend(&self) -> &B {
1148        &self.backend
1149    }
1150}
1151
1152struct FinishedTaskState {
1153    /// True if the task has state in cells
1154    stateful: bool,
1155
1156    /// True if the task uses an external invalidator
1157    has_invalidator: bool,
1158}
1159
1160impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
1161    fn dynamic_call(
1162        &self,
1163        native_fn: &'static NativeFunction,
1164        this: Option<RawVc>,
1165        arg: Box<dyn MagicAny>,
1166        persistence: TaskPersistence,
1167    ) -> RawVc {
1168        self.dynamic_call(native_fn, this, arg, persistence)
1169    }
1170    fn native_call(
1171        &self,
1172        native_fn: &'static NativeFunction,
1173        this: Option<RawVc>,
1174        arg: Box<dyn MagicAny>,
1175        persistence: TaskPersistence,
1176    ) -> RawVc {
1177        self.native_call(native_fn, this, arg, persistence)
1178    }
1179    fn trait_call(
1180        &self,
1181        trait_method: &'static TraitMethod,
1182        this: RawVc,
1183        arg: Box<dyn MagicAny>,
1184        persistence: TaskPersistence,
1185    ) -> RawVc {
1186        self.trait_call(trait_method, this, arg, persistence)
1187    }
1188
1189    #[track_caller]
1190    fn run(
1191        &self,
1192        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1193    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
1194        let this = self.pin();
1195        Box::pin(async move { this.run(future).await })
1196    }
1197
1198    #[track_caller]
1199    fn run_once(
1200        &self,
1201        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1202    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1203        let this = self.pin();
1204        Box::pin(async move { this.run_once(future).await })
1205    }
1206
1207    #[track_caller]
1208    fn run_once_with_reason(
1209        &self,
1210        reason: StaticOrArc<dyn InvalidationReason>,
1211        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1212    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1213        {
1214            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1215            reason_set.insert(reason);
1216        }
1217        let this = self.pin();
1218        Box::pin(async move { this.run_once(future).await })
1219    }
1220
1221    #[track_caller]
1222    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1223        self.start_once_process(future)
1224    }
1225}
1226
1227impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1228    #[instrument(level = "info", skip_all, name = "invalidate")]
1229    fn invalidate(&self, task: TaskId) {
1230        self.backend.invalidate_task(task, self);
1231    }
1232
1233    #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1234    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1235        {
1236            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1237            reason_set.insert(reason);
1238        }
1239        self.backend.invalidate_task(task, self);
1240    }
1241
1242    fn invalidate_serialization(&self, task: TaskId) {
1243        self.backend.invalidate_serialization(task, self);
1244    }
1245
1246    fn try_read_task_output(
1247        &self,
1248        task: TaskId,
1249        options: ReadOutputOptions,
1250    ) -> Result<Result<RawVc, EventListener>> {
1251        self.backend.try_read_task_output(
1252            task,
1253            current_task_if_available("reading Vcs"),
1254            options,
1255            self,
1256        )
1257    }
1258
1259    fn try_read_task_cell(
1260        &self,
1261        task: TaskId,
1262        index: CellId,
1263        options: ReadCellOptions,
1264    ) -> Result<Result<TypedCellContent, EventListener>> {
1265        self.backend.try_read_task_cell(
1266            task,
1267            index,
1268            current_task_if_available("reading Vcs"),
1269            options,
1270            self,
1271        )
1272    }
1273
1274    fn try_read_own_task_cell(
1275        &self,
1276        current_task: TaskId,
1277        index: CellId,
1278        options: ReadCellOptions,
1279    ) -> Result<TypedCellContent> {
1280        self.backend
1281            .try_read_own_task_cell(current_task, index, options, self)
1282    }
1283
1284    fn try_read_local_output(
1285        &self,
1286        execution_id: ExecutionId,
1287        local_task_id: LocalTaskId,
1288    ) -> Result<Result<RawVc, EventListener>> {
1289        CURRENT_TASK_STATE.with(|gts| {
1290            let gts_read = gts.read().unwrap();
1291
1292            // Local Vcs are local to their parent task's current execution, and do not exist
1293            // outside of it. This is weakly enforced at compile time using the `NonLocalValue`
1294            // marker trait. This assertion exists to handle any potential escapes that the
1295            // compile-time checks cannot capture.
1296            gts_read.assert_execution_id(execution_id);
1297
1298            match gts_read.get_local_task(local_task_id) {
1299                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1300                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1301            }
1302        })
1303    }
1304
1305    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1306        self.backend.read_task_collectibles(
1307            task,
1308            trait_id,
1309            current_task_if_available("reading collectibles"),
1310            self,
1311        )
1312    }
1313
1314    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1315        self.backend.emit_collectible(
1316            trait_type,
1317            collectible,
1318            current_task("emitting collectible"),
1319            self,
1320        );
1321    }
1322
1323    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1324        self.backend.unemit_collectible(
1325            trait_type,
1326            collectible,
1327            count,
1328            current_task("emitting collectible"),
1329            self,
1330        );
1331    }
1332
1333    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1334        for (&collectible, &count) in collectibles {
1335            if count > 0 {
1336                self.backend.unemit_collectible(
1337                    trait_type,
1338                    collectible,
1339                    count as u32,
1340                    current_task("emitting collectible"),
1341                    self,
1342                );
1343            }
1344        }
1345    }
1346
1347    fn read_own_task_cell(
1348        &self,
1349        task: TaskId,
1350        index: CellId,
1351        options: ReadCellOptions,
1352    ) -> Result<TypedCellContent> {
1353        self.try_read_own_task_cell(task, index, options)
1354    }
1355
1356    fn update_own_task_cell(
1357        &self,
1358        task: TaskId,
1359        index: CellId,
1360        is_serializable_cell_content: bool,
1361        content: CellContent,
1362        verification_mode: VerificationMode,
1363    ) {
1364        self.backend.update_task_cell(
1365            task,
1366            index,
1367            is_serializable_cell_content,
1368            content,
1369            verification_mode,
1370            self,
1371        );
1372    }
1373
1374    fn connect_task(&self, task: TaskId) {
1375        self.backend
1376            .connect_task(task, current_task_if_available("connecting task"), self);
1377    }
1378
1379    fn mark_own_task_as_finished(&self, task: TaskId) {
1380        self.backend.mark_own_task_as_finished(task, self);
1381    }
1382
1383    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
1384        self.backend
1385            .set_own_task_aggregation_number(task, aggregation_number, self);
1386    }
1387
1388    fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1389        self.backend.mark_own_task_as_session_dependent(task, self);
1390    }
1391
1392    /// Creates a future that inherits the current task id and task state. The current global task
1393    /// will wait for this future to be dropped before exiting.
1394    fn detached_for_testing(
1395        &self,
1396        fut: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1397    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
1398        // this is similar to what happens for a local task, except that we keep the local task's
1399        // state as well.
1400        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1401        let tracked_fut = {
1402            let ts = global_task_state.read().unwrap();
1403            ts.local_task_tracker.track_future(fut)
1404        };
1405        Box::pin(TURBO_TASKS.scope(
1406            turbo_tasks(),
1407            CURRENT_TASK_STATE.scope(global_task_state, tracked_fut),
1408        ))
1409    }
1410
1411    fn task_statistics(&self) -> &TaskStatisticsApi {
1412        self.backend.task_statistics()
1413    }
1414
1415    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1416        let this = self.pin();
1417        Box::pin(async move {
1418            this.stop_and_wait().await;
1419        })
1420    }
1421
1422    fn subscribe_to_compilation_events(
1423        &self,
1424        event_types: Option<Vec<String>>,
1425    ) -> Receiver<Arc<dyn CompilationEvent>> {
1426        self.compilation_events.subscribe(event_types)
1427    }
1428
1429    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1430        if let Err(e) = self.compilation_events.send(event) {
1431            tracing::warn!("Failed to send compilation event: {e}");
1432        }
1433    }
1434
1435    fn is_tracking_dependencies(&self) -> bool {
1436        self.backend.is_tracking_dependencies()
1437    }
1438}
1439
1440impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1441    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1442        self.pin()
1443    }
1444    fn backend(&self) -> &B {
1445        &self.backend
1446    }
1447
1448    #[track_caller]
1449    fn schedule_backend_background_job(&self, job: B::BackendJob) {
1450        self.schedule_background_job(async move |this| {
1451            this.backend.run_backend_job(job, &*this).await;
1452            this
1453        })
1454    }
1455
1456    #[track_caller]
1457    fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1458        self.schedule_foreground_job(async move |this| {
1459            this.backend.run_backend_job(job, &*this).await;
1460            this
1461        })
1462    }
1463
1464    #[track_caller]
1465    fn schedule(&self, task: TaskId) {
1466        self.schedule(task)
1467    }
1468
1469    fn program_duration_until(&self, instant: Instant) -> Duration {
1470        instant - self.program_start
1471    }
1472
1473    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1474        // SAFETY: This is a fresh id from the factory
1475        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1476    }
1477
1478    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1479        // SAFETY: This is a fresh id from the factory
1480        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1481    }
1482
1483    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1484        unsafe { self.task_id_factory.reuse(id.into()) }
1485    }
1486
1487    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1488        unsafe { self.transient_task_id_factory.reuse(id.into()) }
1489    }
1490
1491    fn is_idle(&self) -> bool {
1492        self.currently_scheduled_foreground_jobs
1493            .load(Ordering::Acquire)
1494            == 0
1495    }
1496}
1497
1498pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1499    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1500        Ok(id) => id,
1501        Err(_) => panic!(
1502            "{from} can only be used in the context of a turbo_tasks task execution or \
1503             turbo_tasks run"
1504        ),
1505    }
1506}
1507
1508pub(crate) fn current_task(from: &str) -> TaskId {
1509    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1510        Ok(Some(id)) => id,
1511        Ok(None) | Err(_) => {
1512            panic!("{from} can only be used in the context of a turbo_tasks task execution")
1513        }
1514    }
1515}
1516
1517pub async fn run<T: Send + 'static>(
1518    tt: Arc<dyn TurboTasksApi>,
1519    future: impl Future<Output = Result<T>> + Send + 'static,
1520) -> Result<T> {
1521    let (tx, rx) = tokio::sync::oneshot::channel();
1522
1523    tt.run(Box::pin(async move {
1524        let result = future.await?;
1525        tx.send(result)
1526            .map_err(|_| anyhow!("unable to send result"))?;
1527        Ok(())
1528    }))
1529    .await?;
1530
1531    Ok(rx.await?)
1532}
1533
1534pub async fn run_once<T: Send + 'static>(
1535    tt: Arc<dyn TurboTasksApi>,
1536    future: impl Future<Output = Result<T>> + Send + 'static,
1537) -> Result<T> {
1538    let (tx, rx) = tokio::sync::oneshot::channel();
1539
1540    tt.run_once(Box::pin(async move {
1541        let result = future.await?;
1542        tx.send(result)
1543            .map_err(|_| anyhow!("unable to send result"))?;
1544        Ok(())
1545    }))
1546    .await?;
1547
1548    Ok(rx.await?)
1549}
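// A minimal usage sketch of `run_once`, assuming a turbo_tasks instance is already available as
// `tt: Arc<dyn TurboTasksApi>` (everything outside this file is illustrative):
//
// ```ignore
// let value = run_once(tt.clone(), async move {
//     // Any `Send + 'static` future returning `anyhow::Result<T>` works; the result is sent
//     // back to the caller through the oneshot channel above.
//     Ok(40 + 2)
// })
// .await?;
// assert_eq!(value, 42);
// ```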
1550
1551pub async fn run_once_with_reason<T: Send + 'static>(
1552    tt: Arc<dyn TurboTasksApi>,
1553    reason: impl InvalidationReason,
1554    future: impl Future<Output = Result<T>> + Send + 'static,
1555) -> Result<T> {
1556    let (tx, rx) = tokio::sync::oneshot::channel();
1557
1558    tt.run_once_with_reason(
1559        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1560        Box::pin(async move {
1561            let result = future.await?;
1562            tx.send(result)
1563                .map_err(|_| anyhow!("unable to send result"))?;
1564            Ok(())
1565        }),
1566    )
1567    .await?;
1568
1569    Ok(rx.await?)
1570}
1571
1572/// Calls [`TurboTasks::dynamic_call`] for the current turbo tasks instance.
1573pub fn dynamic_call(
1574    func: &'static NativeFunction,
1575    this: Option<RawVc>,
1576    arg: Box<dyn MagicAny>,
1577    persistence: TaskPersistence,
1578) -> RawVc {
1579    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1580}
1581
1582/// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance.
1583pub fn trait_call(
1584    trait_method: &'static TraitMethod,
1585    this: RawVc,
1586    arg: Box<dyn MagicAny>,
1587    persistence: TaskPersistence,
1588) -> RawVc {
1589    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1590}
1591
1592pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1593    TURBO_TASKS.with(|arc| arc.clone())
1594}
1595
1596pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1597    TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1598}
1599
1600pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1601    TURBO_TASKS.with(|arc| func(arc))
1602}
1603
1604pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1605    TURBO_TASKS.sync_scope(tt, f)
1606}
1607
1608pub fn turbo_tasks_future_scope<T>(
1609    tt: Arc<dyn TurboTasksApi>,
1610    f: impl Future<Output = T>,
1611) -> impl Future<Output = T> {
1612    TURBO_TASKS.scope(tt, f)
1613}
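// A hedged sketch of re-entering the turbo_tasks scope from a plain tokio task; the
// `tokio::spawn` call is illustrative, not required by this API:
//
// ```ignore
// let tt = turbo_tasks();
// tokio::spawn(turbo_tasks_future_scope(tt, async move {
//     // `turbo_tasks()` and `with_turbo_tasks()` work again inside this future because the
//     // TURBO_TASKS task-local has been re-entered.
//     let _tt = turbo_tasks();
// }));
// ```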
1614
1615pub fn with_turbo_tasks_for_testing<T>(
1616    tt: Arc<dyn TurboTasksApi>,
1617    current_task: TaskId,
1618    execution_id: ExecutionId,
1619    f: impl Future<Output = T>,
1620) -> impl Future<Output = T> {
1621    TURBO_TASKS.scope(
1622        tt,
1623        CURRENT_TASK_STATE.scope(
1624            Arc::new(RwLock::new(CurrentTaskState::new(
1625                current_task,
1626                execution_id,
1627            ))),
1628            f,
1629        ),
1630    )
1631}
1632
1633/// Spawns the given future within the context of the current task.
1634///
1635/// Beware: this method is not safe to use in production code. It is only
1636/// intended for use in tests and for debugging purposes.
1637pub fn spawn_detached_for_testing(f: impl Future<Output = Result<()>> + Send + 'static) {
1638    tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(f.in_current_span())));
1639}
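// A short sketch (test-only, per the warning above); `check_invariants` is a hypothetical helper:
//
// ```ignore
// spawn_detached_for_testing(async move {
//     check_invariants().await?;
//     Ok(())
// });
// ```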
1640
1641pub fn current_task_for_testing() -> Option<TaskId> {
1642    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1643}
1644
1645/// Marks the current task as session dependent, i.e. dirty when restored from the filesystem cache.
1646pub fn mark_session_dependent() {
1647    with_turbo_tasks(|tt| {
1648        tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1649    });
1650}
1651
1652/// Marks the current task as a root in the aggregation graph. This means it starts with the
1653/// correct aggregation number instead of needing to recompute it after the fact.
1654pub fn mark_root() {
1655    with_turbo_tasks(|tt| {
1656        tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
1657    });
1658}
1659
1660/// Marks the current task as finished. This excludes it from waiting for
1661/// strong consistency.
1662pub fn mark_finished() {
1663    with_turbo_tasks(|tt| {
1664        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1665    });
1666}
1667
1668/// Marks the current task as stateful. This prevents the task from being
1669/// dropped without persisting its state.
1670///
1671/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
1672/// serialization of the current task's cells.
1673pub fn mark_stateful() -> SerializationInvalidator {
1674    CURRENT_TASK_STATE.with(|cell| {
1675        let CurrentTaskState {
1676            stateful, task_id, ..
1677        } = &mut *cell.write().unwrap();
1678        *stateful = true;
1679        let Some(task_id) = *task_id else {
1680            panic!(
1681                "mark_stateful() can only be used in the context of a turbo_tasks task execution"
1682            );
1683        };
1684        SerializationInvalidator::new(task_id)
1685    })
1686}
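// A minimal sketch of the intended call pattern, only valid inside a turbo_tasks task execution
// (the surrounding `state` struct is illustrative):
//
// ```ignore
// let serialization_invalidator = mark_stateful();
// // Keep the invalidator next to the task's external state so the serialized form of this
// // task's cells can be invalidated whenever that state changes.
// state.serialization_invalidator = Some(serialization_invalidator);
// ```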
1687
1688pub fn mark_invalidator() {
1689    CURRENT_TASK_STATE.with(|cell| {
1690        let CurrentTaskState {
1691            has_invalidator, ..
1692        } = &mut *cell.write().unwrap();
1693        *has_invalidator = true;
1694    })
1695}
1696
1697pub fn prevent_gc() {
1698    // There is a hack in UpdateCellOperation that needs to be updated when this is changed.
1699    mark_stateful();
1700}
1701
1702pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1703    with_turbo_tasks(|tt| {
1704        let raw_vc = collectible.node.node;
1705        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1706    })
1707}
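// A hedged sketch of emitting a collectible; `Diagnostic` (a `#[turbo_tasks::value_trait]`) and
// `MyDiagnostic` (a value implementing it) are illustrative placeholders, not types from this
// crate:
//
// ```ignore
// let diagnostic: ResolvedVc<MyDiagnostic> = MyDiagnostic::new(message).to_resolved().await?;
// emit(ResolvedVc::upcast::<Box<dyn Diagnostic>>(diagnostic));
// ```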
1708
1709pub(crate) async fn read_task_output(
1710    this: &dyn TurboTasksApi,
1711    id: TaskId,
1712    options: ReadOutputOptions,
1713) -> Result<RawVc> {
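    // `try_read_task_output` returns `Err(listener)` while the output is not yet available;
    // wait for the event to fire and retry until it resolves to a `RawVc`.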
1714    loop {
1715        match this.try_read_task_output(id, options)? {
1716            Ok(result) => return Ok(result),
1717            Err(listener) => listener.await,
1718        }
1719    }
1720}
1721
1722pub(crate) async fn read_task_cell(
1723    this: &dyn TurboTasksApi,
1724    id: TaskId,
1725    index: CellId,
1726    options: ReadCellOptions,
1727) -> Result<TypedCellContent> {
1728    loop {
1729        match this.try_read_task_cell(id, index, options)? {
1730            Ok(result) => return Ok(result),
1731            Err(listener) => listener.await,
1732        }
1733    }
1734}
1735
1736/// A reference to a task's cell with methods that allow updating the contents
1737/// of the cell.
1738///
1739/// Mutations should not happen outside of the task that owns this cell. Doing so
1740/// is a logic error and may lead to incorrect caching behavior.
1741#[derive(Clone, Copy)]
1742pub struct CurrentCellRef {
1743    current_task: TaskId,
1744    index: CellId,
1745    is_serializable_cell_content: bool,
1746}
1747
1748type VcReadRepr<T> = <<T as VcValueType>::Read as VcRead<T>>::Repr;
1749
1750impl CurrentCellRef {
1751    /// Updates the cell if the given `functor` returns a value.
1752    pub fn conditional_update<T>(&self, functor: impl FnOnce(Option<&T>) -> Option<T>)
1753    where
1754        T: VcValueType,
1755    {
1756        self.conditional_update_with_shared_reference(|old_shared_reference| {
1757            let old_ref = old_shared_reference
1758                .and_then(|sr| sr.0.downcast_ref::<VcReadRepr<T>>())
1759                .map(|content| <T::Read as VcRead<T>>::repr_to_value_ref(content));
1760            let new_value = functor(old_ref)?;
1761            Some(SharedReference::new(triomphe::Arc::new(
1762                <T::Read as VcRead<T>>::value_to_repr(new_value),
1763            )))
1764        })
1765    }
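    // A hedged usage sketch; `Counter` is an illustrative `VcValueType`, not a type defined in
    // this crate:
    //
    // ```ignore
    // let cell = find_cell_by_type::<Counter>();
    // cell.conditional_update(|old: Option<&Counter>| {
    //     let next = Counter(old.map_or(0, |c| c.0 + 1));
    //     // Returning `None` leaves the cell untouched, so dependent tasks are not invalidated.
    //     (old != Some(&next)).then_some(next)
    // });
    // ```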
1766
1767    /// Updates the cell if the given `functor` returns a `SharedReference`.
1768    pub fn conditional_update_with_shared_reference(
1769        &self,
1770        functor: impl FnOnce(Option<&SharedReference>) -> Option<SharedReference>,
1771    ) {
1772        let tt = turbo_tasks();
1773        let cell_content = tt
1774            .read_own_task_cell(
1775                self.current_task,
1776                self.index,
1777                ReadCellOptions {
1778                    // INVALIDATION: Reading our own cell must be untracked
1779                    tracking: ReadTracking::Untracked,
1780                    is_serializable_cell_content: self.is_serializable_cell_content,
1781                    final_read_hint: false,
1782                },
1783            )
1784            .ok();
1785        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1786        if let Some(update) = update {
1787            tt.update_own_task_cell(
1788                self.current_task,
1789                self.index,
1790                self.is_serializable_cell_content,
1791                CellContent(Some(update)),
1792                VerificationMode::EqualityCheck,
1793            )
1794        }
1795    }
1796
1797    /// Replace the current cell's content with `new_value` if `new_value` is not equal by value
1798    /// to the existing content.
1799    ///
1800    /// The comparison happens using the value itself, not the [`VcRead::Target`] of that value.
1801    ///
1802    /// Take this example of a custom equality implementation on a transparent wrapper type:
1803    ///
1804    /// ```
1805    /// #[turbo_tasks::value(transparent, eq = "manual")]
1806    /// struct Wrapper(Vec<u32>);
1807    ///
1808    /// impl PartialEq for Wrapper {
1809    ///     fn eq(&self, other: &Self) -> bool {
1810    ///         // Example: order doesn't matter for equality
1811    ///         let (mut this, mut other) = (self.0.clone(), other.0.clone());
1812    ///         this.sort_unstable();
1813    ///         other.sort_unstable();
1814    ///         this == other
1815    ///     }
1816    /// }
1817    ///
1818    /// impl Eq for Wrapper {}
1819    /// ```
1820    ///
1821    /// Comparisons of [`Vc<Wrapper>`] used when updating the cell will use `Wrapper`'s custom
1822    /// equality implementation, rather than the one provided by the target ([`Vec<u32>`]) type.
1823    ///
1824    /// However, in most cases, the default derived implementation of [`PartialEq`] is used which
1825    /// just forwards to the inner value's [`PartialEq`].
1826    ///
1827    /// If you already have a `SharedReference`, consider calling
1828    /// [`Self::compare_and_update_with_shared_reference`] which can re-use the [`SharedReference`]
1829    /// object.
1830    pub fn compare_and_update<T>(&self, new_value: T)
1831    where
1832        T: PartialEq + VcValueType,
1833    {
1834        self.conditional_update(|old_value| {
1835            if let Some(old_value) = old_value
1836                && old_value == &new_value
1837            {
1838                return None;
1839            }
1840            Some(new_value)
1841        });
1842    }
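    // A short usage sketch building on the `Wrapper` example above (illustrative only):
    //
    // ```ignore
    // let cell = find_cell_by_type::<Wrapper>();
    // // With the order-insensitive `PartialEq` above, writing a permutation of the existing
    // // contents compares equal and therefore does not invalidate dependent tasks.
    // cell.compare_and_update(Wrapper(vec![3, 1, 2]));
    // ```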
1843
1844    /// Replace the current cell's content with `new_shared_reference` if the
1845    /// new value is not equal by value to the existing content.
1846    ///
1847    /// If you already have a `SharedReference`, this is a faster version of
1848    /// [`CurrentCellRef::compare_and_update`].
1849    ///
1850    /// The [`SharedReference`] is expected to use the `<T::Read as
1851    /// VcRead<T>>::Repr` type for its representation of the value.
1852    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
1853    where
1854        T: VcValueType + PartialEq,
1855    {
1856        fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
1857            <T::Read as VcRead<T>>::repr_to_value_ref(
1858                sr.0.downcast_ref::<VcReadRepr<T>>()
1859                    .expect("cannot update SharedReference of different type"),
1860            )
1861        }
1862        self.conditional_update_with_shared_reference(|old_sr| {
1863            if let Some(old_sr) = old_sr {
1864                let old_value: &T = extract_sr_value(old_sr);
1865                let new_value = extract_sr_value(&new_shared_reference);
1866                if old_value == new_value {
1867                    return None;
1868                }
1869            }
1870            Some(new_shared_reference)
1871        });
1872    }
1873
1874    /// Unconditionally updates the content of the cell.
1875    pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
1876    where
1877        T: VcValueType,
1878    {
1879        let tt = turbo_tasks();
1880        tt.update_own_task_cell(
1881            self.current_task,
1882            self.index,
1883            self.is_serializable_cell_content,
1884            CellContent(Some(SharedReference::new(triomphe::Arc::new(
1885                <T::Read as VcRead<T>>::value_to_repr(new_value),
1886            )))),
1887            verification_mode,
1888        )
1889    }
1890
1891    /// A faster version of [`Self::update`] if you already have a
1892    /// [`SharedReference`].
1893    ///
1894    /// In [`VerificationMode::EqualityCheck`] mode, if the passed-in [`SharedReference`] is
1895    /// the same as the existing cell's by identity, no update is performed.
1896    ///
1897    /// The [`SharedReference`] is expected to use the `<T::Read as
1898    /// VcRead<T>>::Repr` type for its representation of the value.
1899    pub fn update_with_shared_reference(
1900        &self,
1901        shared_ref: SharedReference,
1902        verification_mode: VerificationMode,
1903    ) {
1904        let tt = turbo_tasks();
1905        let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
1906            let content = tt
1907                .read_own_task_cell(
1908                    self.current_task,
1909                    self.index,
1910                    ReadCellOptions {
1911                        // INVALIDATION: Reading our own cell must be untracked
1912                        tracking: ReadTracking::Untracked,
1913                        is_serializable_cell_content: self.is_serializable_cell_content,
1914                        final_read_hint: false,
1915                    },
1916                )
1917                .ok();
1918            if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
1919                // pointer equality (not value equality)
1920                shared_ref_exp != shared_ref
1921            } else {
1922                true
1923            }
1924        } else {
1925            true
1926        };
1927        if update {
1928            tt.update_own_task_cell(
1929                self.current_task,
1930                self.index,
1931                self.is_serializable_cell_content,
1932                CellContent(Some(shared_ref)),
1933                verification_mode,
1934            )
1935        }
1936    }
1937}
1938
1939impl From<CurrentCellRef> for RawVc {
1940    fn from(cell: CurrentCellRef) -> Self {
1941        RawVc::TaskCell(cell.current_task, cell.index)
1942    }
1943}
1944
1945pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
1946    find_cell_by_id(T::get_value_type_id(), T::has_serialization())
1947}
1948
1949pub fn find_cell_by_id(ty: ValueTypeId, is_serializable_cell_content: bool) -> CurrentCellRef {
1950    CURRENT_TASK_STATE.with(|ts| {
1951        let current_task = current_task("celling turbo_tasks values");
1952        let mut ts = ts.write().unwrap();
1953        let map = ts.cell_counters.as_mut().unwrap();
1954        let current_index = map.entry(ty).or_default();
1955        let index = *current_index;
1956        *current_index += 1;
1957        CurrentCellRef {
1958            current_task,
1959            index: CellId { type_id: ty, index },
1960            is_serializable_cell_content,
1961        }
1962    })
1963}
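// Note on the counters above: the per-type counter in `CurrentTaskState::cell_counters` is bumped
// on every call, so repeated calls to `find_cell_by_type::<T>()` within a single task execution
// hand out distinct cells (`index` 0, 1, 2, ...) rather than reusing the same one.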
1964
1965pub(crate) async fn read_local_output(
1966    this: &dyn TurboTasksApi,
1967    execution_id: ExecutionId,
1968    local_task_id: LocalTaskId,
1969) -> Result<RawVc> {
1970    loop {
1971        match this.try_read_local_output(execution_id, local_task_id)? {
1972            Ok(raw_vc) => return Ok(raw_vc),
1973            Err(event_listener) => event_listener.await,
1974        }
1975    }
1976}