turbo_tasks/
manager.rs

1use std::{
2    any::Any,
3    future::Future,
4    hash::BuildHasherDefault,
5    mem::take,
6    pin::Pin,
7    sync::{
8        Arc, Mutex, RwLock, Weak,
9        atomic::{AtomicBool, AtomicUsize, Ordering},
10    },
11    thread,
12    time::{Duration, Instant},
13};
14
15use anyhow::{Result, anyhow};
16use auto_hash_map::AutoMap;
17use rustc_hash::FxHasher;
18use serde::{Deserialize, Serialize};
19use smallvec::SmallVec;
20use tokio::{runtime::Handle, select, sync::mpsc::Receiver, task_local};
21use tokio_util::task::TaskTracker;
22use tracing::{Instrument, Level, Span, info_span, instrument, trace_span};
23use turbo_tasks_malloc::TurboMalloc;
24
25use crate::{
26    Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
27    ResolvedVc, SharedReference, TaskId, TaskIdSet, TraitMethod, ValueTypeId, Vc, VcRead,
28    VcValueTrait, VcValueType,
29    backend::{
30        Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
31        TransientTaskType, TurboTasksExecutionError, TypedCellContent,
32    },
33    capture_future::{self, CaptureFuture},
34    event::{Event, EventListener},
35    id::{BackendJobId, ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
36    id_factory::IdFactoryWithReuse,
37    macro_helpers::NativeFunction,
38    magic_any::MagicAny,
39    message_queue::{CompilationEvent, CompilationEventQueue},
40    raw_vc::{CellId, RawVc},
41    registry,
42    serialization_invalidation::SerializationInvalidator,
43    task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
44    task_statistics::TaskStatisticsApi,
45    trace::TraceRawVcs,
46    util::{IdFactory, StaticOrArc},
47    vc::ReadVcFuture,
48};
49
50/// Common base trait for [`TurboTasksApi`] and [`TurboTasksBackendApi`]. Provides APIs for creating
51/// tasks from function calls.
52pub trait TurboTasksCallApi: Sync + Send {
53    /// Calls a native function with arguments. Resolves arguments when needed
54    /// with a wrapper task.
55    fn dynamic_call(
56        &self,
57        native_fn: &'static NativeFunction,
58        this: Option<RawVc>,
59        arg: Box<dyn MagicAny>,
60        persistence: TaskPersistence,
61    ) -> RawVc;
62    /// Call a native function with arguments.
63    /// All inputs must be resolved.
64    fn native_call(
65        &self,
66        native_fn: &'static NativeFunction,
67        this: Option<RawVc>,
68        arg: Box<dyn MagicAny>,
69        persistence: TaskPersistence,
70    ) -> RawVc;
71    /// Calls a trait method with arguments. First input is the `self` object.
72    /// Uses a wrapper task to resolve
73    fn trait_call(
74        &self,
75        trait_method: &'static TraitMethod,
76        this: RawVc,
77        arg: Box<dyn MagicAny>,
78        persistence: TaskPersistence,
79    ) -> RawVc;
80
81    fn run_once(
82        &self,
83        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
84    ) -> TaskId;
85    fn run_once_with_reason(
86        &self,
87        reason: StaticOrArc<dyn InvalidationReason>,
88        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
89    ) -> TaskId;
90    fn run_once_process(
91        &self,
92        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
93    ) -> TaskId;
94}
95
96/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
97/// context. Returned by the [`turbo_tasks`] helper function.
98///
99/// This trait is needed because thread locals cannot contain an unresolved [`Backend`] type
100/// parameter.
101pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
102    fn invalidate(&self, task: TaskId);
103    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);
104
105    fn invalidate_serialization(&self, task: TaskId);
106
107    /// Eagerly notifies all tasks that were scheduled for notifications via
108    /// `schedule_notify_tasks_set()`
109    fn notify_scheduled_tasks(&self);
110
111    fn try_read_task_output(
112        &self,
113        task: TaskId,
114        consistency: ReadConsistency,
115    ) -> Result<Result<RawVc, EventListener>>;
116
117    /// INVALIDATION: Be careful with this, it will not track dependencies, so
118    /// using it could break cache invalidation.
119    fn try_read_task_output_untracked(
120        &self,
121        task: TaskId,
122        consistency: ReadConsistency,
123    ) -> Result<Result<RawVc, EventListener>>;
124
125    fn try_read_task_cell(
126        &self,
127        task: TaskId,
128        index: CellId,
129        options: ReadCellOptions,
130    ) -> Result<Result<TypedCellContent, EventListener>>;
131
132    /// INVALIDATION: Be careful with this, it will not track dependencies, so
133    /// using it could break cache invalidation.
134    fn try_read_task_cell_untracked(
135        &self,
136        task: TaskId,
137        index: CellId,
138        options: ReadCellOptions,
139    ) -> Result<Result<TypedCellContent, EventListener>>;
140
141    /// Reads a [`RawVc::LocalOutput`]. If the task has completed, returns the [`RawVc`] the local
142    /// task points to.
143    ///
144    /// The returned [`RawVc`] may also be a [`RawVc::LocalOutput`], so this may need to be called
145    /// recursively or in a loop.
146    ///
147    /// This does not accept a consistency argument, as you cannot control consistency of a read of
148    /// an operation owned by your own task. Strongly consistent reads are only allowed on
149    /// [`OperationVc`]s, which should never be local tasks.
150    ///
151    /// No dependency tracking will happen as a result of this function call, as it's a no-op for a
152    /// task to depend on itself.
153    ///
154    /// [`OperationVc`]: crate::OperationVc
155    fn try_read_local_output(
156        &self,
157        execution_id: ExecutionId,
158        local_task_id: LocalTaskId,
159    ) -> Result<Result<RawVc, EventListener>>;
160
161    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;
162
163    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
164    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
165    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);
166
167    /// INVALIDATION: Be careful with this, it will not track dependencies, so
168    /// using it could break cache invalidation.
169    fn try_read_own_task_cell_untracked(
170        &self,
171        current_task: TaskId,
172        index: CellId,
173        options: ReadCellOptions,
174    ) -> Result<TypedCellContent>;
175
176    fn read_own_task_cell(
177        &self,
178        task: TaskId,
179        index: CellId,
180        options: ReadCellOptions,
181    ) -> Result<TypedCellContent>;
182    fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent);
183    fn mark_own_task_as_finished(&self, task: TaskId);
184    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
185    fn mark_own_task_as_session_dependent(&self, task: TaskId);
186
187    fn connect_task(&self, task: TaskId);
188
189    /// Wraps the given future in the current task.
190    ///
191    /// Beware: this method is not safe to use in production code. It is only intended for use in
192    /// tests and for debugging purposes.
193    fn detached_for_testing(
194        &self,
195        f: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
196    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
197
198    fn task_statistics(&self) -> &TaskStatisticsApi;
199
200    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
201
202    fn subscribe_to_compilation_events(
203        &self,
204        event_types: Option<Vec<String>>,
205    ) -> Receiver<Arc<dyn CompilationEvent>>;
206    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);
207}
208
209/// A wrapper around a value that is unused.
210pub struct Unused<T> {
211    inner: T,
212}
213
214impl<T> Unused<T> {
215    /// Creates a new unused value.
216    ///
217    /// # Safety
218    ///
219    /// The wrapped value must not be used.
220    pub unsafe fn new_unchecked(inner: T) -> Self {
221        Self { inner }
222    }
223
224    /// Get the inner value, without consuming the `Unused` wrapper.
225    ///
226    /// # Safety
227    ///
228    /// The user need to make sure that the value stays unused.
229    pub unsafe fn get_unchecked(&self) -> &T {
230        &self.inner
231    }
232
233    /// Unwraps the value, consuming the `Unused` wrapper.
234    pub fn into(self) -> T {
235        self.inner
236    }
237}
238
239/// A subset of the [`TurboTasks`] API that's exposed to [`Backend`] implementations.
240pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
241    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;
242
243    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
244    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
245    /// # Safety
246    ///
247    /// The caller must ensure that the task id is not used anymore.
248    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
249    /// # Safety
250    ///
251    /// The caller must ensure that the task id is not used anymore.
252    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);
253
254    fn schedule(&self, task: TaskId);
255    fn schedule_backend_background_job(&self, id: BackendJobId);
256    fn schedule_backend_foreground_job(&self, id: BackendJobId);
257
258    fn try_foreground_done(&self) -> Result<(), EventListener>;
259    fn wait_foreground_done_excluding_own<'a>(
260        &'a self,
261    ) -> Option<Pin<Box<dyn Future<Output = ()> + Send + 'a>>>;
262
263    /// Enqueues tasks for notification of changed dependencies. This will
264    /// eventually call `invalidate_tasks()` on all tasks.
265    fn schedule_notify_tasks(&self, tasks: &[TaskId]);
266
267    /// Enqueues tasks for notification of changed dependencies. This will
268    /// eventually call `invalidate_tasks()` on all tasks.
269    fn schedule_notify_tasks_set(&self, tasks: &TaskIdSet);
270
271    /// Returns the duration from the start of the program to the given instant.
272    fn program_duration_until(&self, instant: Instant) -> Duration;
273
274    /// An untyped object-safe version of [`TurboTasksBackendApiExt::read_task_state`]. Callers
275    /// should prefer the extension trait's version of this method.
276    fn read_task_state_dyn(&self, func: &mut dyn FnMut(&B::TaskState));
277
278    /// An untyped object-safe version of [`TurboTasksBackendApiExt::write_task_state`]. Callers
279    /// should prefer the extension trait's version of this method.
280    fn write_task_state_dyn(&self, func: &mut dyn FnMut(&mut B::TaskState));
281
282    /// Returns true if the system is idle.
283    fn is_idle(&self) -> bool;
284
285    /// Returns a reference to the backend.
286    fn backend(&self) -> &B;
287}
288
289/// An extension trait for methods of [`TurboTasksBackendApi`] that are not object-safe. This is
290/// automatically implemented for all [`TurboTasksBackendApi`]s using a blanket impl.
291pub trait TurboTasksBackendApiExt<B: Backend + 'static>: TurboTasksBackendApi<B> {
292    /// Allows modification of the [`Backend::TaskState`].
293    ///
294    /// This function holds open a non-exclusive read lock that blocks writes, so `func` is expected
295    /// to execute quickly in order to release the lock.
296    fn read_task_state<T>(&self, func: impl FnOnce(&B::TaskState) -> T) -> T {
297        let mut func = Some(func);
298        let mut out = None;
299        self.read_task_state_dyn(&mut |ts| out = Some((func.take().unwrap())(ts)));
300        out.expect("read_task_state_dyn must call `func`")
301    }
302
303    /// Allows modification of the [`Backend::TaskState`].
304    ///
305    /// This function holds open a write lock, so `func` is expected to execute quickly in order to
306    /// release the lock.
307    fn write_task_state<T>(&self, func: impl FnOnce(&mut B::TaskState) -> T) -> T {
308        let mut func = Some(func);
309        let mut out = None;
310        self.write_task_state_dyn(&mut |ts| out = Some((func.take().unwrap())(ts)));
311        out.expect("write_task_state_dyn must call `func`")
312    }
313}
314
315impl<TT, B> TurboTasksBackendApiExt<B> for TT
316where
317    TT: TurboTasksBackendApi<B> + ?Sized,
318    B: Backend + 'static,
319{
320}
321
322#[allow(clippy::manual_non_exhaustive)]
323pub struct UpdateInfo {
324    pub duration: Duration,
325    pub tasks: usize,
326    pub reasons: InvalidationReasonSet,
327    #[allow(dead_code)]
328    placeholder_for_future_fields: (),
329}
330
331#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
332pub enum TaskPersistence {
333    /// Tasks that may be persisted across sessions using serialization.
334    Persistent,
335
336    /// Tasks that will be persisted in memory for the life of this session, but won't persist
337    /// between sessions.
338    ///
339    /// This is used for [root tasks][TurboTasks::spawn_root_task] and tasks with an argument of
340    /// type [`TransientValue`][crate::value::TransientValue] or
341    /// [`TransientInstance`][crate::value::TransientInstance].
342    Transient,
343
344    /// Tasks that are persisted only for the lifetime of the nearest non-`Local` parent caller.
345    ///
346    /// This task does not have a unique task id, and is not shared with the backend. Instead it
347    /// uses the parent task's id.
348    ///
349    /// This is useful for functions that have a low cache hit rate. Those functions could be
350    /// converted to non-task functions, but that would break their function signature. This
351    /// provides a mechanism for skipping caching without changing the function signature.
352    Local,
353}
354
355#[derive(Clone, Copy, Debug, Eq, PartialEq)]
356pub enum ReadConsistency {
357    /// The default behavior for most APIs. Reads are faster, but may return stale values, which
358    /// may later trigger re-computation.
359    Eventual,
360    /// Ensures all dependencies are fully resolved before returning the cell or output data, at
361    /// the cost of slower reads.
362    ///
363    /// Top-level code that returns data to the user should use strongly consistent reads.
364    Strong,
365}
366
367pub struct TurboTasks<B: Backend + 'static> {
368    this: Weak<Self>,
369    backend: B,
370    task_id_factory: IdFactoryWithReuse<TaskId>,
371    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
372    execution_id_factory: IdFactory<ExecutionId>,
373    stopped: AtomicBool,
374    currently_scheduled_tasks: AtomicUsize,
375    currently_scheduled_foreground_jobs: AtomicUsize,
376    currently_scheduled_background_jobs: AtomicUsize,
377    scheduled_tasks: AtomicUsize,
378    start: Mutex<Option<Instant>>,
379    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
380    event: Event,
381    event_start: Event,
382    event_foreground: Event,
383    event_background: Event,
384    program_start: Instant,
385    compilation_events: CompilationEventQueue,
386}
387
388/// Information about a non-local task. A non-local task can contain multiple "local" tasks, which
389/// all share the same non-local task state.
390///
391/// A non-local task is one that:
392///
393/// - Has a unique task id.
394/// - Is potentially cached.
395/// - The backend is aware of.
396struct CurrentTaskState {
397    task_id: TaskId,
398    execution_id: ExecutionId,
399
400    /// Affected tasks, that are tracked during task execution. These tasks will
401    /// be invalidated when the execution finishes or before reading a cell
402    /// value.
403    tasks_to_notify: SmallVec<[TaskId; 4]>,
404
405    /// True if the current task has state in cells
406    stateful: bool,
407
408    /// True if the current task uses an external invalidator
409    has_invalidator: bool,
410
411    /// Tracks how many cells of each type has been allocated so far during this task execution.
412    /// When a task is re-executed, the cell count may not match the existing cell vec length.
413    ///
414    /// This is taken (and becomes `None`) during teardown of a task.
415    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,
416
417    /// Local tasks created while this global task has been running. Indexed by `LocalTaskId`.
418    local_tasks: Vec<LocalTask>,
419
420    /// Tracks currently running local tasks, and defers cleanup of the global task until those
421    /// complete. Also used by `detached_for_testing`.
422    local_task_tracker: TaskTracker,
423
424    backend_state: Box<dyn Any + Send + Sync>,
425}
426
427impl CurrentTaskState {
428    fn new(
429        task_id: TaskId,
430        execution_id: ExecutionId,
431        backend_state: Box<dyn Any + Send + Sync>,
432    ) -> Self {
433        Self {
434            task_id,
435            execution_id,
436            tasks_to_notify: SmallVec::new(),
437            stateful: false,
438            has_invalidator: false,
439            cell_counters: Some(AutoMap::default()),
440            local_tasks: Vec::new(),
441            local_task_tracker: TaskTracker::new(),
442            backend_state,
443        }
444    }
445
446    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
447        if self.execution_id != expected_execution_id {
448            panic!(
449                "Local tasks can only be scheduled/awaited within the same execution of the \
450                 parent task that created them"
451            );
452        }
453    }
454
455    fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
456        self.local_tasks.push(local_task);
457        // generate a one-indexed id from len() -- we just pushed so len() is >= 1
458        if cfg!(debug_assertions) {
459            LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
460        } else {
461            unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
462        }
463    }
464
465    fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
466        // local task ids are one-indexed (they use NonZeroU32)
467        &self.local_tasks[(*local_task_id as usize) - 1]
468    }
469
470    fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
471        &mut self.local_tasks[(*local_task_id as usize) - 1]
472    }
473}
474
475// TODO implement our own thread pool and make these thread locals instead
476task_local! {
477    /// The current TurboTasks instance
478    static TURBO_TASKS: Arc<dyn TurboTasksApi>;
479
480    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
481}
482
483impl<B: Backend + 'static> TurboTasks<B> {
484    // TODO better lifetime management for turbo tasks
485    // consider using unsafe for the task_local turbo tasks
486    // that should be safe as long tasks can't outlife turbo task
487    // so we probably want to make sure that all tasks are joined
488    // when trying to drop turbo tasks
489    pub fn new(backend: B) -> Arc<Self> {
490        let task_id_factory = IdFactoryWithReuse::new(
491            TaskId::MIN,
492            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
493        );
494        let transient_task_id_factory =
495            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
496        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
497        let this = Arc::new_cyclic(|this| Self {
498            this: this.clone(),
499            backend,
500            task_id_factory,
501            transient_task_id_factory,
502            execution_id_factory,
503            stopped: AtomicBool::new(false),
504            currently_scheduled_tasks: AtomicUsize::new(0),
505            currently_scheduled_background_jobs: AtomicUsize::new(0),
506            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
507            scheduled_tasks: AtomicUsize::new(0),
508            start: Default::default(),
509            aggregated_update: Default::default(),
510            event: Event::new(|| "TurboTasks::event".to_string()),
511            event_start: Event::new(|| "TurboTasks::event_start".to_string()),
512            event_foreground: Event::new(|| "TurboTasks::event_foreground".to_string()),
513            event_background: Event::new(|| "TurboTasks::event_background".to_string()),
514            program_start: Instant::now(),
515            compilation_events: CompilationEventQueue::default(),
516        });
517        this.backend.startup(&*this);
518        this
519    }
520
521    pub fn pin(&self) -> Arc<Self> {
522        self.this.upgrade().unwrap()
523    }
524
525    /// Creates a new root task
526    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
527    where
528        T: ?Sized,
529        F: Fn() -> Fut + Send + Sync + Clone + 'static,
530        Fut: Future<Output = Result<Vc<T>>> + Send,
531    {
532        let id = self.backend.create_transient_task(
533            TransientTaskType::Root(Box::new(move || {
534                let functor = functor.clone();
535                Box::pin(async move {
536                    let raw_vc = functor().await?.node;
537                    raw_vc.to_non_local().await
538                })
539            })),
540            self,
541        );
542        self.schedule(id);
543        id
544    }
545
546    pub fn dispose_root_task(&self, task_id: TaskId) {
547        self.backend.dispose_root_task(task_id, self);
548    }
549
550    // TODO make sure that all dependencies settle before reading them
551    /// Creates a new root task, that is only executed once.
552    /// Dependencies will not invalidate the task.
553    #[track_caller]
554    pub fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
555    where
556        T: ?Sized,
557        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
558    {
559        let id = self.backend.create_transient_task(
560            TransientTaskType::Once(Box::pin(async move {
561                let raw_vc = future.await?.node;
562                raw_vc.to_non_local().await
563            })),
564            self,
565        );
566        self.schedule(id);
567        id
568    }
569
570    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
571        &self,
572        future: impl Future<Output = Result<T>> + Send + 'static,
573    ) -> Result<T> {
574        let (tx, rx) = tokio::sync::oneshot::channel();
575        let task_id = self.spawn_once_task(async move {
576            let result = future.await?;
577            tx.send(result)
578                .map_err(|_| anyhow!("unable to send result"))?;
579            Ok(Completion::new())
580        });
581        // INVALIDATION: A Once task will never invalidate, therefore we don't need to
582        // track a dependency
583        let raw_result =
584            read_task_output_untracked(self, task_id, ReadConsistency::Eventual).await?;
585        turbo_tasks_future_scope(
586            self.pin(),
587            ReadVcFuture::<Completion>::from(raw_result.into_read().untracked()),
588        )
589        .await?;
590
591        Ok(rx.await?)
592    }
593
594    pub(crate) fn native_call(
595        &self,
596        native_fn: &'static NativeFunction,
597        this: Option<RawVc>,
598        arg: Box<dyn MagicAny>,
599        persistence: TaskPersistence,
600    ) -> RawVc {
601        match persistence {
602            TaskPersistence::Local => {
603                let task_type = LocalTaskSpec {
604                    task_type: LocalTaskType::Native { native_fn },
605                    this,
606                    arg,
607                };
608                self.schedule_local_task(task_type, persistence)
609            }
610            TaskPersistence::Transient => {
611                let task_type = CachedTaskType {
612                    native_fn,
613                    this,
614                    arg,
615                };
616
617                RawVc::TaskOutput(self.backend.get_or_create_transient_task(
618                    task_type,
619                    current_task("turbo_function calls"),
620                    self,
621                ))
622            }
623            TaskPersistence::Persistent => {
624                let task_type = CachedTaskType {
625                    native_fn,
626                    this,
627                    arg,
628                };
629
630                RawVc::TaskOutput(self.backend.get_or_create_persistent_task(
631                    task_type,
632                    current_task("turbo_function calls"),
633                    self,
634                ))
635            }
636        }
637    }
638
639    pub fn dynamic_call(
640        &self,
641        native_fn: &'static NativeFunction,
642        this: Option<RawVc>,
643        arg: Box<dyn MagicAny>,
644        persistence: TaskPersistence,
645    ) -> RawVc {
646        if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
647            return self.native_call(native_fn, this, arg, persistence);
648        }
649        let task_type = LocalTaskSpec {
650            task_type: LocalTaskType::ResolveNative { native_fn },
651            this,
652            arg,
653        };
654        self.schedule_local_task(task_type, persistence)
655    }
656
657    pub fn trait_call(
658        &self,
659        trait_method: &'static TraitMethod,
660        this: RawVc,
661        arg: Box<dyn MagicAny>,
662        persistence: TaskPersistence,
663    ) -> RawVc {
664        // avoid creating a wrapper task if self is already resolved
665        // for resolved cells we already know the value type so we can lookup the
666        // function
667        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
668            match registry::get_value_type(type_id).get_trait_method(trait_method) {
669                Some(native_fn) => {
670                    let arg = native_fn.arg_meta.filter_owned(arg);
671                    return self.dynamic_call(native_fn, Some(this), arg, persistence);
672                }
673                None => {
674                    // We are destined to fail at this point, but we just retry resolution in the
675                    // local task since we cannot report an error from here.
676                    // TODO: A panic seems appropriate since the immediate caller is to blame
677                }
678            }
679        }
680
681        // create a wrapper task to resolve all inputs
682        let task_type = LocalTaskSpec {
683            task_type: LocalTaskType::ResolveTrait { trait_method },
684            this: Some(this),
685            arg,
686        };
687
688        self.schedule_local_task(task_type, persistence)
689    }
690
691    #[track_caller]
692    pub(crate) fn schedule(&self, task_id: TaskId) {
693        self.begin_primary_job();
694        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
695
696        let this = self.pin();
697        let future = async move {
698            let mut schedule_again = true;
699            while schedule_again {
700                let backend_state = this.backend.new_task_state(task_id);
701                // it's okay for execution ids to overflow and wrap, they're just used for an assert
702                let execution_id = this.execution_id_factory.wrapping_get();
703                let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
704                    task_id,
705                    execution_id,
706                    Box::new(backend_state),
707                )));
708                let single_execution_future = async {
709                    if this.stopped.load(Ordering::Acquire) {
710                        this.backend.task_execution_canceled(task_id, &*this);
711                        return false;
712                    }
713
714                    let Some(TaskExecutionSpec { future, span }) =
715                        this.backend.try_start_task_execution(task_id, &*this)
716                    else {
717                        return false;
718                    };
719
720                    async {
721                        let (result, duration, memory_usage) = CaptureFuture::new(future).await;
722
723                        // wait for all spawned local tasks using `local` to finish
724                        let ltt = CURRENT_TASK_STATE
725                            .with(|ts| ts.read().unwrap().local_task_tracker.clone());
726                        ltt.close();
727                        ltt.wait().await;
728
729                        let result = match result {
730                            Ok(Ok(raw_vc)) => Ok(raw_vc),
731                            Ok(Err(err)) => Err(err.into()),
732                            Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
733                        };
734
735                        this.backend.task_execution_result(task_id, result, &*this);
736                        let FinishedTaskState {
737                            stateful,
738                            has_invalidator,
739                        } = this.finish_current_task_state();
740                        let cell_counters = CURRENT_TASK_STATE
741                            .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
742                        let schedule_again = this.backend.task_execution_completed(
743                            task_id,
744                            duration,
745                            memory_usage,
746                            &cell_counters,
747                            stateful,
748                            has_invalidator,
749                            &*this,
750                        );
751                        // task_execution_completed might need to notify tasks
752                        this.notify_scheduled_tasks();
753                        schedule_again
754                    }
755                    .instrument(span)
756                    .await
757                };
758                schedule_again = CURRENT_TASK_STATE
759                    .scope(current_task_state, single_execution_future)
760                    .await;
761            }
762            this.finish_primary_job();
763            anyhow::Ok(())
764        };
765
766        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
767
768        #[cfg(feature = "tokio_tracing")]
769        {
770            let description = self.backend.get_task_description(task_id);
771            tokio::task::Builder::new()
772                .name(&description)
773                .spawn(future)
774                .unwrap();
775        }
776        #[cfg(not(feature = "tokio_tracing"))]
777        tokio::task::spawn(future);
778    }
779
780    fn schedule_local_task(
781        &self,
782        ty: LocalTaskSpec,
783        // if this is a `LocalTaskType::Resolve*`, we may spawn another task with this persistence,
784        // if this is a `LocalTaskType::Native`, persistence is unused.
785        //
786        // TODO: In the rare case that we're crossing a transient->persistent boundary, we should
787        // force `LocalTaskType::Native` to be spawned as real tasks, so that any cells they create
788        // have the correct persistence. This is not an issue for resolution stub task, as they
789        // don't end up owning any cells.
790        persistence: TaskPersistence,
791    ) -> RawVc {
792        let task_type = ty.task_type;
793        let (global_task_state, parent_task_id, execution_id, local_task_id) = CURRENT_TASK_STATE
794            .with(|gts| {
795                let mut gts_write = gts.write().unwrap();
796                let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
797                    done_event: Event::new({
798                        move || format!("LocalTask({task_type})::done_event")
799                    }),
800                });
801                (
802                    Arc::clone(gts),
803                    gts_write.task_id,
804                    gts_write.execution_id,
805                    local_task_id,
806                )
807            });
808
809        #[cfg(feature = "tokio_tracing")]
810        let description = format!(
811            "[local] (parent: {}) {}",
812            self.backend.get_task_description(parent_task_id),
813            ty,
814        );
815        #[cfg(not(feature = "tokio_tracing"))]
816        let _ = parent_task_id; // suppress unused variable warning
817
818        let this = self.pin();
819        let future = async move {
820            let TaskExecutionSpec { future, span } =
821                crate::task::local_task::get_local_task_execution_spec(&*this, &ty, persistence);
822            async move {
823                let (result, _duration, _memory_usage) = CaptureFuture::new(future).await;
824
825                let result = match result {
826                    Ok(Ok(raw_vc)) => Ok(raw_vc),
827                    Ok(Err(err)) => Err(err.into()),
828                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
829                };
830
831                let local_task = LocalTask::Done {
832                    output: match result {
833                        Ok(raw_vc) => OutputContent::Link(raw_vc),
834                        Err(err) => OutputContent::Error(err.task_context(task_type)),
835                    },
836                };
837
838                let done_event = CURRENT_TASK_STATE.with(move |gts| {
839                    let mut gts_write = gts.write().unwrap();
840                    let scheduled_task =
841                        std::mem::replace(gts_write.get_mut_local_task(local_task_id), local_task);
842                    let LocalTask::Scheduled { done_event } = scheduled_task else {
843                        panic!("local task finished, but was not in the scheduled state?");
844                    };
845                    done_event
846                });
847                done_event.notify(usize::MAX)
848            }
849            .instrument(span)
850            .await
851        };
852        let future = global_task_state
853            .read()
854            .unwrap()
855            .local_task_tracker
856            .track_future(future);
857        let future = CURRENT_TASK_STATE.scope(global_task_state, future);
858        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
859
860        #[cfg(feature = "tokio_tracing")]
861        tokio::task::Builder::new()
862            .name(&description)
863            .spawn(future)
864            .unwrap();
865        #[cfg(not(feature = "tokio_tracing"))]
866        tokio::task::spawn(future);
867
868        RawVc::LocalOutput(execution_id, local_task_id, persistence)
869    }
870
871    fn begin_primary_job(&self) {
872        if self
873            .currently_scheduled_tasks
874            .fetch_add(1, Ordering::AcqRel)
875            == 0
876        {
877            *self.start.lock().unwrap() = Some(Instant::now());
878            self.event_start.notify(usize::MAX);
879            self.backend.idle_end(self);
880        }
881    }
882
883    fn begin_foreground_job(&self) {
884        self.begin_primary_job();
885        self.currently_scheduled_foreground_jobs
886            .fetch_add(1, Ordering::AcqRel);
887    }
888
889    fn finish_primary_job(&self) {
890        if self
891            .currently_scheduled_tasks
892            .fetch_sub(1, Ordering::AcqRel)
893            == 1
894        {
895            self.backend.idle_start(self);
896            // That's not super race-condition-safe, but it's only for
897            // statistical reasons
898            let total = self.scheduled_tasks.load(Ordering::Acquire);
899            self.scheduled_tasks.store(0, Ordering::Release);
900            if let Some(start) = *self.start.lock().unwrap() {
901                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
902                if let Some(update) = update.as_mut() {
903                    update.0 += start.elapsed();
904                    update.1 += total;
905                } else {
906                    *update = Some((start.elapsed(), total));
907                }
908            }
909            self.event.notify(usize::MAX);
910        }
911    }
912
913    fn finish_foreground_job(&self) {
914        if self
915            .currently_scheduled_foreground_jobs
916            .fetch_sub(1, Ordering::AcqRel)
917            == 1
918        {
919            self.event_foreground.notify(usize::MAX);
920        }
921        self.finish_primary_job();
922    }
923
924    pub async fn wait_foreground_done(&self) {
925        if self
926            .currently_scheduled_foreground_jobs
927            .load(Ordering::Acquire)
928            == 0
929        {
930            return;
931        }
932        let listener = self.event_foreground.listen();
933        if self
934            .currently_scheduled_foreground_jobs
935            .load(Ordering::Acquire)
936            == 0
937        {
938            return;
939        }
940        listener
941            .instrument(trace_span!("wait_foreground_done"))
942            .await;
943    }
944
945    pub fn get_in_progress_count(&self) -> usize {
946        self.currently_scheduled_tasks.load(Ordering::Acquire)
947    }
948
949    /// Waits for the given task to finish executing. This works by performing an untracked read,
950    /// and discarding the value of the task output.
951    ///
952    /// [`ReadConsistency::Eventual`] means that this will return after the task executes, but
953    /// before all dependencies have completely settled.
954    ///
955    /// [`ReadConsistency::Strong`] means that this will also wait for the task and all dependencies
956    /// to fully settle before returning.
957    ///
958    /// As this function is typically called in top-level code that waits for results to be ready
959    /// for the user to access, most callers should use [`ReadConsistency::Strong`].
960    pub async fn wait_task_completion(
961        &self,
962        id: TaskId,
963        consistency: ReadConsistency,
964    ) -> Result<()> {
965        // INVALIDATION: This doesn't return a value, only waits for it to be ready.
966        read_task_output_untracked(self, id, consistency).await?;
967        Ok(())
968    }
969
970    /// Returns [UpdateInfo] with all updates aggregated over a given duration
971    /// (`aggregation`). Will wait until an update happens.
972    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
973        self.aggregated_update_info(aggregation, Duration::MAX)
974            .await
975            .unwrap()
976    }
977
978    /// Returns [UpdateInfo] with all updates aggregated over a given duration
979    /// (`aggregation`). Will only return None when the timeout is reached while
980    /// waiting for the first update.
981    pub async fn aggregated_update_info(
982        &self,
983        aggregation: Duration,
984        timeout: Duration,
985    ) -> Option<UpdateInfo> {
986        let listener = self
987            .event
988            .listen_with_note(|| "wait for update info".to_string());
989        let wait_for_finish = {
990            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
991            if aggregation.is_zero() {
992                if let Some((duration, tasks)) = update.take() {
993                    return Some(UpdateInfo {
994                        duration,
995                        tasks,
996                        reasons: take(reason_set),
997                        placeholder_for_future_fields: (),
998                    });
999                } else {
1000                    true
1001                }
1002            } else {
1003                update.is_none()
1004            }
1005        };
1006        if wait_for_finish {
1007            if timeout == Duration::MAX {
1008                // wait for finish
1009                listener.await;
1010            } else {
1011                // wait for start, then wait for finish or timeout
1012                let start_listener = self
1013                    .event_start
1014                    .listen_with_note(|| "wait for update info".to_string());
1015                if self.currently_scheduled_tasks.load(Ordering::Acquire) == 0 {
1016                    start_listener.await;
1017                } else {
1018                    drop(start_listener);
1019                }
1020                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1021                    // Timeout
1022                    return None;
1023                }
1024            }
1025        }
1026        if !aggregation.is_zero() {
1027            loop {
1028                select! {
1029                    () = tokio::time::sleep(aggregation) => {
1030                        break;
1031                    }
1032                    () = self.event.listen_with_note(|| "wait for update info".to_string()) => {
1033                        // Resets the sleep
1034                    }
1035                }
1036            }
1037        }
1038        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1039        if let Some((duration, tasks)) = update.take() {
1040            Some(UpdateInfo {
1041                duration,
1042                tasks,
1043                reasons: take(reason_set),
1044                placeholder_for_future_fields: (),
1045            })
1046        } else {
1047            panic!("aggregated_update_info must not called concurrently")
1048        }
1049    }
1050
1051    pub async fn wait_background_done(&self) {
1052        let listener = self.event_background.listen();
1053        if self
1054            .currently_scheduled_background_jobs
1055            .load(Ordering::Acquire)
1056            != 0
1057        {
1058            listener.await;
1059        }
1060    }
1061
1062    pub async fn stop_and_wait(&self) {
1063        self.backend.stopping(self);
1064        self.stopped.store(true, Ordering::Release);
1065        {
1066            let listener = self.event.listen_with_note(|| "wait for stop".to_string());
1067            if self.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
1068                listener.await;
1069            }
1070        }
1071        {
1072            let listener = self.event_background.listen();
1073            if self
1074                .currently_scheduled_background_jobs
1075                .load(Ordering::Acquire)
1076                != 0
1077            {
1078                listener.await;
1079            }
1080        }
1081        self.backend.stop(self);
1082    }
1083
1084    #[track_caller]
1085    pub(crate) fn schedule_background_job<
1086        T: FnOnce(Arc<TurboTasks<B>>) -> F + Send + 'static,
1087        F: Future<Output = ()> + Send + 'static,
1088    >(
1089        &self,
1090        func: T,
1091    ) {
1092        let this = self.pin();
1093        self.currently_scheduled_background_jobs
1094            .fetch_add(1, Ordering::AcqRel);
1095        tokio::spawn(
1096            TURBO_TASKS
1097                .scope(this.clone(), async move {
1098                    while this.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
1099                        let listener = this.event.listen_with_note(|| {
1100                            "background job waiting for execution".to_string()
1101                        });
1102                        if this.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
1103                            listener.await;
1104                        }
1105                    }
1106                    let this2 = this.clone();
1107                    if !this.stopped.load(Ordering::Acquire) {
1108                        func(this).await;
1109                    }
1110                    if this2
1111                        .currently_scheduled_background_jobs
1112                        .fetch_sub(1, Ordering::AcqRel)
1113                        == 1
1114                    {
1115                        this2.event_background.notify(usize::MAX);
1116                    }
1117                })
1118                .in_current_span(),
1119        );
1120    }
1121
1122    #[track_caller]
1123    pub(crate) fn schedule_foreground_job<
1124        T: FnOnce(Arc<TurboTasks<B>>) -> F + Send + 'static,
1125        F: Future<Output = ()> + Send + 'static,
1126    >(
1127        &self,
1128        func: T,
1129    ) {
1130        let this = self.pin();
1131        this.begin_foreground_job();
1132        tokio::spawn(
1133            TURBO_TASKS
1134                .scope(this.clone(), async move {
1135                    if !this.stopped.load(Ordering::Acquire) {
1136                        func(this.clone()).await;
1137                    }
1138                    this.finish_foreground_job();
1139                })
1140                .in_current_span(),
1141        );
1142    }
1143
1144    fn finish_current_task_state(&self) -> FinishedTaskState {
1145        let (stateful, has_invalidator, tasks) = CURRENT_TASK_STATE.with(|cell| {
1146            let CurrentTaskState {
1147                tasks_to_notify,
1148                stateful,
1149                has_invalidator,
1150                ..
1151            } = &mut *cell.write().unwrap();
1152            (*stateful, *has_invalidator, take(tasks_to_notify))
1153        });
1154
1155        if !tasks.is_empty() {
1156            self.backend.invalidate_tasks(&tasks, self);
1157        }
1158        FinishedTaskState {
1159            stateful,
1160            has_invalidator,
1161        }
1162    }
1163
1164    pub fn backend(&self) -> &B {
1165        &self.backend
1166    }
1167}
1168
1169struct FinishedTaskState {
1170    /// True if the task has state in cells
1171    stateful: bool,
1172
1173    /// True if the task uses an external invalidator
1174    has_invalidator: bool,
1175}
1176
1177impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
1178    fn dynamic_call(
1179        &self,
1180        native_fn: &'static NativeFunction,
1181        this: Option<RawVc>,
1182        arg: Box<dyn MagicAny>,
1183        persistence: TaskPersistence,
1184    ) -> RawVc {
1185        self.dynamic_call(native_fn, this, arg, persistence)
1186    }
1187    fn native_call(
1188        &self,
1189        native_fn: &'static NativeFunction,
1190        this: Option<RawVc>,
1191        arg: Box<dyn MagicAny>,
1192        persistence: TaskPersistence,
1193    ) -> RawVc {
1194        self.native_call(native_fn, this, arg, persistence)
1195    }
1196    fn trait_call(
1197        &self,
1198        trait_method: &'static TraitMethod,
1199        this: RawVc,
1200        arg: Box<dyn MagicAny>,
1201        persistence: TaskPersistence,
1202    ) -> RawVc {
1203        self.trait_call(trait_method, this, arg, persistence)
1204    }
1205
1206    #[track_caller]
1207    fn run_once(
1208        &self,
1209        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1210    ) -> TaskId {
1211        self.spawn_once_task(async move {
1212            future.await?;
1213            Ok(Completion::new())
1214        })
1215    }
1216
1217    #[track_caller]
1218    fn run_once_with_reason(
1219        &self,
1220        reason: StaticOrArc<dyn InvalidationReason>,
1221        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1222    ) -> TaskId {
1223        {
1224            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1225            reason_set.insert(reason);
1226        }
1227        self.spawn_once_task(async move {
1228            future.await?;
1229            Ok(Completion::new())
1230        })
1231    }
1232
1233    #[track_caller]
1234    fn run_once_process(
1235        &self,
1236        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1237    ) -> TaskId {
1238        let this = self.pin();
1239        self.spawn_once_task(async move {
1240            this.finish_primary_job();
1241            future.await?;
1242            this.begin_primary_job();
1243            Ok(Completion::new())
1244        })
1245    }
1246}
1247
1248impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1249    #[instrument(level = Level::INFO, skip_all, name = "invalidate")]
1250    fn invalidate(&self, task: TaskId) {
1251        self.backend.invalidate_task(task, self);
1252    }
1253
1254    #[instrument(level = Level::INFO, skip_all, name = "invalidate", fields(name = display(&reason)))]
1255    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1256        {
1257            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1258            reason_set.insert(reason);
1259        }
1260        self.backend.invalidate_task(task, self);
1261    }
1262
1263    fn invalidate_serialization(&self, task: TaskId) {
1264        self.backend.invalidate_serialization(task, self);
1265    }
1266
1267    fn notify_scheduled_tasks(&self) {
1268        let _ = CURRENT_TASK_STATE.try_with(|cell| {
1269            let tasks = {
1270                let CurrentTaskState {
1271                    tasks_to_notify, ..
1272                } = &mut *cell.write().unwrap();
1273                take(tasks_to_notify)
1274            };
1275            if tasks.is_empty() {
1276                return;
1277            }
1278            self.backend.invalidate_tasks(&tasks, self);
1279        });
1280    }
1281
1282    fn try_read_task_output(
1283        &self,
1284        task: TaskId,
1285        consistency: ReadConsistency,
1286    ) -> Result<Result<RawVc, EventListener>> {
1287        self.backend
1288            .try_read_task_output(task, current_task("reading Vcs"), consistency, self)
1289    }
1290
1291    fn try_read_task_output_untracked(
1292        &self,
1293        task: TaskId,
1294        consistency: ReadConsistency,
1295    ) -> Result<Result<RawVc, EventListener>> {
1296        self.backend
1297            .try_read_task_output_untracked(task, consistency, self)
1298    }
1299
1300    fn try_read_task_cell(
1301        &self,
1302        task: TaskId,
1303        index: CellId,
1304        options: ReadCellOptions,
1305    ) -> Result<Result<TypedCellContent, EventListener>> {
1306        self.backend
1307            .try_read_task_cell(task, index, current_task("reading Vcs"), options, self)
1308    }
1309
1310    fn try_read_task_cell_untracked(
1311        &self,
1312        task: TaskId,
1313        index: CellId,
1314        options: ReadCellOptions,
1315    ) -> Result<Result<TypedCellContent, EventListener>> {
1316        self.backend
1317            .try_read_task_cell_untracked(task, index, options, self)
1318    }
1319
1320    fn try_read_own_task_cell_untracked(
1321        &self,
1322        current_task: TaskId,
1323        index: CellId,
1324        options: ReadCellOptions,
1325    ) -> Result<TypedCellContent> {
1326        self.backend
1327            .try_read_own_task_cell_untracked(current_task, index, options, self)
1328    }
1329
1330    fn try_read_local_output(
1331        &self,
1332        execution_id: ExecutionId,
1333        local_task_id: LocalTaskId,
1334    ) -> Result<Result<RawVc, EventListener>> {
1335        CURRENT_TASK_STATE.with(|gts| {
1336            let gts_read = gts.read().unwrap();
1337
1338            // Local Vcs are local to their parent task's current execution, and do not exist
1339            // outside of it. This is weakly enforced at compile time using the `NonLocalValue`
1340            // marker trait. This assertion exists to handle any potential escapes that the
1341            // compile-time checks cannot capture.
1342            gts_read.assert_execution_id(execution_id);
1343
1344            match gts_read.get_local_task(local_task_id) {
1345                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1346                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1347            }
1348        })
1349    }
1350
1351    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1352        self.backend.read_task_collectibles(
1353            task,
1354            trait_id,
1355            current_task("reading collectibles"),
1356            self,
1357        )
1358    }
1359
1360    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1361        self.backend.emit_collectible(
1362            trait_type,
1363            collectible,
1364            current_task("emitting collectible"),
1365            self,
1366        );
1367    }
1368
1369    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1370        self.backend.unemit_collectible(
1371            trait_type,
1372            collectible,
1373            count,
1374            current_task("emitting collectible"),
1375            self,
1376        );
1377    }
1378
1379    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1380        for (&collectible, &count) in collectibles {
1381            if count > 0 {
1382                self.backend.unemit_collectible(
1383                    trait_type,
1384                    collectible,
1385                    count as u32,
1386                    current_task("emitting collectible"),
1387                    self,
1388                );
1389            }
1390        }
1391    }
1392
1393    fn read_own_task_cell(
1394        &self,
1395        task: TaskId,
1396        index: CellId,
1397        options: ReadCellOptions,
1398    ) -> Result<TypedCellContent> {
1399        // INVALIDATION: don't need to track a dependency to itself
1400        self.try_read_own_task_cell_untracked(task, index, options)
1401    }
1402
1403    fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent) {
1404        self.backend.update_task_cell(task, index, content, self);
1405    }
1406
1407    fn connect_task(&self, task: TaskId) {
1408        self.backend
1409            .connect_task(task, current_task("connecting task"), self);
1410    }
1411
1412    fn mark_own_task_as_finished(&self, task: TaskId) {
1413        self.backend.mark_own_task_as_finished(task, self);
1414    }
1415
1416    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
1417        self.backend
1418            .set_own_task_aggregation_number(task, aggregation_number, self);
1419    }
1420
1421    fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1422        self.backend.mark_own_task_as_session_dependent(task, self);
1423    }
1424
1425    /// Creates a future that inherits the current task id and task state. The current global task
1426    /// will wait for this future to be dropped before exiting.
1427    fn detached_for_testing(
1428        &self,
1429        fut: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1430    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
1431        // this is similar to what happens for a local task, except that we keep the local task's
1432        // state as well.
1433        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1434        let tracked_fut = {
1435            let ts = global_task_state.read().unwrap();
1436            ts.local_task_tracker.track_future(fut)
1437        };
1438        Box::pin(TURBO_TASKS.scope(
1439            turbo_tasks(),
1440            CURRENT_TASK_STATE.scope(global_task_state, tracked_fut),
1441        ))
1442    }
1443
1444    fn task_statistics(&self) -> &TaskStatisticsApi {
1445        self.backend.task_statistics()
1446    }
1447
1448    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1449        let this = self.pin();
1450        Box::pin(async move {
1451            this.stop_and_wait().await;
1452        })
1453    }
1454
1455    fn subscribe_to_compilation_events(
1456        &self,
1457        event_types: Option<Vec<String>>,
1458    ) -> Receiver<Arc<dyn CompilationEvent>> {
1459        self.compilation_events.subscribe(event_types)
1460    }
1461
1462    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1463        if let Err(e) = self.compilation_events.send(event) {
1464            tracing::warn!("Failed to send compilation event: {e}");
1465        }
1466    }
1467}
1468
1469impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1470    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1471        self.pin()
1472    }
1473    fn backend(&self) -> &B {
1474        &self.backend
1475    }
1476
1477    #[track_caller]
1478    fn schedule_backend_background_job(&self, id: BackendJobId) {
1479        self.schedule_background_job(move |this| async move {
1480            this.backend.run_backend_job(id, &*this).await;
1481        })
1482    }
1483
1484    #[track_caller]
1485    fn schedule_backend_foreground_job(&self, id: BackendJobId) {
1486        self.schedule_foreground_job(move |this| async move {
1487            this.backend.run_backend_job(id, &*this).await;
1488        })
1489    }
1490
1491    fn try_foreground_done(&self) -> Result<(), EventListener> {
1492        if self
1493            .currently_scheduled_foreground_jobs
1494            .load(Ordering::Acquire)
1495            == 0
1496        {
1497            return Ok(());
1498        }
1499        let listener = self.event_foreground.listen();
1500        if self
1501            .currently_scheduled_foreground_jobs
1502            .load(Ordering::Acquire)
1503            == 0
1504        {
1505            return Ok(());
1506        }
1507        Err(listener)
1508    }
1509
1510    fn wait_foreground_done_excluding_own<'a>(
1511        &'a self,
1512    ) -> Option<Pin<Box<dyn Future<Output = ()> + Send + 'a>>> {
1513        if self
1514            .currently_scheduled_foreground_jobs
1515            .load(Ordering::Acquire)
1516            == 0
1517        {
1518            return None;
1519        }
1520        Some(Box::pin(async {
1521            self.finish_foreground_job();
1522            self.wait_foreground_done().await;
1523            self.begin_foreground_job();
1524        }))
1525    }
1526
1527    /// Enqueues tasks for notification of changed dependencies. This will
1528    /// eventually call `dependent_cell_updated()` on all tasks.
1529    fn schedule_notify_tasks(&self, tasks: &[TaskId]) {
1530        let result = CURRENT_TASK_STATE.try_with(|cell| {
1531            let CurrentTaskState {
1532                tasks_to_notify, ..
1533            } = &mut *cell.write().unwrap();
1534            tasks_to_notify.extend(tasks.iter().copied());
1535        });
1536        if result.is_err() {
1537            let _guard = trace_span!("schedule_notify_tasks", count = tasks.len()).entered();
1538            self.backend.invalidate_tasks(tasks, self);
1539        }
1540    }
1541
1542    /// Enqueues tasks for notification of changed dependencies. This will
1543    /// eventually call `dependent_cell_updated()` on all tasks.
1544    fn schedule_notify_tasks_set(&self, tasks: &TaskIdSet) {
1545        let result = CURRENT_TASK_STATE.try_with(|cell| {
1546            let CurrentTaskState {
1547                tasks_to_notify, ..
1548            } = &mut *cell.write().unwrap();
1549            tasks_to_notify.extend(tasks.iter().copied());
1550        });
1551        if result.is_err() {
1552            let _guard = trace_span!("schedule_notify_tasks_set", count = tasks.len()).entered();
1553            self.backend.invalidate_tasks_set(tasks, self);
1554        };
1555    }
1556
1557    #[track_caller]
1558    fn schedule(&self, task: TaskId) {
1559        self.schedule(task)
1560    }
1561
1562    fn program_duration_until(&self, instant: Instant) -> Duration {
1563        instant - self.program_start
1564    }
1565
1566    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1567        // SAFETY: This is a fresh id from the factory
1568        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1569    }
1570
1571    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1572        // SAFETY: This is a fresh id from the factory
1573        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1574    }
1575
1576    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1577        unsafe { self.task_id_factory.reuse(id.into()) }
1578    }
1579
1580    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1581        unsafe { self.transient_task_id_factory.reuse(id.into()) }
1582    }
1583
1584    fn read_task_state_dyn(&self, func: &mut dyn FnMut(&B::TaskState)) {
1585        CURRENT_TASK_STATE
1586            .with(move |ts| func(ts.read().unwrap().backend_state.downcast_ref().unwrap()))
1587    }
1588
1589    fn write_task_state_dyn(&self, func: &mut dyn FnMut(&mut B::TaskState)) {
1590        CURRENT_TASK_STATE
1591            .with(move |ts| func(ts.write().unwrap().backend_state.downcast_mut().unwrap()))
1592    }
1593
1594    fn is_idle(&self) -> bool {
1595        self.currently_scheduled_tasks.load(Ordering::Acquire) == 0
1596    }
1597}
1598
1599pub(crate) fn current_task(from: &str) -> TaskId {
1600    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1601        Ok(id) => id,
1602        Err(_) => panic!("{from} can only be used in the context of turbo_tasks task execution"),
1603    }
1604}
1605
1606pub async fn run_once<T: Send + 'static>(
1607    tt: Arc<dyn TurboTasksApi>,
1608    future: impl Future<Output = Result<T>> + Send + 'static,
1609) -> Result<T> {
1610    let (tx, rx) = tokio::sync::oneshot::channel();
1611
1612    let task_id = tt.run_once(Box::pin(async move {
1613        let result = future.await?;
1614        tx.send(result)
1615            .map_err(|_| anyhow!("unable to send result"))?;
1616        Ok(())
1617    }));
1618
1619    // INVALIDATION: A Once task will never invalidate, therefore we don't need to
1620    // track a dependency
1621    let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1622    let raw_future = raw_result.into_read().untracked();
1623    turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1624
1625    Ok(rx.await?)
1626}
1627
1628pub async fn run_once_with_reason<T: Send + 'static>(
1629    tt: Arc<dyn TurboTasksApi>,
1630    reason: impl InvalidationReason,
1631    future: impl Future<Output = Result<T>> + Send + 'static,
1632) -> Result<T> {
1633    let (tx, rx) = tokio::sync::oneshot::channel();
1634
1635    let task_id = tt.run_once_with_reason(
1636        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1637        Box::pin(async move {
1638            let result = future.await?;
1639            tx.send(result)
1640                .map_err(|_| anyhow!("unable to send result"))?;
1641            Ok(())
1642        }),
1643    );
1644
1645    // INVALIDATION: A Once task will never invalidate, therefore we don't need to
1646    // track a dependency
1647    let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1648    let raw_future = raw_result.into_read().untracked();
1649    turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1650
1651    Ok(rx.await?)
1652}
1653
1654/// Calls [`TurboTasks::dynamic_call`] for the current turbo tasks instance.
1655pub fn dynamic_call(
1656    func: &'static NativeFunction,
1657    this: Option<RawVc>,
1658    arg: Box<dyn MagicAny>,
1659    persistence: TaskPersistence,
1660) -> RawVc {
1661    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1662}
1663
1664/// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance.
1665pub fn trait_call(
1666    trait_method: &'static TraitMethod,
1667    this: RawVc,
1668    arg: Box<dyn MagicAny>,
1669    persistence: TaskPersistence,
1670) -> RawVc {
1671    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1672}
1673
1674pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1675    TURBO_TASKS.with(|arc| arc.clone())
1676}
1677
1678pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1679    TURBO_TASKS.with(|arc| func(arc))
1680}
1681
1682pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1683    TURBO_TASKS.sync_scope(tt, f)
1684}
1685
1686pub fn turbo_tasks_future_scope<T>(
1687    tt: Arc<dyn TurboTasksApi>,
1688    f: impl Future<Output = T>,
1689) -> impl Future<Output = T> {
1690    TURBO_TASKS.scope(tt, f)
1691}
1692
1693pub fn with_turbo_tasks_for_testing<T>(
1694    tt: Arc<dyn TurboTasksApi>,
1695    current_task: TaskId,
1696    execution_id: ExecutionId,
1697    f: impl Future<Output = T>,
1698) -> impl Future<Output = T> {
1699    TURBO_TASKS.scope(
1700        tt,
1701        CURRENT_TASK_STATE.scope(
1702            Arc::new(RwLock::new(CurrentTaskState::new(
1703                current_task,
1704                execution_id,
1705                Box::new(()),
1706            ))),
1707            f,
1708        ),
1709    )
1710}
1711
1712/// Spawns the given future within the context of the current task.
1713///
1714/// Beware: this method is not safe to use in production code. It is only
1715/// intended for use in tests and for debugging purposes.
1716pub fn spawn_detached_for_testing(f: impl Future<Output = Result<()>> + Send + 'static) {
1717    tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(f.in_current_span())));
1718}
1719
1720pub fn current_task_for_testing() -> TaskId {
1721    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1722}
1723
1724/// Marks the current task as dirty when restored from persistent cache.
1725pub fn mark_session_dependent() {
1726    with_turbo_tasks(|tt| {
1727        tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1728    });
1729}
1730
1731/// Marks the current task as a root in the aggregation graph.  This means it starts with the
1732/// correct aggregation number instead of needing to recompute it after the fact.
1733pub fn mark_root() {
1734    with_turbo_tasks(|tt| {
1735        tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
1736    });
1737}
1738
1739/// Marks the current task as finished. This excludes it from waiting for
1740/// strongly consistency.
1741pub fn mark_finished() {
1742    with_turbo_tasks(|tt| {
1743        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1744    });
1745}
1746
1747/// Marks the current task as stateful. This prevents the tasks from being
1748/// dropped without persisting the state.
1749///
1750/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
1751/// serialization of the current task cells
1752pub fn mark_stateful() -> SerializationInvalidator {
1753    CURRENT_TASK_STATE.with(|cell| {
1754        let CurrentTaskState {
1755            stateful, task_id, ..
1756        } = &mut *cell.write().unwrap();
1757        *stateful = true;
1758        SerializationInvalidator::new(*task_id)
1759    })
1760}
1761
1762pub fn mark_invalidator() {
1763    CURRENT_TASK_STATE.with(|cell| {
1764        let CurrentTaskState {
1765            has_invalidator, ..
1766        } = &mut *cell.write().unwrap();
1767        *has_invalidator = true;
1768    })
1769}
1770
1771pub fn prevent_gc() {
1772    // There is a hack in UpdateCellOperation that need to be updated when this is changed.
1773    mark_stateful();
1774}
1775
1776/// Notifies scheduled tasks for execution.
1777pub fn notify_scheduled_tasks() {
1778    with_turbo_tasks(|tt| tt.notify_scheduled_tasks())
1779}
1780
1781pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1782    with_turbo_tasks(|tt| {
1783        let raw_vc = collectible.node.node;
1784        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1785    })
1786}
1787
1788pub async fn spawn_blocking<T: Send + 'static>(func: impl FnOnce() -> T + Send + 'static) -> T {
1789    let turbo_tasks = turbo_tasks();
1790    let span = Span::current();
1791    let (result, duration, alloc_info) = tokio::task::spawn_blocking(|| {
1792        let _guard = span.entered();
1793        let start = Instant::now();
1794        let start_allocations = TurboMalloc::allocation_counters();
1795        let r = turbo_tasks_scope(turbo_tasks, func);
1796        (r, start.elapsed(), start_allocations.until_now())
1797    })
1798    .await
1799    .unwrap();
1800    capture_future::add_duration(duration);
1801    capture_future::add_allocation_info(alloc_info);
1802    result
1803}
1804
1805pub fn spawn_thread(func: impl FnOnce() + Send + 'static) {
1806    let handle = Handle::current();
1807    let span = info_span!("thread").or_current();
1808    thread::spawn(move || {
1809        let span = span.entered();
1810        let guard = handle.enter();
1811        func();
1812        drop(guard);
1813        drop(span);
1814    });
1815}
1816
1817pub(crate) async fn read_task_output(
1818    this: &dyn TurboTasksApi,
1819    id: TaskId,
1820    consistency: ReadConsistency,
1821) -> Result<RawVc> {
1822    loop {
1823        match this.try_read_task_output(id, consistency)? {
1824            Ok(result) => return Ok(result),
1825            Err(listener) => listener.await,
1826        }
1827    }
1828}
1829
1830/// INVALIDATION: Be careful with this, it will not track dependencies, so
1831/// using it could break cache invalidation.
1832pub(crate) async fn read_task_output_untracked(
1833    this: &dyn TurboTasksApi,
1834    id: TaskId,
1835    consistency: ReadConsistency,
1836) -> Result<RawVc> {
1837    loop {
1838        match this.try_read_task_output_untracked(id, consistency)? {
1839            Ok(result) => return Ok(result),
1840            Err(listener) => listener.await,
1841        }
1842    }
1843}
1844
1845pub(crate) async fn read_task_cell(
1846    this: &dyn TurboTasksApi,
1847    id: TaskId,
1848    index: CellId,
1849    options: ReadCellOptions,
1850) -> Result<TypedCellContent> {
1851    loop {
1852        match this.try_read_task_cell(id, index, options)? {
1853            Ok(result) => return Ok(result),
1854            Err(listener) => listener.await,
1855        }
1856    }
1857}
1858
1859/// A reference to a task's cell with methods that allow updating the contents
1860/// of the cell.
1861///
1862/// Mutations should not outside of the task that that owns this cell. Doing so
1863/// is a logic error, and may lead to incorrect caching behavior.
1864#[derive(Clone, Copy, Serialize, Deserialize)]
1865pub struct CurrentCellRef {
1866    current_task: TaskId,
1867    index: CellId,
1868}
1869
1870type VcReadRepr<T> = <<T as VcValueType>::Read as VcRead<T>>::Repr;
1871
1872impl CurrentCellRef {
1873    /// Updates the cell if the given `functor` returns a value.
1874    pub fn conditional_update<T>(&self, functor: impl FnOnce(Option<&T>) -> Option<T>)
1875    where
1876        T: VcValueType,
1877    {
1878        self.conditional_update_with_shared_reference(|old_shared_reference| {
1879            let old_ref = old_shared_reference
1880                .and_then(|sr| sr.0.downcast_ref::<VcReadRepr<T>>())
1881                .map(|content| <T::Read as VcRead<T>>::repr_to_value_ref(content));
1882            let new_value = functor(old_ref)?;
1883            Some(SharedReference::new(triomphe::Arc::new(
1884                <T::Read as VcRead<T>>::value_to_repr(new_value),
1885            )))
1886        })
1887    }
1888
1889    /// Updates the cell if the given `functor` returns a `SharedReference`.
1890    pub fn conditional_update_with_shared_reference(
1891        &self,
1892        functor: impl FnOnce(Option<&SharedReference>) -> Option<SharedReference>,
1893    ) {
1894        let tt = turbo_tasks();
1895        let cell_content = tt
1896            .read_own_task_cell(self.current_task, self.index, ReadCellOptions::default())
1897            .ok();
1898        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1899        if let Some(update) = update {
1900            tt.update_own_task_cell(self.current_task, self.index, CellContent(Some(update)))
1901        }
1902    }
1903
1904    /// Replace the current cell's content with `new_value` if the current content is not equal by
1905    /// value with the existing content.
1906    ///
1907    /// The comparison happens using the value itself, not the [`VcRead::Target`] of that value.
1908    ///
1909    /// Take this example of a custom equality implementation on a transparent wrapper type:
1910    ///
1911    /// ```
1912    /// #[turbo_tasks::value(transparent, eq = "manual")]
1913    /// struct Wrapper(Vec<u32>);
1914    ///
1915    /// impl PartialEq for Wrapper {
1916    ///     fn eq(&self, other: Wrapper) {
1917    ///         // Example: order doesn't matter for equality
1918    ///         let (mut this, mut other) = (self.clone(), other.clone());
1919    ///         this.sort_unstable();
1920    ///         other.sort_unstable();
1921    ///         this == other
1922    ///     }
1923    /// }
1924    ///
1925    /// impl Eq for Wrapper {}
1926    /// ```
1927    ///
1928    /// Comparisons of [`Vc<Wrapper>`] used when updating the cell will use `Wrapper`'s custom
1929    /// equality implementation, rather than the one provided by the target ([`Vec<u32>`]) type.
1930    ///
1931    /// However, in most cases, the default derived implementation of [`PartialEq`] is used which
1932    /// just forwards to the inner value's [`PartialEq`].
1933    ///
1934    /// If you already have a `SharedReference`, consider calling
1935    /// [`Self::compare_and_update_with_shared_reference`] which can re-use the [`SharedReference`]
1936    /// object.
1937    pub fn compare_and_update<T>(&self, new_value: T)
1938    where
1939        T: PartialEq + VcValueType,
1940    {
1941        self.conditional_update(|old_value| {
1942            if let Some(old_value) = old_value
1943                && old_value == &new_value
1944            {
1945                return None;
1946            }
1947            Some(new_value)
1948        });
1949    }
1950
1951    /// Replace the current cell's content with `new_shared_reference` if the
1952    /// current content is not equal by value with the existing content.
1953    ///
1954    /// If you already have a `SharedReference`, this is a faster version of
1955    /// [`CurrentCellRef::compare_and_update`].
1956    ///
1957    /// The [`SharedReference`] is expected to use the `<T::Read as
1958    /// VcRead<T>>::Repr` type for its representation of the value.
1959    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
1960    where
1961        T: VcValueType + PartialEq,
1962    {
1963        fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
1964            <T::Read as VcRead<T>>::repr_to_value_ref(
1965                sr.0.downcast_ref::<VcReadRepr<T>>()
1966                    .expect("cannot update SharedReference of different type"),
1967            )
1968        }
1969        self.conditional_update_with_shared_reference(|old_sr| {
1970            if let Some(old_sr) = old_sr {
1971                let old_value: &T = extract_sr_value(old_sr);
1972                let new_value = extract_sr_value(&new_shared_reference);
1973                if old_value == new_value {
1974                    return None;
1975                }
1976            }
1977            Some(new_shared_reference)
1978        });
1979    }
1980
1981    /// Unconditionally updates the content of the cell.
1982    pub fn update<T>(&self, new_value: T)
1983    where
1984        T: VcValueType,
1985    {
1986        let tt = turbo_tasks();
1987        tt.update_own_task_cell(
1988            self.current_task,
1989            self.index,
1990            CellContent(Some(SharedReference::new(triomphe::Arc::new(
1991                <T::Read as VcRead<T>>::value_to_repr(new_value),
1992            )))),
1993        )
1994    }
1995
1996    /// A faster version of [`Self::update`] if you already have a
1997    /// [`SharedReference`].
1998    ///
1999    /// If the passed-in [`SharedReference`] is the same as the existing cell's
2000    /// by identity, no update is performed.
2001    ///
2002    /// The [`SharedReference`] is expected to use the `<T::Read as
2003    /// VcRead<T>>::Repr` type for its representation of the value.
2004    pub fn update_with_shared_reference(&self, shared_ref: SharedReference) {
2005        let tt = turbo_tasks();
2006        let content = tt
2007            .read_own_task_cell(self.current_task, self.index, ReadCellOptions::default())
2008            .ok();
2009        let update = if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
2010            // pointer equality (not value equality)
2011            shared_ref_exp != shared_ref
2012        } else {
2013            true
2014        };
2015        if update {
2016            tt.update_own_task_cell(self.current_task, self.index, CellContent(Some(shared_ref)))
2017        }
2018    }
2019}
2020
2021impl From<CurrentCellRef> for RawVc {
2022    fn from(cell: CurrentCellRef) -> Self {
2023        RawVc::TaskCell(cell.current_task, cell.index)
2024    }
2025}
2026
2027pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef {
2028    CURRENT_TASK_STATE.with(|ts| {
2029        let current_task = current_task("celling turbo_tasks values");
2030        let mut ts = ts.write().unwrap();
2031        let map = ts.cell_counters.as_mut().unwrap();
2032        let current_index = map.entry(ty).or_default();
2033        let index = *current_index;
2034        *current_index += 1;
2035        CurrentCellRef {
2036            current_task,
2037            index: CellId { type_id: ty, index },
2038        }
2039    })
2040}
2041
2042pub(crate) async fn read_local_output(
2043    this: &dyn TurboTasksApi,
2044    execution_id: ExecutionId,
2045    local_task_id: LocalTaskId,
2046) -> Result<RawVc> {
2047    loop {
2048        match this.try_read_local_output(execution_id, local_task_id)? {
2049            Ok(raw_vc) => return Ok(raw_vc),
2050            Err(event_listener) => event_listener.await,
2051        }
2052    }
2053}