turbo_tasks/
manager.rs

1use std::{
2    any::Any,
3    future::Future,
4    hash::BuildHasherDefault,
5    mem::take,
6    pin::Pin,
7    sync::{
8        Arc, Mutex, RwLock, Weak,
9        atomic::{AtomicBool, AtomicUsize, Ordering},
10    },
11    time::{Duration, Instant},
12};
13
14use anyhow::{Result, anyhow};
15use auto_hash_map::AutoMap;
16use rustc_hash::FxHasher;
17use serde::{Deserialize, Serialize};
18use smallvec::SmallVec;
19use tokio::{select, sync::mpsc::Receiver, task_local};
20use tokio_util::task::TaskTracker;
21use tracing::{Instrument, Level, instrument, trace_span};
22
23use crate::{
24    Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
25    ResolvedVc, SharedReference, TaskId, TaskIdSet, TraitMethod, ValueTypeId, Vc, VcRead,
26    VcValueTrait, VcValueType,
27    backend::{
28        Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
29        TransientTaskType, TurboTasksExecutionError, TypedCellContent,
30    },
31    capture_future::CaptureFuture,
32    event::{Event, EventListener},
33    id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
34    id_factory::IdFactoryWithReuse,
35    macro_helpers::NativeFunction,
36    magic_any::MagicAny,
37    message_queue::{CompilationEvent, CompilationEventQueue},
38    raw_vc::{CellId, RawVc},
39    registry,
40    serialization_invalidation::SerializationInvalidator,
41    task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
42    task_statistics::TaskStatisticsApi,
43    trace::TraceRawVcs,
44    util::{IdFactory, StaticOrArc},
45    vc::ReadVcFuture,
46};
47
48/// Common base trait for [`TurboTasksApi`] and [`TurboTasksBackendApi`]. Provides APIs for creating
49/// tasks from function calls.
50pub trait TurboTasksCallApi: Sync + Send {
51    /// Calls a native function with arguments. Resolves arguments when needed
52    /// with a wrapper task.
53    fn dynamic_call(
54        &self,
55        native_fn: &'static NativeFunction,
56        this: Option<RawVc>,
57        arg: Box<dyn MagicAny>,
58        persistence: TaskPersistence,
59    ) -> RawVc;
60    /// Call a native function with arguments.
61    /// All inputs must be resolved.
62    fn native_call(
63        &self,
64        native_fn: &'static NativeFunction,
65        this: Option<RawVc>,
66        arg: Box<dyn MagicAny>,
67        persistence: TaskPersistence,
68    ) -> RawVc;
69    /// Calls a trait method with arguments. First input is the `self` object.
70    /// Uses a wrapper task to resolve
71    fn trait_call(
72        &self,
73        trait_method: &'static TraitMethod,
74        this: RawVc,
75        arg: Box<dyn MagicAny>,
76        persistence: TaskPersistence,
77    ) -> RawVc;
78
79    fn run_once(
80        &self,
81        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
82    ) -> TaskId;
83    fn run_once_with_reason(
84        &self,
85        reason: StaticOrArc<dyn InvalidationReason>,
86        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
87    ) -> TaskId;
88    fn run_once_process(
89        &self,
90        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
91    ) -> TaskId;
92}
93
94/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
95/// context. Returned by the [`turbo_tasks`] helper function.
96///
97/// This trait is needed because thread locals cannot contain an unresolved [`Backend`] type
98/// parameter.
99pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
100    fn invalidate(&self, task: TaskId);
101    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);
102
103    fn invalidate_serialization(&self, task: TaskId);
104
105    /// Eagerly notifies all tasks that were scheduled for notifications via
106    /// `schedule_notify_tasks_set()`
107    fn notify_scheduled_tasks(&self);
108
109    fn try_read_task_output(
110        &self,
111        task: TaskId,
112        consistency: ReadConsistency,
113    ) -> Result<Result<RawVc, EventListener>>;
114
115    /// INVALIDATION: Be careful with this, it will not track dependencies, so
116    /// using it could break cache invalidation.
117    fn try_read_task_output_untracked(
118        &self,
119        task: TaskId,
120        consistency: ReadConsistency,
121    ) -> Result<Result<RawVc, EventListener>>;
122
123    fn try_read_task_cell(
124        &self,
125        task: TaskId,
126        index: CellId,
127        options: ReadCellOptions,
128    ) -> Result<Result<TypedCellContent, EventListener>>;
129
130    /// INVALIDATION: Be careful with this, it will not track dependencies, so
131    /// using it could break cache invalidation.
132    fn try_read_task_cell_untracked(
133        &self,
134        task: TaskId,
135        index: CellId,
136        options: ReadCellOptions,
137    ) -> Result<Result<TypedCellContent, EventListener>>;
138
139    /// Reads a [`RawVc::LocalOutput`]. If the task has completed, returns the [`RawVc`] the local
140    /// task points to.
141    ///
142    /// The returned [`RawVc`] may also be a [`RawVc::LocalOutput`], so this may need to be called
143    /// recursively or in a loop.
144    ///
145    /// This does not accept a consistency argument, as you cannot control consistency of a read of
146    /// an operation owned by your own task. Strongly consistent reads are only allowed on
147    /// [`OperationVc`]s, which should never be local tasks.
148    ///
149    /// No dependency tracking will happen as a result of this function call, as it's a no-op for a
150    /// task to depend on itself.
151    ///
152    /// [`OperationVc`]: crate::OperationVc
153    fn try_read_local_output(
154        &self,
155        execution_id: ExecutionId,
156        local_task_id: LocalTaskId,
157    ) -> Result<Result<RawVc, EventListener>>;
158
159    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;
160
161    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
162    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
163    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);
164
165    /// INVALIDATION: Be careful with this, it will not track dependencies, so
166    /// using it could break cache invalidation.
167    fn try_read_own_task_cell_untracked(
168        &self,
169        current_task: TaskId,
170        index: CellId,
171        options: ReadCellOptions,
172    ) -> Result<TypedCellContent>;
173
174    fn read_own_task_cell(
175        &self,
176        task: TaskId,
177        index: CellId,
178        options: ReadCellOptions,
179    ) -> Result<TypedCellContent>;
180    fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent);
181    fn mark_own_task_as_finished(&self, task: TaskId);
182    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
183    fn mark_own_task_as_session_dependent(&self, task: TaskId);
184
185    fn connect_task(&self, task: TaskId);
186
187    /// Wraps the given future in the current task.
188    ///
189    /// Beware: this method is not safe to use in production code. It is only intended for use in
190    /// tests and for debugging purposes.
191    fn detached_for_testing(
192        &self,
193        f: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
194    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
195
196    fn task_statistics(&self) -> &TaskStatisticsApi;
197
198    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
199
200    fn subscribe_to_compilation_events(
201        &self,
202        event_types: Option<Vec<String>>,
203    ) -> Receiver<Arc<dyn CompilationEvent>>;
204    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);
205}
206
207/// A wrapper around a value that is unused.
208pub struct Unused<T> {
209    inner: T,
210}
211
212impl<T> Unused<T> {
213    /// Creates a new unused value.
214    ///
215    /// # Safety
216    ///
217    /// The wrapped value must not be used.
218    pub unsafe fn new_unchecked(inner: T) -> Self {
219        Self { inner }
220    }
221
222    /// Get the inner value, without consuming the `Unused` wrapper.
223    ///
224    /// # Safety
225    ///
226    /// The user need to make sure that the value stays unused.
227    pub unsafe fn get_unchecked(&self) -> &T {
228        &self.inner
229    }
230
231    /// Unwraps the value, consuming the `Unused` wrapper.
232    pub fn into(self) -> T {
233        self.inner
234    }
235}
236
237/// A subset of the [`TurboTasks`] API that's exposed to [`Backend`] implementations.
238pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
239    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;
240
241    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
242    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
243    /// # Safety
244    ///
245    /// The caller must ensure that the task id is not used anymore.
246    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
247    /// # Safety
248    ///
249    /// The caller must ensure that the task id is not used anymore.
250    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);
251
252    fn schedule(&self, task: TaskId);
253    fn schedule_backend_background_job(&self, job: B::BackendJob);
254    fn schedule_backend_foreground_job(&self, job: B::BackendJob);
255
256    fn try_foreground_done(&self) -> Result<(), EventListener>;
257    fn wait_foreground_done_excluding_own<'a>(
258        &'a self,
259    ) -> Option<Pin<Box<dyn Future<Output = ()> + Send + 'a>>>;
260
261    /// Enqueues tasks for notification of changed dependencies. This will
262    /// eventually call `invalidate_tasks()` on all tasks.
263    fn schedule_notify_tasks(&self, tasks: &[TaskId]);
264
265    /// Enqueues tasks for notification of changed dependencies. This will
266    /// eventually call `invalidate_tasks()` on all tasks.
267    fn schedule_notify_tasks_set(&self, tasks: &TaskIdSet);
268
269    /// Returns the duration from the start of the program to the given instant.
270    fn program_duration_until(&self, instant: Instant) -> Duration;
271
272    /// An untyped object-safe version of [`TurboTasksBackendApiExt::read_task_state`]. Callers
273    /// should prefer the extension trait's version of this method.
274    fn read_task_state_dyn(&self, func: &mut dyn FnMut(&B::TaskState));
275
276    /// An untyped object-safe version of [`TurboTasksBackendApiExt::write_task_state`]. Callers
277    /// should prefer the extension trait's version of this method.
278    fn write_task_state_dyn(&self, func: &mut dyn FnMut(&mut B::TaskState));
279
280    /// Returns true if the system is idle.
281    fn is_idle(&self) -> bool;
282
283    /// Returns a reference to the backend.
284    fn backend(&self) -> &B;
285}
286
287/// An extension trait for methods of [`TurboTasksBackendApi`] that are not object-safe. This is
288/// automatically implemented for all [`TurboTasksBackendApi`]s using a blanket impl.
289pub trait TurboTasksBackendApiExt<B: Backend + 'static>: TurboTasksBackendApi<B> {
290    /// Allows modification of the [`Backend::TaskState`].
291    ///
292    /// This function holds open a non-exclusive read lock that blocks writes, so `func` is expected
293    /// to execute quickly in order to release the lock.
294    fn read_task_state<T>(&self, func: impl FnOnce(&B::TaskState) -> T) -> T {
295        let mut func = Some(func);
296        let mut out = None;
297        self.read_task_state_dyn(&mut |ts| out = Some((func.take().unwrap())(ts)));
298        out.expect("read_task_state_dyn must call `func`")
299    }
300
301    /// Allows modification of the [`Backend::TaskState`].
302    ///
303    /// This function holds open a write lock, so `func` is expected to execute quickly in order to
304    /// release the lock.
305    fn write_task_state<T>(&self, func: impl FnOnce(&mut B::TaskState) -> T) -> T {
306        let mut func = Some(func);
307        let mut out = None;
308        self.write_task_state_dyn(&mut |ts| out = Some((func.take().unwrap())(ts)));
309        out.expect("write_task_state_dyn must call `func`")
310    }
311}
312
313impl<TT, B> TurboTasksBackendApiExt<B> for TT
314where
315    TT: TurboTasksBackendApi<B> + ?Sized,
316    B: Backend + 'static,
317{
318}
319
320#[allow(clippy::manual_non_exhaustive)]
321pub struct UpdateInfo {
322    pub duration: Duration,
323    pub tasks: usize,
324    pub reasons: InvalidationReasonSet,
325    #[allow(dead_code)]
326    placeholder_for_future_fields: (),
327}
328
329#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
330pub enum TaskPersistence {
331    /// Tasks that may be persisted across sessions using serialization.
332    Persistent,
333
334    /// Tasks that will be persisted in memory for the life of this session, but won't persist
335    /// between sessions.
336    ///
337    /// This is used for [root tasks][TurboTasks::spawn_root_task] and tasks with an argument of
338    /// type [`TransientValue`][crate::value::TransientValue] or
339    /// [`TransientInstance`][crate::value::TransientInstance].
340    Transient,
341
342    /// Tasks that are persisted only for the lifetime of the nearest non-`Local` parent caller.
343    ///
344    /// This task does not have a unique task id, and is not shared with the backend. Instead it
345    /// uses the parent task's id.
346    ///
347    /// This is useful for functions that have a low cache hit rate. Those functions could be
348    /// converted to non-task functions, but that would break their function signature. This
349    /// provides a mechanism for skipping caching without changing the function signature.
350    Local,
351}
352
353#[derive(Clone, Copy, Debug, Eq, PartialEq)]
354pub enum ReadConsistency {
355    /// The default behavior for most APIs. Reads are faster, but may return stale values, which
356    /// may later trigger re-computation.
357    Eventual,
358    /// Ensures all dependencies are fully resolved before returning the cell or output data, at
359    /// the cost of slower reads.
360    ///
361    /// Top-level code that returns data to the user should use strongly consistent reads.
362    Strong,
363}
364
365pub struct TurboTasks<B: Backend + 'static> {
366    this: Weak<Self>,
367    backend: B,
368    task_id_factory: IdFactoryWithReuse<TaskId>,
369    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
370    execution_id_factory: IdFactory<ExecutionId>,
371    stopped: AtomicBool,
372    currently_scheduled_tasks: AtomicUsize,
373    currently_scheduled_foreground_jobs: AtomicUsize,
374    currently_scheduled_background_jobs: AtomicUsize,
375    scheduled_tasks: AtomicUsize,
376    start: Mutex<Option<Instant>>,
377    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
378    event: Event,
379    event_start: Event,
380    event_foreground: Event,
381    event_background: Event,
382    program_start: Instant,
383    compilation_events: CompilationEventQueue,
384}
385
386/// Information about a non-local task. A non-local task can contain multiple "local" tasks, which
387/// all share the same non-local task state.
388///
389/// A non-local task is one that:
390///
391/// - Has a unique task id.
392/// - Is potentially cached.
393/// - The backend is aware of.
394struct CurrentTaskState {
395    task_id: TaskId,
396    execution_id: ExecutionId,
397
398    /// Affected tasks, that are tracked during task execution. These tasks will
399    /// be invalidated when the execution finishes or before reading a cell
400    /// value.
401    tasks_to_notify: SmallVec<[TaskId; 4]>,
402
403    /// True if the current task has state in cells
404    stateful: bool,
405
406    /// True if the current task uses an external invalidator
407    has_invalidator: bool,
408
409    /// Tracks how many cells of each type has been allocated so far during this task execution.
410    /// When a task is re-executed, the cell count may not match the existing cell vec length.
411    ///
412    /// This is taken (and becomes `None`) during teardown of a task.
413    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,
414
415    /// Local tasks created while this global task has been running. Indexed by `LocalTaskId`.
416    local_tasks: Vec<LocalTask>,
417
418    /// Tracks currently running local tasks, and defers cleanup of the global task until those
419    /// complete. Also used by `detached_for_testing`.
420    local_task_tracker: TaskTracker,
421
422    backend_state: Box<dyn Any + Send + Sync>,
423}
424
425impl CurrentTaskState {
426    fn new(
427        task_id: TaskId,
428        execution_id: ExecutionId,
429        backend_state: Box<dyn Any + Send + Sync>,
430    ) -> Self {
431        Self {
432            task_id,
433            execution_id,
434            tasks_to_notify: SmallVec::new(),
435            stateful: false,
436            has_invalidator: false,
437            cell_counters: Some(AutoMap::default()),
438            local_tasks: Vec::new(),
439            local_task_tracker: TaskTracker::new(),
440            backend_state,
441        }
442    }
443
444    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
445        if self.execution_id != expected_execution_id {
446            panic!(
447                "Local tasks can only be scheduled/awaited within the same execution of the \
448                 parent task that created them"
449            );
450        }
451    }
452
453    fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
454        self.local_tasks.push(local_task);
455        // generate a one-indexed id from len() -- we just pushed so len() is >= 1
456        if cfg!(debug_assertions) {
457            LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
458        } else {
459            unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
460        }
461    }
462
463    fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
464        // local task ids are one-indexed (they use NonZeroU32)
465        &self.local_tasks[(*local_task_id as usize) - 1]
466    }
467
468    fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
469        &mut self.local_tasks[(*local_task_id as usize) - 1]
470    }
471}
472
473// TODO implement our own thread pool and make these thread locals instead
474task_local! {
475    /// The current TurboTasks instance
476    static TURBO_TASKS: Arc<dyn TurboTasksApi>;
477
478    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
479}
480
481impl<B: Backend + 'static> TurboTasks<B> {
482    // TODO better lifetime management for turbo tasks
483    // consider using unsafe for the task_local turbo tasks
484    // that should be safe as long tasks can't outlife turbo task
485    // so we probably want to make sure that all tasks are joined
486    // when trying to drop turbo tasks
487    pub fn new(backend: B) -> Arc<Self> {
488        let task_id_factory = IdFactoryWithReuse::new(
489            TaskId::MIN,
490            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
491        );
492        let transient_task_id_factory =
493            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
494        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
495        let this = Arc::new_cyclic(|this| Self {
496            this: this.clone(),
497            backend,
498            task_id_factory,
499            transient_task_id_factory,
500            execution_id_factory,
501            stopped: AtomicBool::new(false),
502            currently_scheduled_tasks: AtomicUsize::new(0),
503            currently_scheduled_background_jobs: AtomicUsize::new(0),
504            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
505            scheduled_tasks: AtomicUsize::new(0),
506            start: Default::default(),
507            aggregated_update: Default::default(),
508            event: Event::new(|| || "TurboTasks::event".to_string()),
509            event_start: Event::new(|| || "TurboTasks::event_start".to_string()),
510            event_foreground: Event::new(|| || "TurboTasks::event_foreground".to_string()),
511            event_background: Event::new(|| || "TurboTasks::event_background".to_string()),
512            program_start: Instant::now(),
513            compilation_events: CompilationEventQueue::default(),
514        });
515        this.backend.startup(&*this);
516        this
517    }
518
519    pub fn pin(&self) -> Arc<Self> {
520        self.this.upgrade().unwrap()
521    }
522
523    /// Creates a new root task
524    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
525    where
526        T: ?Sized,
527        F: Fn() -> Fut + Send + Sync + Clone + 'static,
528        Fut: Future<Output = Result<Vc<T>>> + Send,
529    {
530        let id = self.backend.create_transient_task(
531            TransientTaskType::Root(Box::new(move || {
532                let functor = functor.clone();
533                Box::pin(async move {
534                    let raw_vc = functor().await?.node;
535                    raw_vc.to_non_local().await
536                })
537            })),
538            self,
539        );
540        self.schedule(id);
541        id
542    }
543
544    pub fn dispose_root_task(&self, task_id: TaskId) {
545        self.backend.dispose_root_task(task_id, self);
546    }
547
548    // TODO make sure that all dependencies settle before reading them
549    /// Creates a new root task, that is only executed once.
550    /// Dependencies will not invalidate the task.
551    #[track_caller]
552    pub fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
553    where
554        T: ?Sized,
555        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
556    {
557        let id = self.backend.create_transient_task(
558            TransientTaskType::Once(Box::pin(async move {
559                let raw_vc = future.await?.node;
560                raw_vc.to_non_local().await
561            })),
562            self,
563        );
564        self.schedule(id);
565        id
566    }
567
568    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
569        &self,
570        future: impl Future<Output = Result<T>> + Send + 'static,
571    ) -> Result<T> {
572        let (tx, rx) = tokio::sync::oneshot::channel();
573        let task_id = self.spawn_once_task(async move {
574            let result = future.await?;
575            tx.send(result)
576                .map_err(|_| anyhow!("unable to send result"))?;
577            Ok(Completion::new())
578        });
579        // INVALIDATION: A Once task will never invalidate, therefore we don't need to
580        // track a dependency
581        let raw_result =
582            read_task_output_untracked(self, task_id, ReadConsistency::Eventual).await?;
583        turbo_tasks_future_scope(
584            self.pin(),
585            ReadVcFuture::<Completion>::from(raw_result.into_read().untracked()),
586        )
587        .await?;
588
589        Ok(rx.await?)
590    }
591
592    pub(crate) fn native_call(
593        &self,
594        native_fn: &'static NativeFunction,
595        this: Option<RawVc>,
596        arg: Box<dyn MagicAny>,
597        persistence: TaskPersistence,
598    ) -> RawVc {
599        match persistence {
600            TaskPersistence::Local => {
601                let task_type = LocalTaskSpec {
602                    task_type: LocalTaskType::Native { native_fn },
603                    this,
604                    arg,
605                };
606                self.schedule_local_task(task_type, persistence)
607            }
608            TaskPersistence::Transient => {
609                let task_type = CachedTaskType {
610                    native_fn,
611                    this,
612                    arg,
613                };
614
615                RawVc::TaskOutput(self.backend.get_or_create_transient_task(
616                    task_type,
617                    current_task("turbo_function calls"),
618                    self,
619                ))
620            }
621            TaskPersistence::Persistent => {
622                let task_type = CachedTaskType {
623                    native_fn,
624                    this,
625                    arg,
626                };
627
628                RawVc::TaskOutput(self.backend.get_or_create_persistent_task(
629                    task_type,
630                    current_task("turbo_function calls"),
631                    self,
632                ))
633            }
634        }
635    }
636
637    pub fn dynamic_call(
638        &self,
639        native_fn: &'static NativeFunction,
640        this: Option<RawVc>,
641        arg: Box<dyn MagicAny>,
642        persistence: TaskPersistence,
643    ) -> RawVc {
644        if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
645            return self.native_call(native_fn, this, arg, persistence);
646        }
647        let task_type = LocalTaskSpec {
648            task_type: LocalTaskType::ResolveNative { native_fn },
649            this,
650            arg,
651        };
652        self.schedule_local_task(task_type, persistence)
653    }
654
655    pub fn trait_call(
656        &self,
657        trait_method: &'static TraitMethod,
658        this: RawVc,
659        arg: Box<dyn MagicAny>,
660        persistence: TaskPersistence,
661    ) -> RawVc {
662        // avoid creating a wrapper task if self is already resolved
663        // for resolved cells we already know the value type so we can lookup the
664        // function
665        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
666            match registry::get_value_type(type_id).get_trait_method(trait_method) {
667                Some(native_fn) => {
668                    let arg = native_fn.arg_meta.filter_owned(arg);
669                    return self.dynamic_call(native_fn, Some(this), arg, persistence);
670                }
671                None => {
672                    // We are destined to fail at this point, but we just retry resolution in the
673                    // local task since we cannot report an error from here.
674                    // TODO: A panic seems appropriate since the immediate caller is to blame
675                }
676            }
677        }
678
679        // create a wrapper task to resolve all inputs
680        let task_type = LocalTaskSpec {
681            task_type: LocalTaskType::ResolveTrait { trait_method },
682            this: Some(this),
683            arg,
684        };
685
686        self.schedule_local_task(task_type, persistence)
687    }
688
689    #[track_caller]
690    pub(crate) fn schedule(&self, task_id: TaskId) {
691        self.begin_primary_job();
692        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
693
694        let this = self.pin();
695        let future = async move {
696            let mut schedule_again = true;
697            while schedule_again {
698                let backend_state = this.backend.new_task_state(task_id);
699                // it's okay for execution ids to overflow and wrap, they're just used for an assert
700                let execution_id = this.execution_id_factory.wrapping_get();
701                let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
702                    task_id,
703                    execution_id,
704                    Box::new(backend_state),
705                )));
706                let single_execution_future = async {
707                    if this.stopped.load(Ordering::Acquire) {
708                        this.backend.task_execution_canceled(task_id, &*this);
709                        return false;
710                    }
711
712                    let Some(TaskExecutionSpec { future, span }) =
713                        this.backend.try_start_task_execution(task_id, &*this)
714                    else {
715                        return false;
716                    };
717
718                    async {
719                        let (result, duration, alloc_info) = CaptureFuture::new(future).await;
720
721                        // wait for all spawned local tasks using `local` to finish
722                        let ltt = CURRENT_TASK_STATE
723                            .with(|ts| ts.read().unwrap().local_task_tracker.clone());
724                        ltt.close();
725                        ltt.wait().await;
726
727                        let result = match result {
728                            Ok(Ok(raw_vc)) => Ok(raw_vc),
729                            Ok(Err(err)) => Err(err.into()),
730                            Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
731                        };
732
733                        this.backend.task_execution_result(task_id, result, &*this);
734                        let FinishedTaskState {
735                            stateful,
736                            has_invalidator,
737                        } = this.finish_current_task_state();
738                        let cell_counters = CURRENT_TASK_STATE
739                            .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
740                        let schedule_again = this.backend.task_execution_completed(
741                            task_id,
742                            duration,
743                            alloc_info.memory_usage(),
744                            &cell_counters,
745                            stateful,
746                            has_invalidator,
747                            &*this,
748                        );
749                        // task_execution_completed might need to notify tasks
750                        this.notify_scheduled_tasks();
751                        schedule_again
752                    }
753                    .instrument(span)
754                    .await
755                };
756                schedule_again = CURRENT_TASK_STATE
757                    .scope(current_task_state, single_execution_future)
758                    .await;
759            }
760            this.finish_primary_job();
761            anyhow::Ok(())
762        };
763
764        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
765
766        #[cfg(feature = "tokio_tracing")]
767        {
768            let description = self.backend.get_task_description(task_id);
769            tokio::task::Builder::new()
770                .name(&description)
771                .spawn(future)
772                .unwrap();
773        }
774        #[cfg(not(feature = "tokio_tracing"))]
775        tokio::task::spawn(future);
776    }
777
778    fn schedule_local_task(
779        &self,
780        ty: LocalTaskSpec,
781        // if this is a `LocalTaskType::Resolve*`, we may spawn another task with this persistence,
782        // if this is a `LocalTaskType::Native`, persistence is unused.
783        //
784        // TODO: In the rare case that we're crossing a transient->persistent boundary, we should
785        // force `LocalTaskType::Native` to be spawned as real tasks, so that any cells they create
786        // have the correct persistence. This is not an issue for resolution stub task, as they
787        // don't end up owning any cells.
788        persistence: TaskPersistence,
789    ) -> RawVc {
790        let task_type = ty.task_type;
791        let (global_task_state, parent_task_id, execution_id, local_task_id) = CURRENT_TASK_STATE
792            .with(|gts| {
793                let mut gts_write = gts.write().unwrap();
794                let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
795                    done_event: Event::new(move || {
796                        move || format!("LocalTask({task_type})::done_event")
797                    }),
798                });
799                (
800                    Arc::clone(gts),
801                    gts_write.task_id,
802                    gts_write.execution_id,
803                    local_task_id,
804                )
805            });
806
807        #[cfg(feature = "tokio_tracing")]
808        let description = format!(
809            "[local] (parent: {}) {}",
810            self.backend.get_task_description(parent_task_id),
811            ty.task_type,
812        );
813        #[cfg(not(feature = "tokio_tracing"))]
814        let _ = parent_task_id; // suppress unused variable warning
815
816        let this = self.pin();
817        let future = async move {
818            let TaskExecutionSpec { future, span } =
819                crate::task::local_task::get_local_task_execution_spec(&*this, &ty, persistence);
820            async move {
821                let (result, _duration, _memory_usage) = CaptureFuture::new(future).await;
822
823                let result = match result {
824                    Ok(Ok(raw_vc)) => Ok(raw_vc),
825                    Ok(Err(err)) => Err(err.into()),
826                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
827                };
828
829                let local_task = LocalTask::Done {
830                    output: match result {
831                        Ok(raw_vc) => OutputContent::Link(raw_vc),
832                        Err(err) => OutputContent::Error(err.with_task_context(task_type)),
833                    },
834                };
835
836                let done_event = CURRENT_TASK_STATE.with(move |gts| {
837                    let mut gts_write = gts.write().unwrap();
838                    let scheduled_task =
839                        std::mem::replace(gts_write.get_mut_local_task(local_task_id), local_task);
840                    let LocalTask::Scheduled { done_event } = scheduled_task else {
841                        panic!("local task finished, but was not in the scheduled state?");
842                    };
843                    done_event
844                });
845                done_event.notify(usize::MAX)
846            }
847            .instrument(span)
848            .await
849        };
850        let future = global_task_state
851            .read()
852            .unwrap()
853            .local_task_tracker
854            .track_future(future);
855        let future = CURRENT_TASK_STATE.scope(global_task_state, future);
856        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
857
858        #[cfg(feature = "tokio_tracing")]
859        tokio::task::Builder::new()
860            .name(&description)
861            .spawn(future)
862            .unwrap();
863        #[cfg(not(feature = "tokio_tracing"))]
864        tokio::task::spawn(future);
865
866        RawVc::LocalOutput(execution_id, local_task_id, persistence)
867    }
868
869    fn begin_primary_job(&self) {
870        if self
871            .currently_scheduled_tasks
872            .fetch_add(1, Ordering::AcqRel)
873            == 0
874        {
875            *self.start.lock().unwrap() = Some(Instant::now());
876            self.event_start.notify(usize::MAX);
877            self.backend.idle_end(self);
878        }
879    }
880
881    fn begin_foreground_job(&self) {
882        self.begin_primary_job();
883        self.currently_scheduled_foreground_jobs
884            .fetch_add(1, Ordering::AcqRel);
885    }
886
887    fn finish_primary_job(&self) {
888        if self
889            .currently_scheduled_tasks
890            .fetch_sub(1, Ordering::AcqRel)
891            == 1
892        {
893            self.backend.idle_start(self);
894            // That's not super race-condition-safe, but it's only for
895            // statistical reasons
896            let total = self.scheduled_tasks.load(Ordering::Acquire);
897            self.scheduled_tasks.store(0, Ordering::Release);
898            if let Some(start) = *self.start.lock().unwrap() {
899                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
900                if let Some(update) = update.as_mut() {
901                    update.0 += start.elapsed();
902                    update.1 += total;
903                } else {
904                    *update = Some((start.elapsed(), total));
905                }
906            }
907            self.event.notify(usize::MAX);
908        }
909    }
910
911    fn finish_foreground_job(&self) {
912        if self
913            .currently_scheduled_foreground_jobs
914            .fetch_sub(1, Ordering::AcqRel)
915            == 1
916        {
917            self.event_foreground.notify(usize::MAX);
918        }
919        self.finish_primary_job();
920    }
921
922    pub async fn wait_foreground_done(&self) {
923        if self
924            .currently_scheduled_foreground_jobs
925            .load(Ordering::Acquire)
926            == 0
927        {
928            return;
929        }
930        let listener = self.event_foreground.listen();
931        if self
932            .currently_scheduled_foreground_jobs
933            .load(Ordering::Acquire)
934            == 0
935        {
936            return;
937        }
938        listener
939            .instrument(trace_span!("wait_foreground_done"))
940            .await;
941    }
942
943    pub fn get_in_progress_count(&self) -> usize {
944        self.currently_scheduled_tasks.load(Ordering::Acquire)
945    }
946
947    /// Waits for the given task to finish executing. This works by performing an untracked read,
948    /// and discarding the value of the task output.
949    ///
950    /// [`ReadConsistency::Eventual`] means that this will return after the task executes, but
951    /// before all dependencies have completely settled.
952    ///
953    /// [`ReadConsistency::Strong`] means that this will also wait for the task and all dependencies
954    /// to fully settle before returning.
955    ///
956    /// As this function is typically called in top-level code that waits for results to be ready
957    /// for the user to access, most callers should use [`ReadConsistency::Strong`].
958    pub async fn wait_task_completion(
959        &self,
960        id: TaskId,
961        consistency: ReadConsistency,
962    ) -> Result<()> {
963        // INVALIDATION: This doesn't return a value, only waits for it to be ready.
964        read_task_output_untracked(self, id, consistency).await?;
965        Ok(())
966    }
967
968    /// Returns [UpdateInfo] with all updates aggregated over a given duration
969    /// (`aggregation`). Will wait until an update happens.
970    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
971        self.aggregated_update_info(aggregation, Duration::MAX)
972            .await
973            .unwrap()
974    }
975
976    /// Returns [UpdateInfo] with all updates aggregated over a given duration
977    /// (`aggregation`). Will only return None when the timeout is reached while
978    /// waiting for the first update.
979    pub async fn aggregated_update_info(
980        &self,
981        aggregation: Duration,
982        timeout: Duration,
983    ) -> Option<UpdateInfo> {
984        let listener = self
985            .event
986            .listen_with_note(|| || "wait for update info".to_string());
987        let wait_for_finish = {
988            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
989            if aggregation.is_zero() {
990                if let Some((duration, tasks)) = update.take() {
991                    return Some(UpdateInfo {
992                        duration,
993                        tasks,
994                        reasons: take(reason_set),
995                        placeholder_for_future_fields: (),
996                    });
997                } else {
998                    true
999                }
1000            } else {
1001                update.is_none()
1002            }
1003        };
1004        if wait_for_finish {
1005            if timeout == Duration::MAX {
1006                // wait for finish
1007                listener.await;
1008            } else {
1009                // wait for start, then wait for finish or timeout
1010                let start_listener = self
1011                    .event_start
1012                    .listen_with_note(|| || "wait for update info".to_string());
1013                if self.currently_scheduled_tasks.load(Ordering::Acquire) == 0 {
1014                    start_listener.await;
1015                } else {
1016                    drop(start_listener);
1017                }
1018                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1019                    // Timeout
1020                    return None;
1021                }
1022            }
1023        }
1024        if !aggregation.is_zero() {
1025            loop {
1026                select! {
1027                    () = tokio::time::sleep(aggregation) => {
1028                        break;
1029                    }
1030                    () = self.event.listen_with_note(|| || "wait for update info".to_string()) => {
1031                        // Resets the sleep
1032                    }
1033                }
1034            }
1035        }
1036        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1037        if let Some((duration, tasks)) = update.take() {
1038            Some(UpdateInfo {
1039                duration,
1040                tasks,
1041                reasons: take(reason_set),
1042                placeholder_for_future_fields: (),
1043            })
1044        } else {
1045            panic!("aggregated_update_info must not called concurrently")
1046        }
1047    }
1048
1049    pub async fn wait_background_done(&self) {
1050        let listener = self.event_background.listen();
1051        if self
1052            .currently_scheduled_background_jobs
1053            .load(Ordering::Acquire)
1054            != 0
1055        {
1056            listener.await;
1057        }
1058    }
1059
1060    pub async fn stop_and_wait(&self) {
1061        turbo_tasks_future_scope(self.pin(), async move {
1062            self.backend.stopping(self);
1063            self.stopped.store(true, Ordering::Release);
1064            {
1065                let listener = self
1066                    .event
1067                    .listen_with_note(|| || "wait for stop".to_string());
1068                if self.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
1069                    listener.await;
1070                }
1071            }
1072            {
1073                let listener = self.event_background.listen();
1074                if self
1075                    .currently_scheduled_background_jobs
1076                    .load(Ordering::Acquire)
1077                    != 0
1078                {
1079                    listener.await;
1080                }
1081            }
1082            self.backend.stop(self);
1083        })
1084        .await;
1085    }
1086
1087    #[track_caller]
1088    pub(crate) fn schedule_background_job<
1089        T: FnOnce(Arc<TurboTasks<B>>) -> F + Send + 'static,
1090        F: Future<Output = ()> + Send + 'static,
1091    >(
1092        &self,
1093        func: T,
1094    ) {
1095        let this = self.pin();
1096        self.currently_scheduled_background_jobs
1097            .fetch_add(1, Ordering::AcqRel);
1098        tokio::spawn(
1099            TURBO_TASKS
1100                .scope(this.clone(), async move {
1101                    while this.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
1102                        let listener = this.event.listen_with_note(|| {
1103                            || "background job waiting for execution".to_string()
1104                        });
1105                        if this.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
1106                            listener.await;
1107                        }
1108                    }
1109                    let this2 = this.clone();
1110                    if !this.stopped.load(Ordering::Acquire) {
1111                        func(this).await;
1112                    }
1113                    if this2
1114                        .currently_scheduled_background_jobs
1115                        .fetch_sub(1, Ordering::AcqRel)
1116                        == 1
1117                    {
1118                        this2.event_background.notify(usize::MAX);
1119                    }
1120                })
1121                .in_current_span(),
1122        );
1123    }
1124
1125    #[track_caller]
1126    pub(crate) fn schedule_foreground_job<
1127        T: FnOnce(Arc<TurboTasks<B>>) -> F + Send + 'static,
1128        F: Future<Output = ()> + Send + 'static,
1129    >(
1130        &self,
1131        func: T,
1132    ) {
1133        let this = self.pin();
1134        this.begin_foreground_job();
1135        tokio::spawn(
1136            TURBO_TASKS
1137                .scope(this.clone(), async move {
1138                    if !this.stopped.load(Ordering::Acquire) {
1139                        func(this.clone()).await;
1140                    }
1141                    this.finish_foreground_job();
1142                })
1143                .in_current_span(),
1144        );
1145    }
1146
1147    fn finish_current_task_state(&self) -> FinishedTaskState {
1148        let (stateful, has_invalidator, tasks) = CURRENT_TASK_STATE.with(|cell| {
1149            let CurrentTaskState {
1150                tasks_to_notify,
1151                stateful,
1152                has_invalidator,
1153                ..
1154            } = &mut *cell.write().unwrap();
1155            (*stateful, *has_invalidator, take(tasks_to_notify))
1156        });
1157
1158        if !tasks.is_empty() {
1159            self.backend.invalidate_tasks(&tasks, self);
1160        }
1161        FinishedTaskState {
1162            stateful,
1163            has_invalidator,
1164        }
1165    }
1166
1167    pub fn backend(&self) -> &B {
1168        &self.backend
1169    }
1170}
1171
1172struct FinishedTaskState {
1173    /// True if the task has state in cells
1174    stateful: bool,
1175
1176    /// True if the task uses an external invalidator
1177    has_invalidator: bool,
1178}
1179
1180impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
1181    fn dynamic_call(
1182        &self,
1183        native_fn: &'static NativeFunction,
1184        this: Option<RawVc>,
1185        arg: Box<dyn MagicAny>,
1186        persistence: TaskPersistence,
1187    ) -> RawVc {
1188        self.dynamic_call(native_fn, this, arg, persistence)
1189    }
1190    fn native_call(
1191        &self,
1192        native_fn: &'static NativeFunction,
1193        this: Option<RawVc>,
1194        arg: Box<dyn MagicAny>,
1195        persistence: TaskPersistence,
1196    ) -> RawVc {
1197        self.native_call(native_fn, this, arg, persistence)
1198    }
1199    fn trait_call(
1200        &self,
1201        trait_method: &'static TraitMethod,
1202        this: RawVc,
1203        arg: Box<dyn MagicAny>,
1204        persistence: TaskPersistence,
1205    ) -> RawVc {
1206        self.trait_call(trait_method, this, arg, persistence)
1207    }
1208
1209    #[track_caller]
1210    fn run_once(
1211        &self,
1212        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1213    ) -> TaskId {
1214        self.spawn_once_task(async move {
1215            future.await?;
1216            Ok(Completion::new())
1217        })
1218    }
1219
1220    #[track_caller]
1221    fn run_once_with_reason(
1222        &self,
1223        reason: StaticOrArc<dyn InvalidationReason>,
1224        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1225    ) -> TaskId {
1226        {
1227            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1228            reason_set.insert(reason);
1229        }
1230        self.spawn_once_task(async move {
1231            future.await?;
1232            Ok(Completion::new())
1233        })
1234    }
1235
1236    #[track_caller]
1237    fn run_once_process(
1238        &self,
1239        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1240    ) -> TaskId {
1241        let this = self.pin();
1242        self.spawn_once_task(async move {
1243            this.finish_primary_job();
1244            future.await?;
1245            this.begin_primary_job();
1246            Ok(Completion::new())
1247        })
1248    }
1249}
1250
1251impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1252    #[instrument(level = Level::INFO, skip_all, name = "invalidate")]
1253    fn invalidate(&self, task: TaskId) {
1254        self.backend.invalidate_task(task, self);
1255    }
1256
1257    #[instrument(level = Level::INFO, skip_all, name = "invalidate", fields(name = display(&reason)))]
1258    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1259        {
1260            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1261            reason_set.insert(reason);
1262        }
1263        self.backend.invalidate_task(task, self);
1264    }
1265
1266    fn invalidate_serialization(&self, task: TaskId) {
1267        self.backend.invalidate_serialization(task, self);
1268    }
1269
1270    fn notify_scheduled_tasks(&self) {
1271        let _ = CURRENT_TASK_STATE.try_with(|cell| {
1272            let tasks = {
1273                let CurrentTaskState {
1274                    tasks_to_notify, ..
1275                } = &mut *cell.write().unwrap();
1276                take(tasks_to_notify)
1277            };
1278            if tasks.is_empty() {
1279                return;
1280            }
1281            self.backend.invalidate_tasks(&tasks, self);
1282        });
1283    }
1284
1285    fn try_read_task_output(
1286        &self,
1287        task: TaskId,
1288        consistency: ReadConsistency,
1289    ) -> Result<Result<RawVc, EventListener>> {
1290        self.backend
1291            .try_read_task_output(task, current_task("reading Vcs"), consistency, self)
1292    }
1293
1294    fn try_read_task_output_untracked(
1295        &self,
1296        task: TaskId,
1297        consistency: ReadConsistency,
1298    ) -> Result<Result<RawVc, EventListener>> {
1299        self.backend
1300            .try_read_task_output_untracked(task, consistency, self)
1301    }
1302
1303    fn try_read_task_cell(
1304        &self,
1305        task: TaskId,
1306        index: CellId,
1307        options: ReadCellOptions,
1308    ) -> Result<Result<TypedCellContent, EventListener>> {
1309        self.backend
1310            .try_read_task_cell(task, index, current_task("reading Vcs"), options, self)
1311    }
1312
1313    fn try_read_task_cell_untracked(
1314        &self,
1315        task: TaskId,
1316        index: CellId,
1317        options: ReadCellOptions,
1318    ) -> Result<Result<TypedCellContent, EventListener>> {
1319        self.backend
1320            .try_read_task_cell_untracked(task, index, options, self)
1321    }
1322
1323    fn try_read_own_task_cell_untracked(
1324        &self,
1325        current_task: TaskId,
1326        index: CellId,
1327        options: ReadCellOptions,
1328    ) -> Result<TypedCellContent> {
1329        self.backend
1330            .try_read_own_task_cell_untracked(current_task, index, options, self)
1331    }
1332
1333    fn try_read_local_output(
1334        &self,
1335        execution_id: ExecutionId,
1336        local_task_id: LocalTaskId,
1337    ) -> Result<Result<RawVc, EventListener>> {
1338        CURRENT_TASK_STATE.with(|gts| {
1339            let gts_read = gts.read().unwrap();
1340
1341            // Local Vcs are local to their parent task's current execution, and do not exist
1342            // outside of it. This is weakly enforced at compile time using the `NonLocalValue`
1343            // marker trait. This assertion exists to handle any potential escapes that the
1344            // compile-time checks cannot capture.
1345            gts_read.assert_execution_id(execution_id);
1346
1347            match gts_read.get_local_task(local_task_id) {
1348                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1349                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1350            }
1351        })
1352    }
1353
1354    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1355        self.backend.read_task_collectibles(
1356            task,
1357            trait_id,
1358            current_task("reading collectibles"),
1359            self,
1360        )
1361    }
1362
1363    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1364        self.backend.emit_collectible(
1365            trait_type,
1366            collectible,
1367            current_task("emitting collectible"),
1368            self,
1369        );
1370    }
1371
1372    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1373        self.backend.unemit_collectible(
1374            trait_type,
1375            collectible,
1376            count,
1377            current_task("emitting collectible"),
1378            self,
1379        );
1380    }
1381
1382    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1383        for (&collectible, &count) in collectibles {
1384            if count > 0 {
1385                self.backend.unemit_collectible(
1386                    trait_type,
1387                    collectible,
1388                    count as u32,
1389                    current_task("emitting collectible"),
1390                    self,
1391                );
1392            }
1393        }
1394    }
1395
1396    fn read_own_task_cell(
1397        &self,
1398        task: TaskId,
1399        index: CellId,
1400        options: ReadCellOptions,
1401    ) -> Result<TypedCellContent> {
1402        // INVALIDATION: don't need to track a dependency to itself
1403        self.try_read_own_task_cell_untracked(task, index, options)
1404    }
1405
1406    fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent) {
1407        self.backend.update_task_cell(task, index, content, self);
1408    }
1409
1410    fn connect_task(&self, task: TaskId) {
1411        self.backend
1412            .connect_task(task, current_task("connecting task"), self);
1413    }
1414
1415    fn mark_own_task_as_finished(&self, task: TaskId) {
1416        self.backend.mark_own_task_as_finished(task, self);
1417    }
1418
1419    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
1420        self.backend
1421            .set_own_task_aggregation_number(task, aggregation_number, self);
1422    }
1423
1424    fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1425        self.backend.mark_own_task_as_session_dependent(task, self);
1426    }
1427
1428    /// Creates a future that inherits the current task id and task state. The current global task
1429    /// will wait for this future to be dropped before exiting.
1430    fn detached_for_testing(
1431        &self,
1432        fut: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1433    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
1434        // this is similar to what happens for a local task, except that we keep the local task's
1435        // state as well.
1436        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1437        let tracked_fut = {
1438            let ts = global_task_state.read().unwrap();
1439            ts.local_task_tracker.track_future(fut)
1440        };
1441        Box::pin(TURBO_TASKS.scope(
1442            turbo_tasks(),
1443            CURRENT_TASK_STATE.scope(global_task_state, tracked_fut),
1444        ))
1445    }
1446
1447    fn task_statistics(&self) -> &TaskStatisticsApi {
1448        self.backend.task_statistics()
1449    }
1450
1451    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1452        let this = self.pin();
1453        Box::pin(async move {
1454            this.stop_and_wait().await;
1455        })
1456    }
1457
1458    fn subscribe_to_compilation_events(
1459        &self,
1460        event_types: Option<Vec<String>>,
1461    ) -> Receiver<Arc<dyn CompilationEvent>> {
1462        self.compilation_events.subscribe(event_types)
1463    }
1464
1465    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1466        if let Err(e) = self.compilation_events.send(event) {
1467            tracing::warn!("Failed to send compilation event: {e}");
1468        }
1469    }
1470}
1471
1472impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1473    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1474        self.pin()
1475    }
1476    fn backend(&self) -> &B {
1477        &self.backend
1478    }
1479
1480    #[track_caller]
1481    fn schedule_backend_background_job(&self, job: B::BackendJob) {
1482        self.schedule_background_job(move |this| async move {
1483            this.backend.run_backend_job(job, &*this).await;
1484        })
1485    }
1486
1487    #[track_caller]
1488    fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1489        self.schedule_foreground_job(move |this| async move {
1490            this.backend.run_backend_job(job, &*this).await;
1491        })
1492    }
1493
1494    fn try_foreground_done(&self) -> Result<(), EventListener> {
1495        if self
1496            .currently_scheduled_foreground_jobs
1497            .load(Ordering::Acquire)
1498            == 0
1499        {
1500            return Ok(());
1501        }
1502        let listener = self.event_foreground.listen();
1503        if self
1504            .currently_scheduled_foreground_jobs
1505            .load(Ordering::Acquire)
1506            == 0
1507        {
1508            return Ok(());
1509        }
1510        Err(listener)
1511    }
1512
1513    fn wait_foreground_done_excluding_own<'a>(
1514        &'a self,
1515    ) -> Option<Pin<Box<dyn Future<Output = ()> + Send + 'a>>> {
1516        if self
1517            .currently_scheduled_foreground_jobs
1518            .load(Ordering::Acquire)
1519            == 0
1520        {
1521            return None;
1522        }
1523        Some(Box::pin(async {
1524            self.finish_foreground_job();
1525            self.wait_foreground_done().await;
1526            self.begin_foreground_job();
1527        }))
1528    }
1529
1530    /// Enqueues tasks for notification of changed dependencies. This will
1531    /// eventually call `dependent_cell_updated()` on all tasks.
1532    fn schedule_notify_tasks(&self, tasks: &[TaskId]) {
1533        let result = CURRENT_TASK_STATE.try_with(|cell| {
1534            let CurrentTaskState {
1535                tasks_to_notify, ..
1536            } = &mut *cell.write().unwrap();
1537            tasks_to_notify.extend(tasks.iter().copied());
1538        });
1539        if result.is_err() {
1540            let _guard = trace_span!("schedule_notify_tasks", count = tasks.len()).entered();
1541            self.backend.invalidate_tasks(tasks, self);
1542        }
1543    }
1544
1545    /// Enqueues tasks for notification of changed dependencies. This will
1546    /// eventually call `dependent_cell_updated()` on all tasks.
1547    fn schedule_notify_tasks_set(&self, tasks: &TaskIdSet) {
1548        let result = CURRENT_TASK_STATE.try_with(|cell| {
1549            let CurrentTaskState {
1550                tasks_to_notify, ..
1551            } = &mut *cell.write().unwrap();
1552            tasks_to_notify.extend(tasks.iter().copied());
1553        });
1554        if result.is_err() {
1555            let _guard = trace_span!("schedule_notify_tasks_set", count = tasks.len()).entered();
1556            self.backend.invalidate_tasks_set(tasks, self);
1557        };
1558    }
1559
1560    #[track_caller]
1561    fn schedule(&self, task: TaskId) {
1562        self.schedule(task)
1563    }
1564
1565    fn program_duration_until(&self, instant: Instant) -> Duration {
1566        instant - self.program_start
1567    }
1568
1569    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1570        // SAFETY: This is a fresh id from the factory
1571        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1572    }
1573
1574    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1575        // SAFETY: This is a fresh id from the factory
1576        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1577    }
1578
1579    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1580        unsafe { self.task_id_factory.reuse(id.into()) }
1581    }
1582
1583    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1584        unsafe { self.transient_task_id_factory.reuse(id.into()) }
1585    }
1586
1587    fn read_task_state_dyn(&self, func: &mut dyn FnMut(&B::TaskState)) {
1588        CURRENT_TASK_STATE
1589            .with(move |ts| func(ts.read().unwrap().backend_state.downcast_ref().unwrap()))
1590    }
1591
1592    fn write_task_state_dyn(&self, func: &mut dyn FnMut(&mut B::TaskState)) {
1593        CURRENT_TASK_STATE
1594            .with(move |ts| func(ts.write().unwrap().backend_state.downcast_mut().unwrap()))
1595    }
1596
1597    fn is_idle(&self) -> bool {
1598        self.currently_scheduled_tasks.load(Ordering::Acquire) == 0
1599    }
1600}
1601
1602pub(crate) fn current_task(from: &str) -> TaskId {
1603    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1604        Ok(id) => id,
1605        Err(_) => panic!("{from} can only be used in the context of turbo_tasks task execution"),
1606    }
1607}
1608
1609pub async fn run_once<T: Send + 'static>(
1610    tt: Arc<dyn TurboTasksApi>,
1611    future: impl Future<Output = Result<T>> + Send + 'static,
1612) -> Result<T> {
1613    let (tx, rx) = tokio::sync::oneshot::channel();
1614
1615    let task_id = tt.run_once(Box::pin(async move {
1616        let result = future.await?;
1617        tx.send(result)
1618            .map_err(|_| anyhow!("unable to send result"))?;
1619        Ok(())
1620    }));
1621
1622    // INVALIDATION: A Once task will never invalidate, therefore we don't need to
1623    // track a dependency
1624    let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1625    let raw_future = raw_result.into_read().untracked();
1626    turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1627
1628    Ok(rx.await?)
1629}
1630
1631pub async fn run_once_with_reason<T: Send + 'static>(
1632    tt: Arc<dyn TurboTasksApi>,
1633    reason: impl InvalidationReason,
1634    future: impl Future<Output = Result<T>> + Send + 'static,
1635) -> Result<T> {
1636    let (tx, rx) = tokio::sync::oneshot::channel();
1637
1638    let task_id = tt.run_once_with_reason(
1639        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1640        Box::pin(async move {
1641            let result = future.await?;
1642            tx.send(result)
1643                .map_err(|_| anyhow!("unable to send result"))?;
1644            Ok(())
1645        }),
1646    );
1647
1648    // INVALIDATION: A Once task will never invalidate, therefore we don't need to
1649    // track a dependency
1650    let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1651    let raw_future = raw_result.into_read().untracked();
1652    turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1653
1654    Ok(rx.await?)
1655}
1656
1657/// Calls [`TurboTasks::dynamic_call`] for the current turbo tasks instance.
1658pub fn dynamic_call(
1659    func: &'static NativeFunction,
1660    this: Option<RawVc>,
1661    arg: Box<dyn MagicAny>,
1662    persistence: TaskPersistence,
1663) -> RawVc {
1664    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1665}
1666
1667/// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance.
1668pub fn trait_call(
1669    trait_method: &'static TraitMethod,
1670    this: RawVc,
1671    arg: Box<dyn MagicAny>,
1672    persistence: TaskPersistence,
1673) -> RawVc {
1674    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1675}
1676
1677pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1678    TURBO_TASKS.with(|arc| arc.clone())
1679}
1680
1681pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1682    TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1683}
1684
1685pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1686    TURBO_TASKS.with(|arc| func(arc))
1687}
1688
1689pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1690    TURBO_TASKS.sync_scope(tt, f)
1691}
1692
1693pub fn turbo_tasks_future_scope<T>(
1694    tt: Arc<dyn TurboTasksApi>,
1695    f: impl Future<Output = T>,
1696) -> impl Future<Output = T> {
1697    TURBO_TASKS.scope(tt, f)
1698}
1699
1700pub fn with_turbo_tasks_for_testing<T>(
1701    tt: Arc<dyn TurboTasksApi>,
1702    current_task: TaskId,
1703    execution_id: ExecutionId,
1704    f: impl Future<Output = T>,
1705) -> impl Future<Output = T> {
1706    TURBO_TASKS.scope(
1707        tt,
1708        CURRENT_TASK_STATE.scope(
1709            Arc::new(RwLock::new(CurrentTaskState::new(
1710                current_task,
1711                execution_id,
1712                Box::new(()),
1713            ))),
1714            f,
1715        ),
1716    )
1717}
1718
1719/// Spawns the given future within the context of the current task.
1720///
1721/// Beware: this method is not safe to use in production code. It is only
1722/// intended for use in tests and for debugging purposes.
1723pub fn spawn_detached_for_testing(f: impl Future<Output = Result<()>> + Send + 'static) {
1724    tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(f.in_current_span())));
1725}
1726
1727pub fn current_task_for_testing() -> TaskId {
1728    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1729}
1730
1731/// Marks the current task as dirty when restored from persistent cache.
1732pub fn mark_session_dependent() {
1733    with_turbo_tasks(|tt| {
1734        tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1735    });
1736}
1737
1738/// Marks the current task as a root in the aggregation graph.  This means it starts with the
1739/// correct aggregation number instead of needing to recompute it after the fact.
1740pub fn mark_root() {
1741    with_turbo_tasks(|tt| {
1742        tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
1743    });
1744}
1745
1746/// Marks the current task as finished. This excludes it from waiting for
1747/// strongly consistency.
1748pub fn mark_finished() {
1749    with_turbo_tasks(|tt| {
1750        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1751    });
1752}
1753
1754/// Marks the current task as stateful. This prevents the tasks from being
1755/// dropped without persisting the state.
1756///
1757/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
1758/// serialization of the current task cells
1759pub fn mark_stateful() -> SerializationInvalidator {
1760    CURRENT_TASK_STATE.with(|cell| {
1761        let CurrentTaskState {
1762            stateful, task_id, ..
1763        } = &mut *cell.write().unwrap();
1764        *stateful = true;
1765        SerializationInvalidator::new(*task_id)
1766    })
1767}
1768
1769pub fn mark_invalidator() {
1770    CURRENT_TASK_STATE.with(|cell| {
1771        let CurrentTaskState {
1772            has_invalidator, ..
1773        } = &mut *cell.write().unwrap();
1774        *has_invalidator = true;
1775    })
1776}
1777
1778pub fn prevent_gc() {
1779    // There is a hack in UpdateCellOperation that need to be updated when this is changed.
1780    mark_stateful();
1781}
1782
1783/// Notifies scheduled tasks for execution.
1784pub fn notify_scheduled_tasks() {
1785    with_turbo_tasks(|tt| tt.notify_scheduled_tasks())
1786}
1787
1788pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1789    with_turbo_tasks(|tt| {
1790        let raw_vc = collectible.node.node;
1791        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1792    })
1793}
1794
1795pub(crate) async fn read_task_output(
1796    this: &dyn TurboTasksApi,
1797    id: TaskId,
1798    consistency: ReadConsistency,
1799) -> Result<RawVc> {
1800    loop {
1801        match this.try_read_task_output(id, consistency)? {
1802            Ok(result) => return Ok(result),
1803            Err(listener) => listener.await,
1804        }
1805    }
1806}
1807
1808/// INVALIDATION: Be careful with this, it will not track dependencies, so
1809/// using it could break cache invalidation.
1810pub(crate) async fn read_task_output_untracked(
1811    this: &dyn TurboTasksApi,
1812    id: TaskId,
1813    consistency: ReadConsistency,
1814) -> Result<RawVc> {
1815    loop {
1816        match this.try_read_task_output_untracked(id, consistency)? {
1817            Ok(result) => return Ok(result),
1818            Err(listener) => listener.await,
1819        }
1820    }
1821}
1822
1823pub(crate) async fn read_task_cell(
1824    this: &dyn TurboTasksApi,
1825    id: TaskId,
1826    index: CellId,
1827    options: ReadCellOptions,
1828) -> Result<TypedCellContent> {
1829    loop {
1830        match this.try_read_task_cell(id, index, options)? {
1831            Ok(result) => return Ok(result),
1832            Err(listener) => listener.await,
1833        }
1834    }
1835}
1836
1837/// A reference to a task's cell with methods that allow updating the contents
1838/// of the cell.
1839///
1840/// Mutations should not outside of the task that that owns this cell. Doing so
1841/// is a logic error, and may lead to incorrect caching behavior.
1842#[derive(Clone, Copy, Serialize, Deserialize)]
1843pub struct CurrentCellRef {
1844    current_task: TaskId,
1845    index: CellId,
1846}
1847
1848type VcReadRepr<T> = <<T as VcValueType>::Read as VcRead<T>>::Repr;
1849
1850impl CurrentCellRef {
1851    /// Updates the cell if the given `functor` returns a value.
1852    pub fn conditional_update<T>(&self, functor: impl FnOnce(Option<&T>) -> Option<T>)
1853    where
1854        T: VcValueType,
1855    {
1856        self.conditional_update_with_shared_reference(|old_shared_reference| {
1857            let old_ref = old_shared_reference
1858                .and_then(|sr| sr.0.downcast_ref::<VcReadRepr<T>>())
1859                .map(|content| <T::Read as VcRead<T>>::repr_to_value_ref(content));
1860            let new_value = functor(old_ref)?;
1861            Some(SharedReference::new(triomphe::Arc::new(
1862                <T::Read as VcRead<T>>::value_to_repr(new_value),
1863            )))
1864        })
1865    }
1866
1867    /// Updates the cell if the given `functor` returns a `SharedReference`.
1868    pub fn conditional_update_with_shared_reference(
1869        &self,
1870        functor: impl FnOnce(Option<&SharedReference>) -> Option<SharedReference>,
1871    ) {
1872        let tt = turbo_tasks();
1873        let cell_content = tt
1874            .read_own_task_cell(self.current_task, self.index, ReadCellOptions::default())
1875            .ok();
1876        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1877        if let Some(update) = update {
1878            tt.update_own_task_cell(self.current_task, self.index, CellContent(Some(update)))
1879        }
1880    }
1881
1882    /// Replace the current cell's content with `new_value` if the current content is not equal by
1883    /// value with the existing content.
1884    ///
1885    /// The comparison happens using the value itself, not the [`VcRead::Target`] of that value.
1886    ///
1887    /// Take this example of a custom equality implementation on a transparent wrapper type:
1888    ///
1889    /// ```
1890    /// #[turbo_tasks::value(transparent, eq = "manual")]
1891    /// struct Wrapper(Vec<u32>);
1892    ///
1893    /// impl PartialEq for Wrapper {
1894    ///     fn eq(&self, other: Wrapper) {
1895    ///         // Example: order doesn't matter for equality
1896    ///         let (mut this, mut other) = (self.clone(), other.clone());
1897    ///         this.sort_unstable();
1898    ///         other.sort_unstable();
1899    ///         this == other
1900    ///     }
1901    /// }
1902    ///
1903    /// impl Eq for Wrapper {}
1904    /// ```
1905    ///
1906    /// Comparisons of [`Vc<Wrapper>`] used when updating the cell will use `Wrapper`'s custom
1907    /// equality implementation, rather than the one provided by the target ([`Vec<u32>`]) type.
1908    ///
1909    /// However, in most cases, the default derived implementation of [`PartialEq`] is used which
1910    /// just forwards to the inner value's [`PartialEq`].
1911    ///
1912    /// If you already have a `SharedReference`, consider calling
1913    /// [`Self::compare_and_update_with_shared_reference`] which can re-use the [`SharedReference`]
1914    /// object.
1915    pub fn compare_and_update<T>(&self, new_value: T)
1916    where
1917        T: PartialEq + VcValueType,
1918    {
1919        self.conditional_update(|old_value| {
1920            if let Some(old_value) = old_value
1921                && old_value == &new_value
1922            {
1923                return None;
1924            }
1925            Some(new_value)
1926        });
1927    }
1928
1929    /// Replace the current cell's content with `new_shared_reference` if the
1930    /// current content is not equal by value with the existing content.
1931    ///
1932    /// If you already have a `SharedReference`, this is a faster version of
1933    /// [`CurrentCellRef::compare_and_update`].
1934    ///
1935    /// The [`SharedReference`] is expected to use the `<T::Read as
1936    /// VcRead<T>>::Repr` type for its representation of the value.
1937    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
1938    where
1939        T: VcValueType + PartialEq,
1940    {
1941        fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
1942            <T::Read as VcRead<T>>::repr_to_value_ref(
1943                sr.0.downcast_ref::<VcReadRepr<T>>()
1944                    .expect("cannot update SharedReference of different type"),
1945            )
1946        }
1947        self.conditional_update_with_shared_reference(|old_sr| {
1948            if let Some(old_sr) = old_sr {
1949                let old_value: &T = extract_sr_value(old_sr);
1950                let new_value = extract_sr_value(&new_shared_reference);
1951                if old_value == new_value {
1952                    return None;
1953                }
1954            }
1955            Some(new_shared_reference)
1956        });
1957    }
1958
1959    /// Unconditionally updates the content of the cell.
1960    pub fn update<T>(&self, new_value: T)
1961    where
1962        T: VcValueType,
1963    {
1964        let tt = turbo_tasks();
1965        tt.update_own_task_cell(
1966            self.current_task,
1967            self.index,
1968            CellContent(Some(SharedReference::new(triomphe::Arc::new(
1969                <T::Read as VcRead<T>>::value_to_repr(new_value),
1970            )))),
1971        )
1972    }
1973
1974    /// A faster version of [`Self::update`] if you already have a
1975    /// [`SharedReference`].
1976    ///
1977    /// If the passed-in [`SharedReference`] is the same as the existing cell's
1978    /// by identity, no update is performed.
1979    ///
1980    /// The [`SharedReference`] is expected to use the `<T::Read as
1981    /// VcRead<T>>::Repr` type for its representation of the value.
1982    pub fn update_with_shared_reference(&self, shared_ref: SharedReference) {
1983        let tt = turbo_tasks();
1984        let content = tt
1985            .read_own_task_cell(self.current_task, self.index, ReadCellOptions::default())
1986            .ok();
1987        let update = if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
1988            // pointer equality (not value equality)
1989            shared_ref_exp != shared_ref
1990        } else {
1991            true
1992        };
1993        if update {
1994            tt.update_own_task_cell(self.current_task, self.index, CellContent(Some(shared_ref)))
1995        }
1996    }
1997}
1998
1999impl From<CurrentCellRef> for RawVc {
2000    fn from(cell: CurrentCellRef) -> Self {
2001        RawVc::TaskCell(cell.current_task, cell.index)
2002    }
2003}
2004
2005pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef {
2006    CURRENT_TASK_STATE.with(|ts| {
2007        let current_task = current_task("celling turbo_tasks values");
2008        let mut ts = ts.write().unwrap();
2009        let map = ts.cell_counters.as_mut().unwrap();
2010        let current_index = map.entry(ty).or_default();
2011        let index = *current_index;
2012        *current_index += 1;
2013        CurrentCellRef {
2014            current_task,
2015            index: CellId { type_id: ty, index },
2016        }
2017    })
2018}
2019
2020pub(crate) async fn read_local_output(
2021    this: &dyn TurboTasksApi,
2022    execution_id: ExecutionId,
2023    local_task_id: LocalTaskId,
2024) -> Result<RawVc> {
2025    loop {
2026        match this.try_read_local_output(execution_id, local_task_id)? {
2027            Ok(raw_vc) => return Ok(raw_vc),
2028            Err(event_listener) => event_listener.await,
2029        }
2030    }
2031}