turbo_tasks/
manager.rs

1use std::{
2    any::Any,
3    borrow::Cow,
4    future::Future,
5    hash::BuildHasherDefault,
6    mem::take,
7    pin::Pin,
8    sync::{
9        Arc, Mutex, RwLock, Weak,
10        atomic::{AtomicBool, AtomicUsize, Ordering},
11    },
12    thread,
13    time::{Duration, Instant},
14};
15
16use anyhow::{Result, anyhow};
17use auto_hash_map::AutoMap;
18use rustc_hash::FxHasher;
19use serde::{Deserialize, Serialize};
20use tokio::{runtime::Handle, select, sync::mpsc::Receiver, task_local};
21use tokio_util::task::TaskTracker;
22use tracing::{Instrument, Level, Span, info_span, instrument, trace_span};
23use turbo_tasks_malloc::TurboMalloc;
24
25use crate::{
26    Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
27    ResolvedVc, SharedReference, TaskId, TaskIdSet, ValueTypeId, Vc, VcRead, VcValueTrait,
28    VcValueType,
29    backend::{
30        Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
31        TransientTaskType, TurboTasksExecutionError, TypedCellContent,
32    },
33    capture_future::{self, CaptureFuture},
34    event::{Event, EventListener},
35    id::{BackendJobId, ExecutionId, FunctionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
36    id_factory::IdFactoryWithReuse,
37    magic_any::MagicAny,
38    message_queue::{CompilationEvent, CompilationEventQueue},
39    raw_vc::{CellId, RawVc},
40    registry,
41    serialization_invalidation::SerializationInvalidator,
42    task::local_task::{LocalTask, LocalTaskType},
43    task_statistics::TaskStatisticsApi,
44    trace::TraceRawVcs,
45    trait_helpers::get_trait_method,
46    util::{IdFactory, SharedError, StaticOrArc},
47    vc::ReadVcFuture,
48};
49
50/// Common base trait for [`TurboTasksApi`] and [`TurboTasksBackendApi`]. Provides APIs for creating
51/// tasks from function calls.
52pub trait TurboTasksCallApi: Sync + Send {
53    /// Calls a native function with arguments. Resolves arguments when needed
54    /// with a wrapper task.
55    fn dynamic_call(
56        &self,
57        func: FunctionId,
58        this: Option<RawVc>,
59        arg: Box<dyn MagicAny>,
60        persistence: TaskPersistence,
61    ) -> RawVc;
62    /// Call a native function with arguments.
63    /// All inputs must be resolved.
64    fn native_call(
65        &self,
66        func: FunctionId,
67        this: Option<RawVc>,
68        arg: Box<dyn MagicAny>,
69        persistence: TaskPersistence,
70    ) -> RawVc;
71    /// Calls a trait method with arguments. First input is the `self` object.
72    /// Uses a wrapper task to resolve
73    fn trait_call(
74        &self,
75        trait_type: TraitTypeId,
76        trait_fn_name: Cow<'static, str>,
77        this: RawVc,
78        arg: Box<dyn MagicAny>,
79        persistence: TaskPersistence,
80    ) -> RawVc;
81
82    fn run_once(
83        &self,
84        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
85    ) -> TaskId;
86    fn run_once_with_reason(
87        &self,
88        reason: StaticOrArc<dyn InvalidationReason>,
89        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
90    ) -> TaskId;
91    fn run_once_process(
92        &self,
93        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
94    ) -> TaskId;
95}
96
97/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
98/// context. Returned by the [`turbo_tasks`] helper function.
99///
100/// This trait is needed because thread locals cannot contain an unresolved [`Backend`] type
101/// parameter.
102pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
103    fn pin(&self) -> Arc<dyn TurboTasksApi>;
104
105    fn invalidate(&self, task: TaskId);
106    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);
107
108    fn invalidate_serialization(&self, task: TaskId);
109
110    /// Eagerly notifies all tasks that were scheduled for notifications via
111    /// `schedule_notify_tasks_set()`
112    fn notify_scheduled_tasks(&self);
113
114    fn try_read_task_output(
115        &self,
116        task: TaskId,
117        consistency: ReadConsistency,
118    ) -> Result<Result<RawVc, EventListener>>;
119
120    /// INVALIDATION: Be careful with this, it will not track dependencies, so
121    /// using it could break cache invalidation.
122    fn try_read_task_output_untracked(
123        &self,
124        task: TaskId,
125        consistency: ReadConsistency,
126    ) -> Result<Result<RawVc, EventListener>>;
127
128    fn try_read_task_cell(
129        &self,
130        task: TaskId,
131        index: CellId,
132        options: ReadCellOptions,
133    ) -> Result<Result<TypedCellContent, EventListener>>;
134
135    /// INVALIDATION: Be careful with this, it will not track dependencies, so
136    /// using it could break cache invalidation.
137    fn try_read_task_cell_untracked(
138        &self,
139        task: TaskId,
140        index: CellId,
141        options: ReadCellOptions,
142    ) -> Result<Result<TypedCellContent, EventListener>>;
143
144    /// Reads a [`RawVc::LocalOutput`]. If the task has completed, returns the [`RawVc`] the local
145    /// task points to.
146    ///
147    /// The returned [`RawVc`] may also be a [`RawVc::LocalOutput`], so this may need to be called
148    /// recursively or in a loop.
149    ///
150    /// This does not accept a consistency argument, as you cannot control consistency of a read of
151    /// an operation owned by your own task. Strongly consistent reads are only allowed on
152    /// [`OperationVc`]s, which should never be local tasks.
153    ///
154    /// No dependency tracking will happen as a result of this function call, as it's a no-op for a
155    /// task to depend on itself.
156    ///
157    /// [`OperationVc`]: crate::OperationVc
158    fn try_read_local_output(
159        &self,
160        execution_id: ExecutionId,
161        local_task_id: LocalTaskId,
162    ) -> Result<Result<RawVc, EventListener>>;
163
164    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;
165
166    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
167    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
168    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);
169
170    /// INVALIDATION: Be careful with this, it will not track dependencies, so
171    /// using it could break cache invalidation.
172    fn try_read_own_task_cell_untracked(
173        &self,
174        current_task: TaskId,
175        index: CellId,
176        options: ReadCellOptions,
177    ) -> Result<TypedCellContent>;
178
179    fn read_own_task_cell(
180        &self,
181        task: TaskId,
182        index: CellId,
183        options: ReadCellOptions,
184    ) -> Result<TypedCellContent>;
185    fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent);
186    fn mark_own_task_as_finished(&self, task: TaskId);
187    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
188    fn mark_own_task_as_session_dependent(&self, task: TaskId);
189
190    fn connect_task(&self, task: TaskId);
191
192    /// Wraps the given future in the current task.
193    ///
194    /// Beware: this method is not safe to use in production code. It is only intended for use in
195    /// tests and for debugging purposes.
196    fn detached_for_testing(
197        &self,
198        f: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
199    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
200
201    fn task_statistics(&self) -> &TaskStatisticsApi;
202
203    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
204
205    fn subscribe_to_compilation_events(
206        &self,
207        event_types: Option<Vec<String>>,
208    ) -> Receiver<Arc<dyn CompilationEvent>>;
209    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);
210}
211
212/// A wrapper around a value that is unused.
213pub struct Unused<T> {
214    inner: T,
215}
216
217impl<T> Unused<T> {
218    /// Creates a new unused value.
219    ///
220    /// # Safety
221    ///
222    /// The wrapped value must not be used.
223    pub unsafe fn new_unchecked(inner: T) -> Self {
224        Self { inner }
225    }
226
227    /// Get the inner value, without consuming the `Unused` wrapper.
228    ///
229    /// # Safety
230    ///
231    /// The user need to make sure that the value stays unused.
232    pub unsafe fn get_unchecked(&self) -> &T {
233        &self.inner
234    }
235
236    /// Unwraps the value, consuming the `Unused` wrapper.
237    pub fn into(self) -> T {
238        self.inner
239    }
240}
241
242/// A subset of the [`TurboTasks`] API that's exposed to [`Backend`] implementations.
243pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
244    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;
245
246    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
247    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
248    /// # Safety
249    ///
250    /// The caller must ensure that the task id is not used anymore.
251    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
252    /// # Safety
253    ///
254    /// The caller must ensure that the task id is not used anymore.
255    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);
256
257    fn schedule(&self, task: TaskId);
258    fn schedule_backend_background_job(&self, id: BackendJobId);
259    fn schedule_backend_foreground_job(&self, id: BackendJobId);
260
261    fn try_foreground_done(&self) -> Result<(), EventListener>;
262    fn wait_foreground_done_excluding_own<'a>(
263        &'a self,
264    ) -> Option<Pin<Box<dyn Future<Output = ()> + Send + 'a>>>;
265
266    /// Enqueues tasks for notification of changed dependencies. This will
267    /// eventually call `invalidate_tasks()` on all tasks.
268    fn schedule_notify_tasks(&self, tasks: &[TaskId]);
269
270    /// Enqueues tasks for notification of changed dependencies. This will
271    /// eventually call `invalidate_tasks()` on all tasks.
272    fn schedule_notify_tasks_set(&self, tasks: &TaskIdSet);
273
274    /// Returns the duration from the start of the program to the given instant.
275    fn program_duration_until(&self, instant: Instant) -> Duration;
276
277    /// An untyped object-safe version of [`TurboTasksBackendApiExt::read_task_state`]. Callers
278    /// should prefer the extension trait's version of this method.
279    fn read_task_state_dyn(&self, func: &mut dyn FnMut(&B::TaskState));
280
281    /// An untyped object-safe version of [`TurboTasksBackendApiExt::write_task_state`]. Callers
282    /// should prefer the extension trait's version of this method.
283    fn write_task_state_dyn(&self, func: &mut dyn FnMut(&mut B::TaskState));
284
285    /// Returns true if the system is idle.
286    fn is_idle(&self) -> bool;
287
288    /// Returns a reference to the backend.
289    fn backend(&self) -> &B;
290}
291
292/// An extension trait for methods of [`TurboTasksBackendApi`] that are not object-safe. This is
293/// automatically implemented for all [`TurboTasksBackendApi`]s using a blanket impl.
294pub trait TurboTasksBackendApiExt<B: Backend + 'static>: TurboTasksBackendApi<B> {
295    /// Allows modification of the [`Backend::TaskState`].
296    ///
297    /// This function holds open a non-exclusive read lock that blocks writes, so `func` is expected
298    /// to execute quickly in order to release the lock.
299    fn read_task_state<T>(&self, func: impl FnOnce(&B::TaskState) -> T) -> T {
300        let mut func = Some(func);
301        let mut out = None;
302        self.read_task_state_dyn(&mut |ts| out = Some((func.take().unwrap())(ts)));
303        out.expect("read_task_state_dyn must call `func`")
304    }
305
306    /// Allows modification of the [`Backend::TaskState`].
307    ///
308    /// This function holds open a write lock, so `func` is expected to execute quickly in order to
309    /// release the lock.
310    fn write_task_state<T>(&self, func: impl FnOnce(&mut B::TaskState) -> T) -> T {
311        let mut func = Some(func);
312        let mut out = None;
313        self.write_task_state_dyn(&mut |ts| out = Some((func.take().unwrap())(ts)));
314        out.expect("write_task_state_dyn must call `func`")
315    }
316}
317
318impl<TT, B> TurboTasksBackendApiExt<B> for TT
319where
320    TT: TurboTasksBackendApi<B> + ?Sized,
321    B: Backend + 'static,
322{
323}
324
325#[allow(clippy::manual_non_exhaustive)]
326pub struct UpdateInfo {
327    pub duration: Duration,
328    pub tasks: usize,
329    pub reasons: InvalidationReasonSet,
330    #[allow(dead_code)]
331    placeholder_for_future_fields: (),
332}
333
334#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
335pub enum TaskPersistence {
336    /// Tasks that may be persisted across sessions using serialization.
337    Persistent,
338
339    /// Tasks that will be persisted in memory for the life of this session, but won't persist
340    /// between sessions.
341    ///
342    /// This is used for [root tasks][TurboTasks::spawn_root_task] and tasks with an argument of
343    /// type [`TransientValue`][crate::value::TransientValue] or
344    /// [`TransientInstance`][crate::value::TransientInstance].
345    Transient,
346
347    /// Tasks that are persisted only for the lifetime of the nearest non-`Local` parent caller.
348    ///
349    /// This task does not have a unique task id, and is not shared with the backend. Instead it
350    /// uses the parent task's id.
351    ///
352    /// This is useful for functions that have a low cache hit rate. Those functions could be
353    /// converted to non-task functions, but that would break their function signature. This
354    /// provides a mechanism for skipping caching without changing the function signature.
355    Local,
356}
357
358#[derive(Clone, Copy, Debug, Eq, PartialEq)]
359pub enum ReadConsistency {
360    /// The default behavior for most APIs. Reads are faster, but may return stale values, which
361    /// may later trigger re-computation.
362    Eventual,
363    /// Ensures all dependencies are fully resolved before returning the cell or output data, at
364    /// the cost of slower reads.
365    ///
366    /// Top-level code that returns data to the user should use strongly consistent reads.
367    Strong,
368}
369
370pub struct TurboTasks<B: Backend + 'static> {
371    this: Weak<Self>,
372    backend: B,
373    task_id_factory: IdFactoryWithReuse<TaskId>,
374    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
375    execution_id_factory: IdFactory<ExecutionId>,
376    stopped: AtomicBool,
377    currently_scheduled_tasks: AtomicUsize,
378    currently_scheduled_foreground_jobs: AtomicUsize,
379    currently_scheduled_background_jobs: AtomicUsize,
380    scheduled_tasks: AtomicUsize,
381    start: Mutex<Option<Instant>>,
382    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
383    event: Event,
384    event_start: Event,
385    event_foreground: Event,
386    event_background: Event,
387    program_start: Instant,
388    compilation_events: CompilationEventQueue,
389}
390
391/// Information about a non-local task. A non-local task can contain multiple "local" tasks, which
392/// all share the same non-local task state.
393///
394/// A non-local task is one that:
395///
396/// - Has a unique task id.
397/// - Is potentially cached.
398/// - The backend is aware of.
399struct CurrentTaskState {
400    task_id: TaskId,
401    execution_id: ExecutionId,
402
403    /// Affected tasks, that are tracked during task execution. These tasks will
404    /// be invalidated when the execution finishes or before reading a cell
405    /// value.
406    tasks_to_notify: Vec<TaskId>,
407
408    /// True if the current task has state in cells
409    stateful: bool,
410
411    /// Tracks how many cells of each type has been allocated so far during this task execution.
412    /// When a task is re-executed, the cell count may not match the existing cell vec length.
413    ///
414    /// This is taken (and becomes `None`) during teardown of a task.
415    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,
416
417    /// Local tasks created while this global task has been running. Indexed by `LocalTaskId`.
418    local_tasks: Vec<LocalTask>,
419
420    /// Tracks currently running local tasks, and defers cleanup of the global task until those
421    /// complete. Also used by `detached_for_testing`.
422    local_task_tracker: TaskTracker,
423
424    backend_state: Box<dyn Any + Send + Sync>,
425}
426
427impl CurrentTaskState {
428    fn new(
429        task_id: TaskId,
430        execution_id: ExecutionId,
431        backend_state: Box<dyn Any + Send + Sync>,
432    ) -> Self {
433        Self {
434            task_id,
435            execution_id,
436            tasks_to_notify: Vec::new(),
437            stateful: false,
438            cell_counters: Some(AutoMap::default()),
439            local_tasks: Vec::new(),
440            local_task_tracker: TaskTracker::new(),
441            backend_state,
442        }
443    }
444
445    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
446        if self.execution_id != expected_execution_id {
447            panic!(
448                "Local tasks can only be scheduled/awaited within the same execution of the \
449                 parent task that created them"
450            );
451        }
452    }
453
454    fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
455        self.local_tasks.push(local_task);
456        // generate a one-indexed id from len() -- we just pushed so len() is >= 1
457        if cfg!(debug_assertions) {
458            LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
459        } else {
460            unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
461        }
462    }
463
464    fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
465        // local task ids are one-indexed (they use NonZeroU32)
466        &self.local_tasks[(*local_task_id as usize) - 1]
467    }
468
469    fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
470        &mut self.local_tasks[(*local_task_id as usize) - 1]
471    }
472}
473
474// TODO implement our own thread pool and make these thread locals instead
475task_local! {
476    /// The current TurboTasks instance
477    static TURBO_TASKS: Arc<dyn TurboTasksApi>;
478
479    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
480}
481
482impl<B: Backend + 'static> TurboTasks<B> {
483    // TODO better lifetime management for turbo tasks
484    // consider using unsafe for the task_local turbo tasks
485    // that should be safe as long tasks can't outlife turbo task
486    // so we probably want to make sure that all tasks are joined
487    // when trying to drop turbo tasks
488    pub fn new(backend: B) -> Arc<Self> {
489        let task_id_factory = IdFactoryWithReuse::new(
490            TaskId::MIN,
491            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
492        );
493        let transient_task_id_factory =
494            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
495        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
496        let this = Arc::new_cyclic(|this| Self {
497            this: this.clone(),
498            backend,
499            task_id_factory,
500            transient_task_id_factory,
501            execution_id_factory,
502            stopped: AtomicBool::new(false),
503            currently_scheduled_tasks: AtomicUsize::new(0),
504            currently_scheduled_background_jobs: AtomicUsize::new(0),
505            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
506            scheduled_tasks: AtomicUsize::new(0),
507            start: Default::default(),
508            aggregated_update: Default::default(),
509            event: Event::new(|| "TurboTasks::event".to_string()),
510            event_start: Event::new(|| "TurboTasks::event_start".to_string()),
511            event_foreground: Event::new(|| "TurboTasks::event_foreground".to_string()),
512            event_background: Event::new(|| "TurboTasks::event_background".to_string()),
513            program_start: Instant::now(),
514            compilation_events: CompilationEventQueue::default(),
515        });
516        this.backend.startup(&*this);
517        this
518    }
519
520    pub fn pin(&self) -> Arc<Self> {
521        self.this.upgrade().unwrap()
522    }
523
524    /// Creates a new root task
525    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
526    where
527        T: ?Sized,
528        F: Fn() -> Fut + Send + Sync + Clone + 'static,
529        Fut: Future<Output = Result<Vc<T>>> + Send,
530    {
531        let id = self.backend.create_transient_task(
532            TransientTaskType::Root(Box::new(move || {
533                let functor = functor.clone();
534                Box::pin(async move {
535                    let raw_vc = functor().await?.node;
536                    raw_vc.to_non_local().await
537                })
538            })),
539            self,
540        );
541        self.schedule(id);
542        id
543    }
544
545    pub fn dispose_root_task(&self, task_id: TaskId) {
546        self.backend.dispose_root_task(task_id, self);
547    }
548
549    // TODO make sure that all dependencies settle before reading them
550    /// Creates a new root task, that is only executed once.
551    /// Dependencies will not invalidate the task.
552    #[track_caller]
553    pub fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
554    where
555        T: ?Sized,
556        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
557    {
558        let id = self.backend.create_transient_task(
559            TransientTaskType::Once(Box::pin(async move {
560                let raw_vc = future.await?.node;
561                raw_vc.to_non_local().await
562            })),
563            self,
564        );
565        self.schedule(id);
566        id
567    }
568
569    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
570        &self,
571        future: impl Future<Output = Result<T>> + Send + 'static,
572    ) -> Result<T> {
573        let (tx, rx) = tokio::sync::oneshot::channel();
574        let task_id = self.spawn_once_task(async move {
575            let result = future.await?;
576            tx.send(result)
577                .map_err(|_| anyhow!("unable to send result"))?;
578            Ok(Completion::new())
579        });
580        // INVALIDATION: A Once task will never invalidate, therefore we don't need to
581        // track a dependency
582        let raw_result =
583            read_task_output_untracked(self, task_id, ReadConsistency::Eventual).await?;
584        turbo_tasks_future_scope(
585            self.pin(),
586            ReadVcFuture::<Completion>::from(raw_result.into_read().untracked()),
587        )
588        .await?;
589
590        Ok(rx.await?)
591    }
592
593    pub(crate) fn native_call(
594        &self,
595        fn_type: FunctionId,
596        this: Option<RawVc>,
597        arg: Box<dyn MagicAny>,
598        persistence: TaskPersistence,
599    ) -> RawVc {
600        match persistence {
601            TaskPersistence::Local => {
602                let task_type = LocalTaskType::Native { fn_type, this, arg };
603                self.schedule_local_task(task_type, persistence)
604            }
605            TaskPersistence::Transient => {
606                let task_type = CachedTaskType { fn_type, this, arg };
607                RawVc::TaskOutput(self.backend.get_or_create_transient_task(
608                    task_type,
609                    current_task("turbo_function calls"),
610                    self,
611                ))
612            }
613            TaskPersistence::Persistent => {
614                let task_type = CachedTaskType { fn_type, this, arg };
615                RawVc::TaskOutput(self.backend.get_or_create_persistent_task(
616                    task_type,
617                    current_task("turbo_function calls"),
618                    self,
619                ))
620            }
621        }
622    }
623
624    pub fn dynamic_call(
625        &self,
626        fn_type: FunctionId,
627        this: Option<RawVc>,
628        arg: Box<dyn MagicAny>,
629        persistence: TaskPersistence,
630    ) -> RawVc {
631        if this.is_none_or(|this| this.is_resolved())
632            && registry::get_function(fn_type).arg_meta.is_resolved(&*arg)
633        {
634            return self.native_call(fn_type, this, arg, persistence);
635        }
636        let task_type = LocalTaskType::ResolveNative { fn_type, this, arg };
637        self.schedule_local_task(task_type, persistence)
638    }
639
640    pub fn trait_call(
641        &self,
642        trait_type: TraitTypeId,
643        mut trait_fn_name: Cow<'static, str>,
644        this: RawVc,
645        arg: Box<dyn MagicAny>,
646        persistence: TaskPersistence,
647    ) -> RawVc {
648        // avoid creating a wrapper task if self is already resolved
649        // for resolved cells we already know the value type so we can lookup the
650        // function
651        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
652            match get_trait_method(trait_type, type_id, trait_fn_name) {
653                Ok(native_fn) => {
654                    let arg = registry::get_function(native_fn).arg_meta.filter_owned(arg);
655                    return self.dynamic_call(native_fn, Some(this), arg, persistence);
656                }
657                Err(name) => {
658                    trait_fn_name = name;
659                }
660            }
661        }
662
663        // create a wrapper task to resolve all inputs
664        let task_type = LocalTaskType::ResolveTrait {
665            trait_type,
666            method_name: trait_fn_name,
667            this,
668            arg,
669        };
670
671        self.schedule_local_task(task_type, persistence)
672    }
673
674    #[track_caller]
675    pub(crate) fn schedule(&self, task_id: TaskId) {
676        self.begin_primary_job();
677        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
678
679        let this = self.pin();
680        let future = async move {
681            let mut schedule_again = true;
682            while schedule_again {
683                let backend_state = this.backend.new_task_state(task_id);
684                // it's okay for execution ids to overflow and wrap, they're just used for an assert
685                let execution_id = this.execution_id_factory.wrapping_get();
686                let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
687                    task_id,
688                    execution_id,
689                    Box::new(backend_state),
690                )));
691                let single_execution_future = async {
692                    if this.stopped.load(Ordering::Acquire) {
693                        this.backend.task_execution_canceled(task_id, &*this);
694                        return false;
695                    }
696
697                    let Some(TaskExecutionSpec { future, span }) =
698                        this.backend.try_start_task_execution(task_id, &*this)
699                    else {
700                        return false;
701                    };
702
703                    async {
704                        let (result, duration, memory_usage) = CaptureFuture::new(future).await;
705
706                        // wait for all spawned local tasks using `local` to finish
707                        let ltt = CURRENT_TASK_STATE
708                            .with(|ts| ts.read().unwrap().local_task_tracker.clone());
709                        ltt.close();
710                        ltt.wait().await;
711
712                        let result = match result {
713                            Ok(Ok(raw_vc)) => Ok(raw_vc),
714                            Ok(Err(err)) => Err(Arc::new(err.into())),
715                            Err(err) => {
716                                Err(Arc::new(TurboTasksExecutionError::Panic(Arc::new(err))))
717                            }
718                        };
719
720                        this.backend.task_execution_result(task_id, result, &*this);
721                        let stateful = this.finish_current_task_state();
722                        let cell_counters = CURRENT_TASK_STATE
723                            .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
724                        let schedule_again = this.backend.task_execution_completed(
725                            task_id,
726                            duration,
727                            memory_usage,
728                            &cell_counters,
729                            stateful,
730                            &*this,
731                        );
732                        // task_execution_completed might need to notify tasks
733                        this.notify_scheduled_tasks();
734                        schedule_again
735                    }
736                    .instrument(span)
737                    .await
738                };
739                schedule_again = CURRENT_TASK_STATE
740                    .scope(current_task_state, single_execution_future)
741                    .await;
742            }
743            this.finish_primary_job();
744            anyhow::Ok(())
745        };
746
747        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
748
749        #[cfg(feature = "tokio_tracing")]
750        {
751            let description = self.backend.get_task_description(task_id);
752            tokio::task::Builder::new()
753                .name(&description)
754                .spawn(future)
755                .unwrap();
756        }
757        #[cfg(not(feature = "tokio_tracing"))]
758        tokio::task::spawn(future);
759    }
760
761    fn schedule_local_task(
762        &self,
763        ty: LocalTaskType,
764        // if this is a `LocalTaskType::Resolve*`, we may spawn another task with this persistence,
765        // if this is a `LocalTaskType::Native`, persistence is unused.
766        //
767        // TODO: In the rare case that we're crossing a transient->persistent boundary, we should
768        // force `LocalTaskType::Native` to be spawned as real tasks, so that any cells they create
769        // have the correct persistence. This is not an issue for resolution stub task, as they
770        // don't end up owning any cells.
771        persistence: TaskPersistence,
772    ) -> RawVc {
773        let ty = Arc::new(ty);
774        let (global_task_state, parent_task_id, execution_id, local_task_id) = CURRENT_TASK_STATE
775            .with(|gts| {
776                let mut gts_write = gts.write().unwrap();
777                let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
778                    done_event: Event::new({
779                        let ty = Arc::clone(&ty);
780                        move || format!("LocalTask({ty})::done_event")
781                    }),
782                });
783                (
784                    Arc::clone(gts),
785                    gts_write.task_id,
786                    gts_write.execution_id,
787                    local_task_id,
788                )
789            });
790
791        #[cfg(feature = "tokio_tracing")]
792        let description = format!(
793            "[local] (parent: {}) {}",
794            self.backend.get_task_description(parent_task_id),
795            ty,
796        );
797        #[cfg(not(feature = "tokio_tracing"))]
798        let _ = parent_task_id; // suppress unused variable warning
799
800        let this = self.pin();
801        let future = async move {
802            let TaskExecutionSpec { future, span } =
803                crate::task::local_task::get_local_task_execution_spec(&*this, &ty, persistence);
804            let ty = ty.clone();
805            async move {
806                let (result, _duration, _memory_usage) = CaptureFuture::new(future).await;
807
808                let result = match result {
809                    Ok(Ok(raw_vc)) => Ok(raw_vc),
810                    Ok(Err(err)) => Err(Arc::new(err.into())),
811                    Err(err) => Err(Arc::new(TurboTasksExecutionError::Panic(Arc::new(err)))),
812                };
813
814                let local_task = LocalTask::Done {
815                    output: match result {
816                        Ok(raw_vc) => OutputContent::Link(raw_vc),
817                        Err(err) => match &*err {
818                            TurboTasksExecutionError::Error { .. } => {
819                                OutputContent::Error(SharedError::new(
820                                    anyhow::Error::new(err)
821                                        .context(format!("Execution of {ty} failed")),
822                                ))
823                            }
824                            TurboTasksExecutionError::Panic(err) => {
825                                OutputContent::Panic(err.clone())
826                            }
827                        },
828                    },
829                };
830
831                let done_event = CURRENT_TASK_STATE.with(move |gts| {
832                    let mut gts_write = gts.write().unwrap();
833                    let scheduled_task =
834                        std::mem::replace(gts_write.get_mut_local_task(local_task_id), local_task);
835                    let LocalTask::Scheduled { done_event } = scheduled_task else {
836                        panic!("local task finished, but was not in the scheduled state?");
837                    };
838                    done_event
839                });
840                done_event.notify(usize::MAX)
841            }
842            .instrument(span)
843            .await
844        };
845        let future = global_task_state
846            .read()
847            .unwrap()
848            .local_task_tracker
849            .track_future(future);
850        let future = CURRENT_TASK_STATE.scope(global_task_state, future);
851        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
852
853        #[cfg(feature = "tokio_tracing")]
854        tokio::task::Builder::new()
855            .name(&description)
856            .spawn(future)
857            .unwrap();
858        #[cfg(not(feature = "tokio_tracing"))]
859        tokio::task::spawn(future);
860
861        RawVc::LocalOutput(execution_id, local_task_id, persistence)
862    }
863
864    fn begin_primary_job(&self) {
865        if self
866            .currently_scheduled_tasks
867            .fetch_add(1, Ordering::AcqRel)
868            == 0
869        {
870            *self.start.lock().unwrap() = Some(Instant::now());
871            self.event_start.notify(usize::MAX);
872            self.backend.idle_end(self);
873        }
874    }
875
876    fn begin_foreground_job(&self) {
877        self.begin_primary_job();
878        self.currently_scheduled_foreground_jobs
879            .fetch_add(1, Ordering::AcqRel);
880    }
881
882    fn finish_primary_job(&self) {
883        if self
884            .currently_scheduled_tasks
885            .fetch_sub(1, Ordering::AcqRel)
886            == 1
887        {
888            self.backend.idle_start(self);
889            // That's not super race-condition-safe, but it's only for
890            // statistical reasons
891            let total = self.scheduled_tasks.load(Ordering::Acquire);
892            self.scheduled_tasks.store(0, Ordering::Release);
893            if let Some(start) = *self.start.lock().unwrap() {
894                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
895                if let Some(update) = update.as_mut() {
896                    update.0 += start.elapsed();
897                    update.1 += total;
898                } else {
899                    *update = Some((start.elapsed(), total));
900                }
901            }
902            self.event.notify(usize::MAX);
903        }
904    }
905
906    fn finish_foreground_job(&self) {
907        if self
908            .currently_scheduled_foreground_jobs
909            .fetch_sub(1, Ordering::AcqRel)
910            == 1
911        {
912            self.event_foreground.notify(usize::MAX);
913        }
914        self.finish_primary_job();
915    }
916
917    pub async fn wait_foreground_done(&self) {
918        if self
919            .currently_scheduled_foreground_jobs
920            .load(Ordering::Acquire)
921            == 0
922        {
923            return;
924        }
925        let listener = self.event_foreground.listen();
926        if self
927            .currently_scheduled_foreground_jobs
928            .load(Ordering::Acquire)
929            == 0
930        {
931            return;
932        }
933        listener
934            .instrument(trace_span!("wait_foreground_done"))
935            .await;
936    }
937
938    pub fn get_in_progress_count(&self) -> usize {
939        self.currently_scheduled_tasks.load(Ordering::Acquire)
940    }
941
942    /// Waits for the given task to finish executing. This works by performing an untracked read,
943    /// and discarding the value of the task output.
944    ///
945    /// [`ReadConsistency::Eventual`] means that this will return after the task executes, but
946    /// before all dependencies have completely settled.
947    ///
948    /// [`ReadConsistency::Strong`] means that this will also wait for the task and all dependencies
949    /// to fully settle before returning.
950    ///
951    /// As this function is typically called in top-level code that waits for results to be ready
952    /// for the user to access, most callers should use [`ReadConsistency::Strong`].
953    pub async fn wait_task_completion(
954        &self,
955        id: TaskId,
956        consistency: ReadConsistency,
957    ) -> Result<()> {
958        // INVALIDATION: This doesn't return a value, only waits for it to be ready.
959        read_task_output_untracked(self, id, consistency).await?;
960        Ok(())
961    }
962
963    #[deprecated(note = "Use get_or_wait_aggregated_update_info instead")]
964    pub async fn get_or_wait_update_info(&self, aggregation: Duration) -> (Duration, usize) {
965        let UpdateInfo {
966            duration, tasks, ..
967        } = self.get_or_wait_aggregated_update_info(aggregation).await;
968        (duration, tasks)
969    }
970
971    #[deprecated(note = "Use aggregated_update_info instead")]
972    pub async fn update_info(
973        &self,
974        aggregation: Duration,
975        timeout: Duration,
976    ) -> Option<(Duration, usize)> {
977        self.aggregated_update_info(aggregation, timeout).await.map(
978            |UpdateInfo {
979                 duration, tasks, ..
980             }| (duration, tasks),
981        )
982    }
983
984    /// Returns [UpdateInfo] with all updates aggregated over a given duration
985    /// (`aggregation`). Will wait until an update happens.
986    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
987        self.aggregated_update_info(aggregation, Duration::MAX)
988            .await
989            .unwrap()
990    }
991
992    /// Returns [UpdateInfo] with all updates aggregated over a given duration
993    /// (`aggregation`). Will only return None when the timeout is reached while
994    /// waiting for the first update.
995    pub async fn aggregated_update_info(
996        &self,
997        aggregation: Duration,
998        timeout: Duration,
999    ) -> Option<UpdateInfo> {
1000        let listener = self
1001            .event
1002            .listen_with_note(|| "wait for update info".to_string());
1003        let wait_for_finish = {
1004            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1005            if aggregation.is_zero() {
1006                if let Some((duration, tasks)) = update.take() {
1007                    return Some(UpdateInfo {
1008                        duration,
1009                        tasks,
1010                        reasons: take(reason_set),
1011                        placeholder_for_future_fields: (),
1012                    });
1013                } else {
1014                    true
1015                }
1016            } else {
1017                update.is_none()
1018            }
1019        };
1020        if wait_for_finish {
1021            if timeout == Duration::MAX {
1022                // wait for finish
1023                listener.await;
1024            } else {
1025                // wait for start, then wait for finish or timeout
1026                let start_listener = self
1027                    .event_start
1028                    .listen_with_note(|| "wait for update info".to_string());
1029                if self.currently_scheduled_tasks.load(Ordering::Acquire) == 0 {
1030                    start_listener.await;
1031                } else {
1032                    drop(start_listener);
1033                }
1034                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1035                    // Timeout
1036                    return None;
1037                }
1038            }
1039        }
1040        if !aggregation.is_zero() {
1041            loop {
1042                select! {
1043                    () = tokio::time::sleep(aggregation) => {
1044                        break;
1045                    }
1046                    () = self.event.listen_with_note(|| "wait for update info".to_string()) => {
1047                        // Resets the sleep
1048                    }
1049                }
1050            }
1051        }
1052        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1053        if let Some((duration, tasks)) = update.take() {
1054            Some(UpdateInfo {
1055                duration,
1056                tasks,
1057                reasons: take(reason_set),
1058                placeholder_for_future_fields: (),
1059            })
1060        } else {
1061            panic!("aggregated_update_info must not called concurrently")
1062        }
1063    }
1064
1065    pub async fn wait_background_done(&self) {
1066        let listener = self.event_background.listen();
1067        if self
1068            .currently_scheduled_background_jobs
1069            .load(Ordering::Acquire)
1070            != 0
1071        {
1072            listener.await;
1073        }
1074    }
1075
1076    pub async fn stop_and_wait(&self) {
1077        self.backend.stopping(self);
1078        self.stopped.store(true, Ordering::Release);
1079        {
1080            let listener = self.event.listen_with_note(|| "wait for stop".to_string());
1081            if self.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
1082                listener.await;
1083            }
1084        }
1085        {
1086            let listener = self.event_background.listen();
1087            if self
1088                .currently_scheduled_background_jobs
1089                .load(Ordering::Acquire)
1090                != 0
1091            {
1092                listener.await;
1093            }
1094        }
1095        self.backend.stop(self);
1096    }
1097
1098    #[track_caller]
1099    pub(crate) fn schedule_background_job<
1100        T: FnOnce(Arc<TurboTasks<B>>) -> F + Send + 'static,
1101        F: Future<Output = ()> + Send + 'static,
1102    >(
1103        &self,
1104        func: T,
1105    ) {
1106        let this = self.pin();
1107        self.currently_scheduled_background_jobs
1108            .fetch_add(1, Ordering::AcqRel);
1109        tokio::spawn(
1110            TURBO_TASKS
1111                .scope(this.clone(), async move {
1112                    while this.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
1113                        let listener = this.event.listen_with_note(|| {
1114                            "background job waiting for execution".to_string()
1115                        });
1116                        if this.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
1117                            listener.await;
1118                        }
1119                    }
1120                    let this2 = this.clone();
1121                    if !this.stopped.load(Ordering::Acquire) {
1122                        func(this).await;
1123                    }
1124                    if this2
1125                        .currently_scheduled_background_jobs
1126                        .fetch_sub(1, Ordering::AcqRel)
1127                        == 1
1128                    {
1129                        this2.event_background.notify(usize::MAX);
1130                    }
1131                })
1132                .in_current_span(),
1133        );
1134    }
1135
1136    #[track_caller]
1137    pub(crate) fn schedule_foreground_job<
1138        T: FnOnce(Arc<TurboTasks<B>>) -> F + Send + 'static,
1139        F: Future<Output = ()> + Send + 'static,
1140    >(
1141        &self,
1142        func: T,
1143    ) {
1144        let this = self.pin();
1145        this.begin_foreground_job();
1146        tokio::spawn(
1147            TURBO_TASKS
1148                .scope(this.clone(), async move {
1149                    if !this.stopped.load(Ordering::Acquire) {
1150                        func(this.clone()).await;
1151                    }
1152                    this.finish_foreground_job();
1153                })
1154                .in_current_span(),
1155        );
1156    }
1157
1158    fn finish_current_task_state(&self) -> bool {
1159        let (stateful, tasks) = CURRENT_TASK_STATE.with(|cell| {
1160            let CurrentTaskState {
1161                tasks_to_notify,
1162                stateful,
1163                ..
1164            } = &mut *cell.write().unwrap();
1165            (*stateful, take(tasks_to_notify))
1166        });
1167
1168        if !tasks.is_empty() {
1169            self.backend.invalidate_tasks(&tasks, self);
1170        }
1171        stateful
1172    }
1173
1174    pub fn backend(&self) -> &B {
1175        &self.backend
1176    }
1177}
1178
1179impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
1180    fn dynamic_call(
1181        &self,
1182        func: FunctionId,
1183        this: Option<RawVc>,
1184        arg: Box<dyn MagicAny>,
1185        persistence: TaskPersistence,
1186    ) -> RawVc {
1187        self.dynamic_call(func, this, arg, persistence)
1188    }
1189    fn native_call(
1190        &self,
1191        func: FunctionId,
1192        this: Option<RawVc>,
1193        arg: Box<dyn MagicAny>,
1194        persistence: TaskPersistence,
1195    ) -> RawVc {
1196        self.native_call(func, this, arg, persistence)
1197    }
1198    fn trait_call(
1199        &self,
1200        trait_type: TraitTypeId,
1201        trait_fn_name: Cow<'static, str>,
1202        this: RawVc,
1203        arg: Box<dyn MagicAny>,
1204        persistence: TaskPersistence,
1205    ) -> RawVc {
1206        self.trait_call(trait_type, trait_fn_name, this, arg, persistence)
1207    }
1208
1209    #[track_caller]
1210    fn run_once(
1211        &self,
1212        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1213    ) -> TaskId {
1214        self.spawn_once_task(async move {
1215            future.await?;
1216            Ok(Completion::new())
1217        })
1218    }
1219
1220    #[track_caller]
1221    fn run_once_with_reason(
1222        &self,
1223        reason: StaticOrArc<dyn InvalidationReason>,
1224        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1225    ) -> TaskId {
1226        {
1227            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1228            reason_set.insert(reason);
1229        }
1230        self.spawn_once_task(async move {
1231            future.await?;
1232            Ok(Completion::new())
1233        })
1234    }
1235
1236    #[track_caller]
1237    fn run_once_process(
1238        &self,
1239        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1240    ) -> TaskId {
1241        let this = self.pin();
1242        self.spawn_once_task(async move {
1243            this.finish_primary_job();
1244            future.await?;
1245            this.begin_primary_job();
1246            Ok(Completion::new())
1247        })
1248    }
1249}
1250
1251impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1252    fn pin(&self) -> Arc<dyn TurboTasksApi> {
1253        self.pin()
1254    }
1255
1256    #[instrument(level = Level::INFO, skip_all, name = "invalidate")]
1257    fn invalidate(&self, task: TaskId) {
1258        self.backend.invalidate_task(task, self);
1259    }
1260
1261    #[instrument(level = Level::INFO, skip_all, name = "invalidate", fields(name = display(&reason)))]
1262    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1263        {
1264            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1265            reason_set.insert(reason);
1266        }
1267        self.backend.invalidate_task(task, self);
1268    }
1269
1270    fn invalidate_serialization(&self, task: TaskId) {
1271        self.backend.invalidate_serialization(task, self);
1272    }
1273
1274    fn notify_scheduled_tasks(&self) {
1275        let _ = CURRENT_TASK_STATE.try_with(|cell| {
1276            let tasks = {
1277                let CurrentTaskState {
1278                    tasks_to_notify, ..
1279                } = &mut *cell.write().unwrap();
1280                take(tasks_to_notify)
1281            };
1282            if tasks.is_empty() {
1283                return;
1284            }
1285            self.backend.invalidate_tasks(&tasks, self);
1286        });
1287    }
1288
1289    fn try_read_task_output(
1290        &self,
1291        task: TaskId,
1292        consistency: ReadConsistency,
1293    ) -> Result<Result<RawVc, EventListener>> {
1294        self.backend
1295            .try_read_task_output(task, current_task("reading Vcs"), consistency, self)
1296    }
1297
1298    fn try_read_task_output_untracked(
1299        &self,
1300        task: TaskId,
1301        consistency: ReadConsistency,
1302    ) -> Result<Result<RawVc, EventListener>> {
1303        self.backend
1304            .try_read_task_output_untracked(task, consistency, self)
1305    }
1306
1307    fn try_read_task_cell(
1308        &self,
1309        task: TaskId,
1310        index: CellId,
1311        options: ReadCellOptions,
1312    ) -> Result<Result<TypedCellContent, EventListener>> {
1313        self.backend
1314            .try_read_task_cell(task, index, current_task("reading Vcs"), options, self)
1315    }
1316
1317    fn try_read_task_cell_untracked(
1318        &self,
1319        task: TaskId,
1320        index: CellId,
1321        options: ReadCellOptions,
1322    ) -> Result<Result<TypedCellContent, EventListener>> {
1323        self.backend
1324            .try_read_task_cell_untracked(task, index, options, self)
1325    }
1326
1327    fn try_read_own_task_cell_untracked(
1328        &self,
1329        current_task: TaskId,
1330        index: CellId,
1331        options: ReadCellOptions,
1332    ) -> Result<TypedCellContent> {
1333        self.backend
1334            .try_read_own_task_cell_untracked(current_task, index, options, self)
1335    }
1336
1337    fn try_read_local_output(
1338        &self,
1339        execution_id: ExecutionId,
1340        local_task_id: LocalTaskId,
1341    ) -> Result<Result<RawVc, EventListener>> {
1342        CURRENT_TASK_STATE.with(|gts| {
1343            let gts_read = gts.read().unwrap();
1344
1345            // Local Vcs are local to their parent task's current execution, and do not exist
1346            // outside of it. This is weakly enforced at compile time using the `NonLocalValue`
1347            // marker trait. This assertion exists to handle any potential escapes that the
1348            // compile-time checks cannot capture.
1349            gts_read.assert_execution_id(execution_id);
1350
1351            match gts_read.get_local_task(local_task_id) {
1352                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1353                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1354            }
1355        })
1356    }
1357
1358    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1359        self.backend.read_task_collectibles(
1360            task,
1361            trait_id,
1362            current_task("reading collectibles"),
1363            self,
1364        )
1365    }
1366
1367    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1368        self.backend.emit_collectible(
1369            trait_type,
1370            collectible,
1371            current_task("emitting collectible"),
1372            self,
1373        );
1374    }
1375
1376    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1377        self.backend.unemit_collectible(
1378            trait_type,
1379            collectible,
1380            count,
1381            current_task("emitting collectible"),
1382            self,
1383        );
1384    }
1385
1386    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1387        for (&collectible, &count) in collectibles {
1388            if count > 0 {
1389                self.backend.unemit_collectible(
1390                    trait_type,
1391                    collectible,
1392                    count as u32,
1393                    current_task("emitting collectible"),
1394                    self,
1395                );
1396            }
1397        }
1398    }
1399
1400    fn read_own_task_cell(
1401        &self,
1402        task: TaskId,
1403        index: CellId,
1404        options: ReadCellOptions,
1405    ) -> Result<TypedCellContent> {
1406        // INVALIDATION: don't need to track a dependency to itself
1407        self.try_read_own_task_cell_untracked(task, index, options)
1408    }
1409
1410    fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent) {
1411        self.backend.update_task_cell(task, index, content, self);
1412    }
1413
1414    fn connect_task(&self, task: TaskId) {
1415        self.backend
1416            .connect_task(task, current_task("connecting task"), self);
1417    }
1418
1419    fn mark_own_task_as_finished(&self, task: TaskId) {
1420        self.backend.mark_own_task_as_finished(task, self);
1421    }
1422
1423    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
1424        self.backend
1425            .set_own_task_aggregation_number(task, aggregation_number, self);
1426    }
1427
1428    fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1429        self.backend.mark_own_task_as_session_dependent(task, self);
1430    }
1431
1432    /// Creates a future that inherits the current task id and task state. The current global task
1433    /// will wait for this future to be dropped before exiting.
1434    fn detached_for_testing(
1435        &self,
1436        fut: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1437    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
1438        // this is similar to what happens for a local task, except that we keep the local task's
1439        // state as well.
1440        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1441        let tracked_fut = {
1442            let ts = global_task_state.read().unwrap();
1443            ts.local_task_tracker.track_future(fut)
1444        };
1445        Box::pin(TURBO_TASKS.scope(
1446            turbo_tasks(),
1447            CURRENT_TASK_STATE.scope(global_task_state, tracked_fut),
1448        ))
1449    }
1450
1451    fn task_statistics(&self) -> &TaskStatisticsApi {
1452        self.backend.task_statistics()
1453    }
1454
1455    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1456        let this = self.pin();
1457        Box::pin(async move {
1458            this.stop_and_wait().await;
1459        })
1460    }
1461
1462    fn subscribe_to_compilation_events(
1463        &self,
1464        event_types: Option<Vec<String>>,
1465    ) -> Receiver<Arc<dyn CompilationEvent>> {
1466        self.compilation_events.subscribe(event_types)
1467    }
1468
1469    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1470        if let Err(e) = self.compilation_events.send(event) {
1471            tracing::warn!("Failed to send compilation event: {e}");
1472        }
1473    }
1474}
1475
1476impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1477    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1478        self.pin()
1479    }
1480    fn backend(&self) -> &B {
1481        &self.backend
1482    }
1483
1484    #[track_caller]
1485    fn schedule_backend_background_job(&self, id: BackendJobId) {
1486        self.schedule_background_job(move |this| async move {
1487            this.backend.run_backend_job(id, &*this).await;
1488        })
1489    }
1490
1491    #[track_caller]
1492    fn schedule_backend_foreground_job(&self, id: BackendJobId) {
1493        self.schedule_foreground_job(move |this| async move {
1494            this.backend.run_backend_job(id, &*this).await;
1495        })
1496    }
1497
1498    fn try_foreground_done(&self) -> Result<(), EventListener> {
1499        if self
1500            .currently_scheduled_foreground_jobs
1501            .load(Ordering::Acquire)
1502            == 0
1503        {
1504            return Ok(());
1505        }
1506        let listener = self.event_foreground.listen();
1507        if self
1508            .currently_scheduled_foreground_jobs
1509            .load(Ordering::Acquire)
1510            == 0
1511        {
1512            return Ok(());
1513        }
1514        Err(listener)
1515    }
1516
1517    fn wait_foreground_done_excluding_own<'a>(
1518        &'a self,
1519    ) -> Option<Pin<Box<dyn Future<Output = ()> + Send + 'a>>> {
1520        if self
1521            .currently_scheduled_foreground_jobs
1522            .load(Ordering::Acquire)
1523            == 0
1524        {
1525            return None;
1526        }
1527        Some(Box::pin(async {
1528            self.finish_foreground_job();
1529            self.wait_foreground_done().await;
1530            self.begin_foreground_job();
1531        }))
1532    }
1533
1534    /// Enqueues tasks for notification of changed dependencies. This will
1535    /// eventually call `dependent_cell_updated()` on all tasks.
1536    fn schedule_notify_tasks(&self, tasks: &[TaskId]) {
1537        let result = CURRENT_TASK_STATE.try_with(|cell| {
1538            let CurrentTaskState {
1539                tasks_to_notify, ..
1540            } = &mut *cell.write().unwrap();
1541            tasks_to_notify.extend(tasks.iter());
1542        });
1543        if result.is_err() {
1544            let _guard = trace_span!("schedule_notify_tasks", count = tasks.len()).entered();
1545            self.backend.invalidate_tasks(tasks, self);
1546        }
1547    }
1548
1549    /// Enqueues tasks for notification of changed dependencies. This will
1550    /// eventually call `dependent_cell_updated()` on all tasks.
1551    fn schedule_notify_tasks_set(&self, tasks: &TaskIdSet) {
1552        let result = CURRENT_TASK_STATE.try_with(|cell| {
1553            let CurrentTaskState {
1554                tasks_to_notify, ..
1555            } = &mut *cell.write().unwrap();
1556            tasks_to_notify.extend(tasks.iter());
1557        });
1558        if result.is_err() {
1559            let _guard = trace_span!("schedule_notify_tasks_set", count = tasks.len()).entered();
1560            self.backend.invalidate_tasks_set(tasks, self);
1561        };
1562    }
1563
1564    #[track_caller]
1565    fn schedule(&self, task: TaskId) {
1566        self.schedule(task)
1567    }
1568
1569    fn program_duration_until(&self, instant: Instant) -> Duration {
1570        instant - self.program_start
1571    }
1572
1573    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1574        // SAFETY: This is a fresh id from the factory
1575        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1576    }
1577
1578    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1579        // SAFETY: This is a fresh id from the factory
1580        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1581    }
1582
1583    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1584        unsafe { self.task_id_factory.reuse(id.into()) }
1585    }
1586
1587    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1588        unsafe { self.transient_task_id_factory.reuse(id.into()) }
1589    }
1590
1591    fn read_task_state_dyn(&self, func: &mut dyn FnMut(&B::TaskState)) {
1592        CURRENT_TASK_STATE
1593            .with(move |ts| func(ts.read().unwrap().backend_state.downcast_ref().unwrap()))
1594    }
1595
1596    fn write_task_state_dyn(&self, func: &mut dyn FnMut(&mut B::TaskState)) {
1597        CURRENT_TASK_STATE
1598            .with(move |ts| func(ts.write().unwrap().backend_state.downcast_mut().unwrap()))
1599    }
1600
1601    fn is_idle(&self) -> bool {
1602        self.currently_scheduled_tasks.load(Ordering::Acquire) == 0
1603    }
1604}
1605
1606pub(crate) fn current_task(from: &str) -> TaskId {
1607    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1608        Ok(id) => id,
1609        Err(_) => panic!("{from} can only be used in the context of turbo_tasks task execution"),
1610    }
1611}
1612
1613pub async fn run_once<T: Send + 'static>(
1614    tt: Arc<dyn TurboTasksApi>,
1615    future: impl Future<Output = Result<T>> + Send + 'static,
1616) -> Result<T> {
1617    let (tx, rx) = tokio::sync::oneshot::channel();
1618
1619    let task_id = tt.run_once(Box::pin(async move {
1620        let result = future.await?;
1621        tx.send(result)
1622            .map_err(|_| anyhow!("unable to send result"))?;
1623        Ok(())
1624    }));
1625
1626    // INVALIDATION: A Once task will never invalidate, therefore we don't need to
1627    // track a dependency
1628    let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1629    let raw_future = raw_result.into_read().untracked();
1630    turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1631
1632    Ok(rx.await?)
1633}
1634
1635pub async fn run_once_with_reason<T: Send + 'static>(
1636    tt: Arc<dyn TurboTasksApi>,
1637    reason: impl InvalidationReason,
1638    future: impl Future<Output = Result<T>> + Send + 'static,
1639) -> Result<T> {
1640    let (tx, rx) = tokio::sync::oneshot::channel();
1641
1642    let task_id = tt.run_once_with_reason(
1643        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1644        Box::pin(async move {
1645            let result = future.await?;
1646            tx.send(result)
1647                .map_err(|_| anyhow!("unable to send result"))?;
1648            Ok(())
1649        }),
1650    );
1651
1652    // INVALIDATION: A Once task will never invalidate, therefore we don't need to
1653    // track a dependency
1654    let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1655    let raw_future = raw_result.into_read().untracked();
1656    turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1657
1658    Ok(rx.await?)
1659}
1660
1661/// Calls [`TurboTasks::dynamic_call`] for the current turbo tasks instance.
1662pub fn dynamic_call(
1663    func: FunctionId,
1664    this: Option<RawVc>,
1665    arg: Box<dyn MagicAny>,
1666    persistence: TaskPersistence,
1667) -> RawVc {
1668    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1669}
1670
1671/// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance.
1672pub fn trait_call(
1673    trait_type: TraitTypeId,
1674    trait_fn_name: Cow<'static, str>,
1675    this: RawVc,
1676    arg: Box<dyn MagicAny>,
1677    persistence: TaskPersistence,
1678) -> RawVc {
1679    with_turbo_tasks(|tt| tt.trait_call(trait_type, trait_fn_name, this, arg, persistence))
1680}
1681
1682pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1683    TURBO_TASKS.with(|arc| arc.clone())
1684}
1685
1686pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1687    TURBO_TASKS.with(|arc| func(arc))
1688}
1689
1690pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1691    TURBO_TASKS.sync_scope(tt, f)
1692}
1693
1694pub fn turbo_tasks_future_scope<T>(
1695    tt: Arc<dyn TurboTasksApi>,
1696    f: impl Future<Output = T>,
1697) -> impl Future<Output = T> {
1698    TURBO_TASKS.scope(tt, f)
1699}
1700
1701pub fn with_turbo_tasks_for_testing<T>(
1702    tt: Arc<dyn TurboTasksApi>,
1703    current_task: TaskId,
1704    execution_id: ExecutionId,
1705    f: impl Future<Output = T>,
1706) -> impl Future<Output = T> {
1707    TURBO_TASKS.scope(
1708        tt,
1709        CURRENT_TASK_STATE.scope(
1710            Arc::new(RwLock::new(CurrentTaskState::new(
1711                current_task,
1712                execution_id,
1713                Box::new(()),
1714            ))),
1715            f,
1716        ),
1717    )
1718}
1719
1720/// Spawns the given future within the context of the current task.
1721///
1722/// Beware: this method is not safe to use in production code. It is only
1723/// intended for use in tests and for debugging purposes.
1724pub fn spawn_detached_for_testing(f: impl Future<Output = Result<()>> + Send + 'static) {
1725    tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(f.in_current_span())));
1726}
1727
1728pub fn current_task_for_testing() -> TaskId {
1729    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1730}
1731
1732/// Marks the current task as dirty when restored from persistent cache.
1733pub fn mark_session_dependent() {
1734    with_turbo_tasks(|tt| {
1735        tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1736    });
1737}
1738
1739/// Marks the current task as finished. This excludes it from waiting for
1740/// strongly consistency.
1741pub fn mark_root() {
1742    with_turbo_tasks(|tt| {
1743        tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
1744    });
1745}
1746
1747/// Marks the current task as finished. This excludes it from waiting for
1748/// strongly consistency.
1749pub fn mark_finished() {
1750    with_turbo_tasks(|tt| {
1751        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1752    });
1753}
1754
1755/// Marks the current task as stateful. This prevents the tasks from being
1756/// dropped without persisting the state.
1757///
1758/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
1759/// serialization of the current task cells
1760pub fn mark_stateful() -> SerializationInvalidator {
1761    CURRENT_TASK_STATE.with(|cell| {
1762        let CurrentTaskState {
1763            stateful, task_id, ..
1764        } = &mut *cell.write().unwrap();
1765        *stateful = true;
1766        SerializationInvalidator::new(*task_id)
1767    })
1768}
1769
1770pub fn prevent_gc() {
1771    // There is a hack in UpdateCellOperation that need to be updated when this is changed.
1772    mark_stateful();
1773}
1774
1775/// Notifies scheduled tasks for execution.
1776pub fn notify_scheduled_tasks() {
1777    with_turbo_tasks(|tt| tt.notify_scheduled_tasks())
1778}
1779
1780pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1781    with_turbo_tasks(|tt| {
1782        let raw_vc = collectible.node.node;
1783        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1784    })
1785}
1786
1787pub async fn spawn_blocking<T: Send + 'static>(func: impl FnOnce() -> T + Send + 'static) -> T {
1788    let turbo_tasks = turbo_tasks();
1789    let span = Span::current();
1790    let (result, duration, alloc_info) = tokio::task::spawn_blocking(|| {
1791        let _guard = span.entered();
1792        let start = Instant::now();
1793        let start_allocations = TurboMalloc::allocation_counters();
1794        let r = turbo_tasks_scope(turbo_tasks, func);
1795        (r, start.elapsed(), start_allocations.until_now())
1796    })
1797    .await
1798    .unwrap();
1799    capture_future::add_duration(duration);
1800    capture_future::add_allocation_info(alloc_info);
1801    result
1802}
1803
1804pub fn spawn_thread(func: impl FnOnce() + Send + 'static) {
1805    let handle = Handle::current();
1806    let span = info_span!("thread").or_current();
1807    thread::spawn(move || {
1808        let span = span.entered();
1809        let guard = handle.enter();
1810        func();
1811        drop(guard);
1812        drop(span);
1813    });
1814}
1815
1816pub(crate) async fn read_task_output(
1817    this: &dyn TurboTasksApi,
1818    id: TaskId,
1819    consistency: ReadConsistency,
1820) -> Result<RawVc> {
1821    loop {
1822        match this.try_read_task_output(id, consistency)? {
1823            Ok(result) => return Ok(result),
1824            Err(listener) => listener.await,
1825        }
1826    }
1827}
1828
1829/// INVALIDATION: Be careful with this, it will not track dependencies, so
1830/// using it could break cache invalidation.
1831pub(crate) async fn read_task_output_untracked(
1832    this: &dyn TurboTasksApi,
1833    id: TaskId,
1834    consistency: ReadConsistency,
1835) -> Result<RawVc> {
1836    loop {
1837        match this.try_read_task_output_untracked(id, consistency)? {
1838            Ok(result) => return Ok(result),
1839            Err(listener) => listener.await,
1840        }
1841    }
1842}
1843
1844pub(crate) async fn read_task_cell(
1845    this: &dyn TurboTasksApi,
1846    id: TaskId,
1847    index: CellId,
1848    options: ReadCellOptions,
1849) -> Result<TypedCellContent> {
1850    loop {
1851        match this.try_read_task_cell(id, index, options)? {
1852            Ok(result) => return Ok(result),
1853            Err(listener) => listener.await,
1854        }
1855    }
1856}
1857
1858/// A reference to a task's cell with methods that allow updating the contents
1859/// of the cell.
1860///
1861/// Mutations should not outside of the task that that owns this cell. Doing so
1862/// is a logic error, and may lead to incorrect caching behavior.
1863#[derive(Clone, Copy, Serialize, Deserialize)]
1864pub struct CurrentCellRef {
1865    current_task: TaskId,
1866    index: CellId,
1867}
1868
1869type VcReadRepr<T> = <<T as VcValueType>::Read as VcRead<T>>::Repr;
1870
1871impl CurrentCellRef {
1872    /// Updates the cell if the given `functor` returns a value.
1873    pub fn conditional_update<T>(&self, functor: impl FnOnce(Option<&T>) -> Option<T>)
1874    where
1875        T: VcValueType,
1876    {
1877        self.conditional_update_with_shared_reference(|old_shared_reference| {
1878            let old_ref = old_shared_reference
1879                .and_then(|sr| sr.0.downcast_ref::<VcReadRepr<T>>())
1880                .map(|content| <T::Read as VcRead<T>>::repr_to_value_ref(content));
1881            let new_value = functor(old_ref)?;
1882            Some(SharedReference::new(triomphe::Arc::new(
1883                <T::Read as VcRead<T>>::value_to_repr(new_value),
1884            )))
1885        })
1886    }
1887
1888    /// Updates the cell if the given `functor` returns a `SharedReference`.
1889    pub fn conditional_update_with_shared_reference(
1890        &self,
1891        functor: impl FnOnce(Option<&SharedReference>) -> Option<SharedReference>,
1892    ) {
1893        let tt = turbo_tasks();
1894        let cell_content = tt
1895            .read_own_task_cell(self.current_task, self.index, ReadCellOptions::default())
1896            .ok();
1897        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1898        if let Some(update) = update {
1899            tt.update_own_task_cell(self.current_task, self.index, CellContent(Some(update)))
1900        }
1901    }
1902
1903    /// Replace the current cell's content with `new_value` if the current content is not equal by
1904    /// value with the existing content.
1905    ///
1906    /// The comparison happens using the value itself, not the [`VcRead::Target`] of that value.
1907    ///
1908    /// Take this example of a custom equality implementation on a transparent wrapper type:
1909    ///
1910    /// ```
1911    /// #[turbo_tasks::value(transparent, eq = "manual")]
1912    /// struct Wrapper(Vec<u32>);
1913    ///
1914    /// impl PartialEq for Wrapper {
1915    ///     fn eq(&self, other: Wrapper) {
1916    ///         // Example: order doesn't matter for equality
1917    ///         let (mut this, mut other) = (self.clone(), other.clone());
1918    ///         this.sort_unstable();
1919    ///         other.sort_unstable();
1920    ///         this == other
1921    ///     }
1922    /// }
1923    ///
1924    /// impl Eq for Wrapper {}
1925    /// ```
1926    ///
1927    /// Comparisons of [`Vc<Wrapper>`] used when updating the cell will use `Wrapper`'s custom
1928    /// equality implementation, rather than the one provided by the target ([`Vec<u32>`]) type.
1929    ///
1930    /// However, in most cases, the default derived implementation of [`PartialEq`] is used which
1931    /// just forwards to the inner value's [`PartialEq`].
1932    ///
1933    /// If you already have a `SharedReference`, consider calling
1934    /// [`Self::compare_and_update_with_shared_reference`] which can re-use the [`SharedReference`]
1935    /// object.
1936    pub fn compare_and_update<T>(&self, new_value: T)
1937    where
1938        T: PartialEq + VcValueType,
1939    {
1940        self.conditional_update(|old_value| {
1941            if let Some(old_value) = old_value {
1942                if old_value == &new_value {
1943                    return None;
1944                }
1945            }
1946            Some(new_value)
1947        });
1948    }
1949
1950    /// Replace the current cell's content with `new_shared_reference` if the
1951    /// current content is not equal by value with the existing content.
1952    ///
1953    /// If you already have a `SharedReference`, this is a faster version of
1954    /// [`CurrentCellRef::compare_and_update`].
1955    ///
1956    /// The [`SharedReference`] is expected to use the `<T::Read as
1957    /// VcRead<T>>::Repr` type for its representation of the value.
1958    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
1959    where
1960        T: VcValueType + PartialEq,
1961    {
1962        fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
1963            <T::Read as VcRead<T>>::repr_to_value_ref(
1964                sr.0.downcast_ref::<VcReadRepr<T>>()
1965                    .expect("cannot update SharedReference of different type"),
1966            )
1967        }
1968        self.conditional_update_with_shared_reference(|old_sr| {
1969            if let Some(old_sr) = old_sr {
1970                let old_value: &T = extract_sr_value(old_sr);
1971                let new_value = extract_sr_value(&new_shared_reference);
1972                if old_value == new_value {
1973                    return None;
1974                }
1975            }
1976            Some(new_shared_reference)
1977        });
1978    }
1979
1980    /// Unconditionally updates the content of the cell.
1981    pub fn update<T>(&self, new_value: T)
1982    where
1983        T: VcValueType,
1984    {
1985        let tt = turbo_tasks();
1986        tt.update_own_task_cell(
1987            self.current_task,
1988            self.index,
1989            CellContent(Some(SharedReference::new(triomphe::Arc::new(
1990                <T::Read as VcRead<T>>::value_to_repr(new_value),
1991            )))),
1992        )
1993    }
1994
1995    /// A faster version of [`Self::update`] if you already have a
1996    /// [`SharedReference`].
1997    ///
1998    /// If the passed-in [`SharedReference`] is the same as the existing cell's
1999    /// by identity, no update is performed.
2000    ///
2001    /// The [`SharedReference`] is expected to use the `<T::Read as
2002    /// VcRead<T>>::Repr` type for its representation of the value.
2003    pub fn update_with_shared_reference(&self, shared_ref: SharedReference) {
2004        let tt = turbo_tasks();
2005        let content = tt
2006            .read_own_task_cell(self.current_task, self.index, ReadCellOptions::default())
2007            .ok();
2008        let update = if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
2009            // pointer equality (not value equality)
2010            shared_ref_exp != shared_ref
2011        } else {
2012            true
2013        };
2014        if update {
2015            tt.update_own_task_cell(self.current_task, self.index, CellContent(Some(shared_ref)))
2016        }
2017    }
2018}
2019
2020impl From<CurrentCellRef> for RawVc {
2021    fn from(cell: CurrentCellRef) -> Self {
2022        RawVc::TaskCell(cell.current_task, cell.index)
2023    }
2024}
2025
2026pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef {
2027    CURRENT_TASK_STATE.with(|ts| {
2028        let current_task = current_task("celling turbo_tasks values");
2029        let mut ts = ts.write().unwrap();
2030        let map = ts.cell_counters.as_mut().unwrap();
2031        let current_index = map.entry(ty).or_default();
2032        let index = *current_index;
2033        *current_index += 1;
2034        CurrentCellRef {
2035            current_task,
2036            index: CellId { type_id: ty, index },
2037        }
2038    })
2039}
2040
2041pub(crate) async fn read_local_output(
2042    this: &dyn TurboTasksApi,
2043    execution_id: ExecutionId,
2044    local_task_id: LocalTaskId,
2045) -> Result<RawVc> {
2046    loop {
2047        match this.try_read_local_output(execution_id, local_task_id)? {
2048            Ok(raw_vc) => return Ok(raw_vc),
2049            Err(event_listener) => event_listener.await,
2050        }
2051    }
2052}