turbo_tasks_backend/backend/mod.rs

1mod dynamic_storage;
2mod operation;
3mod storage;
4
5use std::{
6    borrow::Cow,
7    cmp::min,
8    fmt::{self, Write},
9    future::Future,
10    hash::BuildHasherDefault,
11    mem::take,
12    ops::Range,
13    pin::Pin,
14    sync::{
15        Arc, LazyLock,
16        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
17    },
18};
19
20use anyhow::{Result, bail};
21use auto_hash_map::{AutoMap, AutoSet};
22use indexmap::IndexSet;
23use parking_lot::{Condvar, Mutex};
24use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
25use smallvec::{SmallVec, smallvec};
26use tokio::time::{Duration, Instant};
27use tracing::{Span, trace_span};
28use turbo_tasks::{
29    CellId, FxDashMap, FxIndexMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency,
30    ReadOutputOptions, ReadTracking, TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId, TraitTypeId,
31    TurboTasksBackendApi, ValueTypeId,
32    backend::{
33        Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
34        TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
35    },
36    event::{Event, EventListener},
37    message_queue::TimingEvent,
38    registry::get_value_type,
39    task_statistics::TaskStatisticsApi,
40    trace::TraceRawVcs,
41    turbo_tasks,
42    util::{IdFactoryWithReuse, good_chunk_size},
43};
44
45pub use self::{operation::AnyOperation, storage::TaskDataCategory};
46#[cfg(feature = "trace_task_dirty")]
47use crate::backend::operation::TaskDirtyCause;
48use crate::{
49    backend::{
50        operation::{
51            AggregationUpdateJob, AggregationUpdateQueue, CleanupOldEdgesOperation,
52            ComputeDirtyAndCleanUpdate, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
53            Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number,
54            get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children,
55        },
56        storage::{
57            InnerStorageSnapshot, Storage, count, get, get_many, get_mut, get_mut_or_insert_with,
58            iter_many, remove,
59        },
60    },
61    backing_storage::BackingStorage,
62    data::{
63        ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
64        CachedDataItemValueRef, CellRef, CollectibleRef, CollectiblesRef, Dirtyness,
65        InProgressCellState, InProgressState, InProgressStateInner, OutputValue, RootType,
66    },
67    utils::{
68        bi_map::BiMap, chunked_vec::ChunkedVec, dash_map_drop_contents::drop_contents,
69        ptr_eq_arc::PtrEqArc, shard_amount::compute_shard_amount, sharded::Sharded, swap_retain,
70    },
71};
72
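/// Highest bit of `TurboTasksBackendInner::in_progress_operations`. When set, a snapshot has been
/// requested and running operations should suspend at their next suspend point.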
73const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
74
75/// Configurable idle timeout for snapshot persistence.
76/// Defaults to 2 seconds if not set or if the value is invalid.
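/// For example, `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS=500` selects a 500ms timeout.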
77static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
78    std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
79        .ok()
80        .and_then(|v| v.parse::<u64>().ok())
81        .map(Duration::from_millis)
82        .unwrap_or(Duration::from_secs(2))
83});
84
85struct SnapshotRequest {
86    snapshot_requested: bool,
87    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
88}
89
90impl SnapshotRequest {
91    fn new() -> Self {
92        Self {
93            snapshot_requested: false,
94            suspended_operations: FxHashSet::default(),
95        }
96    }
97}
98
99type TransientTaskOnce =
100    Mutex<Option<Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>>>;
101
pub enum TransientTask {
    /// A root task that will track dependencies and re-execute when
    /// dependencies change. The task will eventually settle to the correct
    /// execution.
    ///
    /// Always active. Automatically scheduled.
    Root(TransientTaskRoot),

    // TODO: implement strong consistency for these
    /// A single root task execution. It won't track dependencies.
    /// The task will definitely include all invalidations that happened before
    /// the start of the task. It may or may not include invalidations that
    /// happened after that, and it may see those invalidations partially
    /// applied.
    ///
    /// Active until done. Automatically scheduled.
    Once(TransientTaskOnce),
}
120
121pub enum StorageMode {
122    /// Queries the storage for cache entries that don't exist locally.
123    ReadOnly,
124    /// Queries the storage for cache entries that don't exist locally.
125    /// Regularly pushes changes to the backing storage.
126    ReadWrite,
127    /// Queries the storage for cache entries that don't exist locally.
128    /// On shutdown, pushes all changes to the backing storage.
129    ReadWriteOnShutdown,
130}
131
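/// Options for constructing a [`TurboTasksBackend`].
///
/// A minimal construction sketch (not from the original docs); the remaining fields come from
/// [`Default`]:
///
/// ```ignore
/// let options = BackendOptions {
///     storage_mode: None, // fully in-memory: nothing is restored or persisted
///     ..Default::default()
/// };
/// ```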
pub struct BackendOptions {
    /// Enables dependency tracking.
    ///
    /// When disabled: No state changes are allowed. Tasks will never re-execute and stay cached
    /// forever.
    pub dependency_tracking: bool,

    /// Enables active tracking.
    ///
    /// Automatically disabled when `dependency_tracking` is disabled.
    ///
    /// When disabled: All tasks are considered active.
    pub active_tracking: bool,

    /// Enables the backing storage.
    pub storage_mode: Option<StorageMode>,

    /// Number of tokio worker threads. It is used to compute the shard count of the parallel
    /// data structures. If `None`, the available parallelism is used.
    pub num_workers: Option<usize>,

    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
    pub small_preallocation: bool,
}
156
157impl Default for BackendOptions {
158    fn default() -> Self {
159        Self {
160            dependency_tracking: true,
161            active_tracking: true,
162            storage_mode: Some(StorageMode::ReadWrite),
163            num_workers: None,
164            small_preallocation: false,
165        }
166    }
167}
168
169pub enum TurboTasksBackendJob {
170    InitialSnapshot,
171    FollowUpSnapshot,
172    Prefetch {
173        data: Arc<FxIndexMap<TaskId, bool>>,
174        range: Option<Range<usize>>,
175    },
176}
177
178pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
179
180type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
181
182struct TurboTasksBackendInner<B: BackingStorage> {
183    options: BackendOptions,
184
185    start_time: Instant,
186
187    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
188    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
189
190    persisted_task_cache_log: Option<TaskCacheLog>,
191    task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
192    transient_tasks: FxDashMap<TaskId, Arc<TransientTask>>,
193
194    storage: Storage,
195
196    /// When true, the backing_storage has data that is not in the local storage.
197    local_is_partial: AtomicBool,
198
    /// Number of executing operations. The highest bit (`SNAPSHOT_REQUESTED_BIT`) is set when a
    /// snapshot is requested. When that bit is set, operations should pause until the snapshot is
    /// completed. When the bit is set and the in-progress counter reaches zero,
    /// `operations_suspended` is notified.
    in_progress_operations: AtomicUsize,

    snapshot_request: Mutex<SnapshotRequest>,
    /// Condition variable that is notified when `in_progress_operations`
    /// reaches zero while a snapshot is requested, i.e. all operations are
    /// either completed or suspended.
    operations_suspended: Condvar,
    /// Condition variable that is notified when a snapshot is completed and
    /// operations can continue.
    snapshot_completed: Condvar,
214    /// The timestamp of the last started snapshot since [`Self::start_time`].
215    last_snapshot: AtomicU64,
216
217    stopping: AtomicBool,
218    stopping_event: Event,
219    idle_start_event: Event,
220    idle_end_event: Event,
221    #[cfg(feature = "verify_aggregation_graph")]
222    is_idle: AtomicBool,
223
224    task_statistics: TaskStatisticsApi,
225
226    backing_storage: B,
227
228    #[cfg(feature = "verify_aggregation_graph")]
229    root_tasks: Mutex<FxHashSet<TaskId>>,
230}
231
232impl<B: BackingStorage> TurboTasksBackend<B> {
233    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
234        Self(Arc::new(TurboTasksBackendInner::new(
235            options,
236            backing_storage,
237        )))
238    }
239
240    pub fn backing_storage(&self) -> &B {
241        &self.0.backing_storage
242    }
243}
244
245impl<B: BackingStorage> TurboTasksBackendInner<B> {
246    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
247        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
248        let need_log = matches!(
249            options.storage_mode,
250            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
251        );
252        if !options.dependency_tracking {
253            options.active_tracking = false;
254        }
255        let small_preallocation = options.small_preallocation;
256        let next_task_id = backing_storage
257            .next_free_task_id()
258            .expect("Failed to get task id");
259        Self {
260            options,
261            start_time: Instant::now(),
262            persisted_task_id_factory: IdFactoryWithReuse::new(
263                next_task_id,
264                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
265            ),
266            transient_task_id_factory: IdFactoryWithReuse::new(
267                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
268                TaskId::MAX,
269            ),
270            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
271            task_cache: BiMap::new(),
272            transient_tasks: FxDashMap::default(),
273            local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
274            storage: Storage::new(shard_amount, small_preallocation),
275            in_progress_operations: AtomicUsize::new(0),
276            snapshot_request: Mutex::new(SnapshotRequest::new()),
277            operations_suspended: Condvar::new(),
278            snapshot_completed: Condvar::new(),
279            last_snapshot: AtomicU64::new(0),
280            stopping: AtomicBool::new(false),
281            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
282            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
283            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
284            #[cfg(feature = "verify_aggregation_graph")]
285            is_idle: AtomicBool::new(false),
286            task_statistics: TaskStatisticsApi::default(),
287            backing_storage,
288            #[cfg(feature = "verify_aggregation_graph")]
289            root_tasks: Default::default(),
290        }
291    }
292
293    fn execute_context<'a>(
294        &'a self,
295        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
296    ) -> impl ExecuteContext<'a> {
297        ExecuteContextImpl::new(self, turbo_tasks)
298    }
299
300    /// # Safety
301    ///
302    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
303    unsafe fn execute_context_with_tx<'e, 'tx>(
304        &'e self,
305        tx: Option<&'e B::ReadTransaction<'tx>>,
306        turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
307    ) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
308    where
309        'tx: 'e,
310    {
311        // Safety: `tx` is from `self`.
312        unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
313    }
314
315    fn suspending_requested(&self) -> bool {
316        self.should_persist()
317            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
318    }
319
320    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
321        #[cold]
322        fn operation_suspend_point_cold<B: BackingStorage>(
323            this: &TurboTasksBackendInner<B>,
324            suspend: impl FnOnce() -> AnyOperation,
325        ) {
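            // A snapshot has been requested while this operation was running: park the operation.
            // It registers itself as suspended, gives up its slot in `in_progress_operations`
            // (waking the snapshot thread once only the request bit remains), and waits until the
            // snapshot clears the request flag before re-registering and continuing.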
326            let operation = Arc::new(suspend());
327            let mut snapshot_request = this.snapshot_request.lock();
328            if snapshot_request.snapshot_requested {
329                snapshot_request
330                    .suspended_operations
331                    .insert(operation.clone().into());
332                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
333                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
334                if value == SNAPSHOT_REQUESTED_BIT {
335                    this.operations_suspended.notify_all();
336                }
337                this.snapshot_completed
338                    .wait_while(&mut snapshot_request, |snapshot_request| {
339                        snapshot_request.snapshot_requested
340                    });
341                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
342                snapshot_request
343                    .suspended_operations
344                    .remove(&operation.into());
345            }
346        }
347
348        if self.suspending_requested() {
349            operation_suspend_point_cold(self, suspend);
350        }
351    }
352
353    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
354        if !self.should_persist() {
355            return OperationGuard { backend: None };
356        }
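        // Optimistically register the operation. If the snapshot-requested bit was already set,
        // back off: undo the registration, wait for the snapshot to complete, then re-register.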
357        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
358        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
359            let mut snapshot_request = self.snapshot_request.lock();
360            if snapshot_request.snapshot_requested {
361                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
362                if value == SNAPSHOT_REQUESTED_BIT {
363                    self.operations_suspended.notify_all();
364                }
365                self.snapshot_completed
366                    .wait_while(&mut snapshot_request, |snapshot_request| {
367                        snapshot_request.snapshot_requested
368                    });
369                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
370            }
371        }
372        OperationGuard {
373            backend: Some(self),
374        }
375    }
376
377    fn should_persist(&self) -> bool {
378        matches!(
379            self.options.storage_mode,
380            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
381        )
382    }
383
384    fn should_restore(&self) -> bool {
385        self.options.storage_mode.is_some()
386    }
387
388    fn should_track_dependencies(&self) -> bool {
389        self.options.dependency_tracking
390    }
391
392    fn should_track_activeness(&self) -> bool {
393        self.options.active_tracking
394    }
395
396    fn track_cache_hit(&self, task_type: &CachedTaskType) {
397        self.task_statistics
398            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
399    }
400
401    fn track_cache_miss(&self, task_type: &CachedTaskType) {
402        self.task_statistics
403            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
404    }
405}
406
407pub(crate) struct OperationGuard<'a, B: BackingStorage> {
408    backend: Option<&'a TurboTasksBackendInner<B>>,
409}
410
411impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
412    fn drop(&mut self) {
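        // Unregister the operation. If this was the last in-flight operation while a snapshot is
        // pending (only `SNAPSHOT_REQUESTED_BIT` remains), wake the waiting snapshot thread.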
413        if let Some(backend) = self.backend {
414            let fetch_sub = backend
415                .in_progress_operations
416                .fetch_sub(1, Ordering::AcqRel);
417            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
418                backend.operations_suspended.notify_all();
419            }
420        }
421    }
422}
423
424/// Intermediate result of step 1 of task execution completion.
425struct TaskExecutionCompletePrepareResult {
426    pub new_children: FxHashSet<TaskId>,
427    pub removed_data: Vec<CachedDataItem>,
428    pub is_now_immutable: bool,
429    #[cfg(feature = "verify_determinism")]
430    pub no_output_set: bool,
431    pub new_output: Option<OutputValue>,
432    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
433}
434
435// Operations
436impl<B: BackingStorage> TurboTasksBackendInner<B> {
437    /// # Safety
438    ///
439    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
440    unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
441        &'l self,
442        tx: Option<&'l B::ReadTransaction<'tx>>,
443        parent_task: Option<TaskId>,
444        child_task: TaskId,
445        turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
446    ) {
447        operation::ConnectChildOperation::run(parent_task, child_task, unsafe {
448            self.execute_context_with_tx(tx, turbo_tasks)
449        });
450    }
451
452    fn connect_child(
453        &self,
454        parent_task: Option<TaskId>,
455        child_task: TaskId,
456        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
457    ) {
458        operation::ConnectChildOperation::run(
459            parent_task,
460            child_task,
461            self.execute_context(turbo_tasks),
462        );
463    }
464
465    fn try_read_task_output(
466        self: &Arc<Self>,
467        task_id: TaskId,
468        reader: Option<TaskId>,
469        options: ReadOutputOptions,
470        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
471    ) -> Result<Result<RawVc, EventListener>> {
472        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
473
474        let mut ctx = self.execute_context(turbo_tasks);
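        // The read is only tracked as a dependency when dependency tracking is enabled, the read
        // itself is tracked, and the reader is a different task (self-reads need no edge).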
        let need_reader_task = if self.should_track_dependencies()
            && !matches!(options.tracking, ReadTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            Some(reader_id)
        } else {
            None
        };
485        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
486            // Having a task_pair here is not optimal, but otherwise this would lead to a race
487            // condition. See below.
488            // TODO(sokra): solve that in a more performant way.
489            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
490            (task, Some(reader))
491        } else {
492            (ctx.task(task_id, TaskDataCategory::All), None)
493        };
494
495        fn listen_to_done_event<B: BackingStorage>(
496            this: &TurboTasksBackendInner<B>,
497            reader: Option<TaskId>,
498            tracking: ReadTracking,
499            done_event: &Event,
500        ) -> EventListener {
501            done_event.listen_with_note(move || {
502                let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
503                move || {
504                    if let Some(reader_desc) = reader_desc.as_ref() {
505                        format!("try_read_task_output from {} ({})", reader_desc(), tracking)
506                    } else {
507                        format!("try_read_task_output ({})", tracking)
508                    }
509                }
510            })
511        }
512
513        fn check_in_progress<B: BackingStorage>(
514            this: &TurboTasksBackendInner<B>,
515            task: &impl TaskGuard,
516            reader: Option<TaskId>,
517            tracking: ReadTracking,
518            ctx: &impl ExecuteContext<'_>,
519        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
520        {
521            match get!(task, InProgress) {
522                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
523                    listen_to_done_event(this, reader, tracking, done_event),
524                ))),
525                Some(InProgressState::InProgress(box InProgressStateInner {
526                    done_event, ..
527                })) => Some(Ok(Err(listen_to_done_event(
528                    this, reader, tracking, done_event,
529                )))),
530                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
531                    "{} was canceled",
532                    ctx.get_task_description(task.id())
533                ))),
534                None => None,
535            }
536        }
537
538        if matches!(options.consistency, ReadConsistency::Strong) {
            // Ensure it's a root node
540            loop {
541                let aggregation_number = get_aggregation_number(&task);
542                if is_root_node(aggregation_number) {
543                    break;
544                }
545                drop(task);
546                drop(reader_task);
547                {
548                    let _span = tracing::trace_span!(
549                        "make root node for strongly consistent read",
550                        %task_id
551                    )
552                    .entered();
553                    AggregationUpdateQueue::run(
554                        AggregationUpdateJob::UpdateAggregationNumber {
555                            task_id,
556                            base_aggregation_number: u32::MAX,
557                            distance: None,
558                        },
559                        &mut ctx,
560                    );
561                }
562                (task, reader_task) = if let Some(reader_id) = need_reader_task {
563                    // TODO(sokra): see comment above
564                    let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
565                    (task, Some(reader))
566                } else {
567                    (ctx.task(task_id, TaskDataCategory::All), None)
568                }
569            }
570
571            let is_dirty = task.is_dirty();
572
573            // Check the dirty count of the root node
574            let has_dirty_containers = task.has_dirty_containers();
575            if has_dirty_containers || is_dirty {
576                let activeness = get_mut!(task, Activeness);
577                let mut task_ids_to_schedule: Vec<_> = Vec::new();
                // When there are dirty tasks, subscribe to the all_clean_event
579                let activeness = if let Some(activeness) = activeness {
                    // This makes sure all tasks stay active and this task won't go stale.
581                    // active_until_clean is automatically removed when this
582                    // task is clean.
583                    activeness.set_active_until_clean();
584                    activeness
585                } else {
                    // If we don't have a root state, add one. This also makes sure all tasks stay
                    // active and this task won't go stale. active_until_clean
588                    // is automatically removed when this task is clean.
589                    get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id))
590                        .set_active_until_clean();
591                    if ctx.should_track_activeness() {
                        // A newly added Activeness needs to make sure to schedule the tasks
593                        task_ids_to_schedule = task.dirty_containers().collect();
594                        task_ids_to_schedule.push(task_id);
595                    }
596                    get!(task, Activeness).unwrap()
597                };
598                let listener = activeness.all_clean_event.listen_with_note(move || {
599                    let this = self.clone();
600                    let tt = turbo_tasks.pin();
601                    move || {
602                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
603                        let mut ctx = this.execute_context(tt);
604                        let mut visited = FxHashSet::default();
605                        fn indent(s: &str) -> String {
606                            s.split_inclusive('\n')
607                                .flat_map(|line: &str| ["  ", line].into_iter())
608                                .collect::<String>()
609                        }
610                        fn get_info(
611                            ctx: &mut impl ExecuteContext<'_>,
612                            task_id: TaskId,
613                            parent_and_count: Option<(TaskId, i32)>,
614                            visited: &mut FxHashSet<TaskId>,
615                        ) -> String {
616                            let task = ctx.task(task_id, TaskDataCategory::All);
617                            let is_dirty = task.is_dirty();
618                            let in_progress =
619                                get!(task, InProgress).map_or("not in progress", |p| match p {
620                                    InProgressState::InProgress(_) => "in progress",
621                                    InProgressState::Scheduled { .. } => "scheduled",
622                                    InProgressState::Canceled => "canceled",
623                                });
624                            let activeness = get!(task, Activeness).map_or_else(
625                                || "not active".to_string(),
626                                |activeness| format!("{activeness:?}"),
627                            );
628                            let aggregation_number = get_aggregation_number(&task);
629                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
630                            {
631                                let uppers = get_uppers(&task);
632                                !uppers.contains(&parent_task_id)
633                            } else {
634                                false
635                            };
636
637                            // Check the dirty count of the root node
638                            let has_dirty_containers = task.has_dirty_containers();
639
640                            let task_description = ctx.get_task_description(task_id);
641                            let is_dirty_label = if is_dirty { ", dirty" } else { "" };
642                            let has_dirty_containers_label = if has_dirty_containers {
643                                ", dirty containers"
644                            } else {
645                                ""
646                            };
647                            let count = if let Some((_, count)) = parent_and_count {
648                                format!(" {count}")
649                            } else {
650                                String::new()
651                            };
652                            let mut info = format!(
653                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
654                                 {in_progress}, \
655                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
656                            );
657                            let children: Vec<_> = task.dirty_containers_with_count().collect();
658                            drop(task);
659
660                            if missing_upper {
661                                info.push_str("\n  ERROR: missing upper connection");
662                            }
663
664                            if has_dirty_containers || !children.is_empty() {
665                                writeln!(info, "\n  dirty tasks:").unwrap();
666
667                                for (child_task_id, count) in children {
668                                    let task_description = ctx.get_task_description(child_task_id);
669                                    if visited.insert(child_task_id) {
670                                        let child_info = get_info(
671                                            ctx,
672                                            child_task_id,
673                                            Some((task_id, count)),
674                                            visited,
675                                        );
676                                        info.push_str(&indent(&child_info));
677                                        if !info.ends_with('\n') {
678                                            info.push('\n');
679                                        }
680                                    } else {
681                                        writeln!(
682                                            info,
683                                            "  {child_task_id} {task_description} {count} \
684                                             (already visited)"
685                                        )
686                                        .unwrap();
687                                    }
688                                }
689                            }
690                            info
691                        }
692                        let info = get_info(&mut ctx, task_id, None, &mut visited);
693                        format!(
694                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
695                        )
696                    }
697                });
698                drop(reader_task);
699                drop(task);
700                if !task_ids_to_schedule.is_empty() {
701                    let mut queue = AggregationUpdateQueue::new();
702                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
703                    queue.execute(&mut ctx);
704                }
705
706                return Ok(Err(listener));
707            }
708        }
709
710        if let Some(value) = check_in_progress(self, &task, reader, options.tracking, &ctx) {
711            return value;
712        }
713
714        if let Some(output) = get!(task, Output) {
715            let result = match output {
716                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
717                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
718                OutputValue::Error(error) => Err(error
719                    .with_task_context(ctx.get_task_description(task_id), Some(task_id))
720                    .into()),
721            };
722            if let Some(mut reader_task) = reader_task
723                && options.tracking.should_track(result.is_err())
724                && (!task.is_immutable() || cfg!(feature = "verify_immutable"))
725            {
726                #[cfg(feature = "trace_task_output_dependencies")]
727                let _span = tracing::trace_span!(
728                    "add output dependency",
729                    task = %task_id,
730                    dependent_task = ?reader
731                )
732                .entered();
733                let _ = task.add(CachedDataItem::OutputDependent {
734                    task: reader.unwrap(),
735                    value: (),
736                });
737                drop(task);
738
739                // Note: We use `task_pair` earlier to lock the task and its reader at the same
740                // time. If we didn't and just locked the reader here, an invalidation could occur
741                // between grabbing the locks. If that happened, and if the task is "outdated" or
742                // doesn't have the dependency edge yet, the invalidation would be lost.
743
744                if reader_task
745                    .remove(&CachedDataItemKey::OutdatedOutputDependency { target: task_id })
746                    .is_none()
747                {
748                    let _ = reader_task.add(CachedDataItem::OutputDependency {
749                        target: task_id,
750                        value: (),
751                    });
752                }
753            }
754
755            return result;
756        }
757        drop(reader_task);
758
759        let note = move || {
760            let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
761            move || {
762                if let Some(reader_desc) = reader_desc.as_ref() {
763                    format!("try_read_task_output (recompute) from {}", (reader_desc)())
764                } else {
765                    "try_read_task_output (recompute, untracked)".to_string()
766                }
767            }
768        };
769
770        // Output doesn't exist. We need to schedule the task to compute it.
771        let (item, listener) = CachedDataItem::new_scheduled_with_listener(
772            TaskExecutionReason::OutputNotAvailable,
773            || self.get_task_desc_fn(task_id),
774            note,
775        );
        // It's not possible for the task to be InProgress at this point. If it were InProgress {
        // done: true }, it would have an Output and we would have returned early above.
778        task.add_new(item);
779        ctx.schedule_task(task);
780
781        Ok(Err(listener))
782    }
783
784    fn try_read_task_cell(
785        &self,
786        task_id: TaskId,
787        reader: Option<TaskId>,
788        cell: CellId,
789        options: ReadCellOptions,
790        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
791    ) -> Result<Result<TypedCellContent, EventListener>> {
792        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
793
794        fn add_cell_dependency(
795            task_id: TaskId,
796            mut task: impl TaskGuard,
797            reader: Option<TaskId>,
798            reader_task: Option<impl TaskGuard>,
799            cell: CellId,
800        ) {
801            if let Some(mut reader_task) = reader_task
802                && (!task.is_immutable() || cfg!(feature = "verify_immutable"))
803            {
804                let _ = task.add(CachedDataItem::CellDependent {
805                    cell,
806                    task: reader.unwrap(),
807                    value: (),
808                });
809                drop(task);
810
811                // Note: We use `task_pair` earlier to lock the task and its reader at the same
812                // time. If we didn't and just locked the reader here, an invalidation could occur
813                // between grabbing the locks. If that happened, and if the task is "outdated" or
814                // doesn't have the dependency edge yet, the invalidation would be lost.
815
816                let target = CellRef {
817                    task: task_id,
818                    cell,
819                };
820                if reader_task
821                    .remove(&CachedDataItemKey::OutdatedCellDependency { target })
822                    .is_none()
823                {
824                    let _ = reader_task.add(CachedDataItem::CellDependency { target, value: () });
825                }
826            }
827        }
828
829        let ReadCellOptions {
830            is_serializable_cell_content,
831            tracking,
832            final_read_hint,
833        } = options;
834
835        let mut ctx = self.execute_context(turbo_tasks);
836        let (mut task, reader_task) = if self.should_track_dependencies()
837            && !matches!(tracking, ReadTracking::Untracked)
838            && let Some(reader_id) = reader
839            && reader_id != task_id
840        {
841            // Having a task_pair here is not optimal, but otherwise this would lead to a race
842            // condition. See below.
843            // TODO(sokra): solve that in a more performant way.
844            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::Data);
845            (task, Some(reader))
846        } else {
847            (ctx.task(task_id, TaskDataCategory::Data), None)
848        };
849
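        // `final_read_hint` signals that this is expected to be the last read of this cell, so the
        // content is taken out of the task's storage instead of being left in place.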
850        let content = if final_read_hint {
851            task.remove_cell_data(is_serializable_cell_content, cell)
852        } else {
853            task.get_cell_data(is_serializable_cell_content, cell)
854        };
855        if let Some(content) = content {
856            if tracking.should_track(false) {
857                add_cell_dependency(task_id, task, reader, reader_task, cell);
858            }
859            return Ok(Ok(TypedCellContent(
860                cell.type_id,
861                CellContent(Some(content.reference)),
862            )));
863        }
864
865        let in_progress = get!(task, InProgress);
866        if matches!(
867            in_progress,
868            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
869        ) {
870            return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0));
871        }
872        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
873
874        // Check cell index range (cell might not exist at all)
875        let max_id = get!(
876            task,
877            CellTypeMaxIndex {
878                cell_type: cell.type_id
879            }
880        )
881        .copied();
882        let Some(max_id) = max_id else {
883            if tracking.should_track(true) {
884                add_cell_dependency(task_id, task, reader, reader_task, cell);
885            }
886            bail!(
887                "Cell {cell:?} no longer exists in task {} (no cell of this type exists)",
888                ctx.get_task_description(task_id)
889            );
890        };
891        if cell.index >= max_id {
892            if tracking.should_track(true) {
893                add_cell_dependency(task_id, task, reader, reader_task, cell);
894            }
895            bail!(
896                "Cell {cell:?} no longer exists in task {} (index out of bounds)",
897                ctx.get_task_description(task_id)
898            );
899        }
900        drop(reader_task);
901
        // The cell should exist, but its data was dropped or is not serializable. We need to
        // recompute the task to get the cell content.
904
905        // Listen to the cell and potentially schedule the task
906        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, reader, cell);
907        if !new_listener {
908            return Ok(Err(listener));
909        }
910
911        let _span = tracing::trace_span!(
912            "recomputation",
913            cell_type = get_value_type(cell.type_id).global_name,
914            cell_index = cell.index
915        )
916        .entered();
917
918        // Schedule the task, if not already scheduled
919        if is_cancelled {
920            bail!("{} was canceled", ctx.get_task_description(task_id));
921        }
922        task.add_new(CachedDataItem::new_scheduled(
923            TaskExecutionReason::CellNotAvailable,
924            || self.get_task_desc_fn(task_id),
925        ));
926        ctx.schedule_task(task);
927
928        Ok(Err(listener))
929    }
930
931    fn listen_to_cell(
932        &self,
933        task: &mut impl TaskGuard,
934        task_id: TaskId,
935        reader: Option<TaskId>,
936        cell: CellId,
937    ) -> (EventListener, bool) {
938        let note = move || {
939            let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
940            move || {
941                if let Some(reader_desc) = reader_desc.as_ref() {
942                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
943                } else {
944                    "try_read_task_cell (in progress, untracked)".to_string()
945                }
946            }
947        };
948        if let Some(in_progress) = get!(task, InProgressCell { cell }) {
949            // Someone else is already computing the cell
950            let listener = in_progress.event.listen_with_note(note);
951            return (listener, false);
952        }
953        let in_progress = InProgressCellState::new(task_id, cell);
954        let listener = in_progress.event.listen_with_note(note);
955        task.add_new(CachedDataItem::InProgressCell {
956            cell,
957            value: in_progress,
958        });
959        (listener, true)
960    }
961
962    fn lookup_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
963        if let Some(task_type) = self.task_cache.lookup_reverse(&task_id) {
964            return Some(task_type);
965        }
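        // Fall back to the backing storage for persistent tasks that may not have been restored
        // into the local cache yet, and memoize the result.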
966        if self.should_restore()
967            && self.local_is_partial.load(Ordering::Acquire)
968            && !task_id.is_transient()
969            && let Some(task_type) = unsafe {
970                self.backing_storage
971                    .reverse_lookup_task_cache(None, task_id)
972                    .expect("Failed to lookup task type")
973            }
974        {
975            let _ = self.task_cache.try_insert(task_type.clone(), task_id);
976            return Some(task_type);
977        }
978        None
979    }
980
981    fn get_task_desc_fn(&self, task_id: TaskId) -> impl Fn() -> String + Send + Sync + 'static {
982        let task_type = self.lookup_task_type(task_id);
983        move || {
984            task_type.as_ref().map_or_else(
985                || format!("{task_id:?} transient"),
986                |task_type| format!("{task_id:?} {task_type}"),
987            )
988        }
989    }
990
991    fn snapshot_and_persist(
992        &self,
993        parent_span: Option<tracing::Id>,
994        reason: &str,
995    ) -> Option<(Instant, bool)> {
996        let snapshot_span =
997            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
998                .entered();
999        let start = Instant::now();
1000        debug_assert!(self.should_persist());
1001
1002        let suspended_operations;
1003        {
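            // Flag the snapshot request and wait until every in-flight operation has either
            // completed or suspended itself, i.e. `in_progress_operations` only holds the
            // request bit.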
1004            let _span = tracing::info_span!("blocking").entered();
1005            let mut snapshot_request = self.snapshot_request.lock();
1006            snapshot_request.snapshot_requested = true;
1007            let active_operations = self
1008                .in_progress_operations
1009                .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1010            if active_operations != 0 {
1011                self.operations_suspended
1012                    .wait_while(&mut snapshot_request, |_| {
1013                        self.in_progress_operations.load(Ordering::Relaxed)
1014                            != SNAPSHOT_REQUESTED_BIT
1015                    });
1016            }
1017            suspended_operations = snapshot_request
1018                .suspended_operations
1019                .iter()
1020                .map(|op| op.arc().clone())
1021                .collect::<Vec<_>>();
1022        }
1023        self.storage.start_snapshot();
1024        let mut persisted_task_cache_log = self
1025            .persisted_task_cache_log
1026            .as_ref()
1027            .map(|l| l.take(|i| i))
1028            .unwrap_or_default();
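        // The in-memory snapshot has been captured: clear the request flag and wake all suspended
        // operations. Serialization and persisting below run without blocking them.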
1029        let mut snapshot_request = self.snapshot_request.lock();
1030        snapshot_request.snapshot_requested = false;
1031        self.in_progress_operations
1032            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1033        self.snapshot_completed.notify_all();
1034        let snapshot_time = Instant::now();
1035        drop(snapshot_request);
1036
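        // Helpers handed to `Storage::take_snapshot` below: they filter each task to its
        // persistent, non-transient entries and serialize the meta and data categories
        // separately via the backing storage.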
1037        let preprocess = |task_id: TaskId, inner: &storage::InnerStorage| {
1038            if task_id.is_transient() {
1039                return (None, None);
1040            }
1041            let len = inner.len();
1042
1043            let meta_restored = inner.state().meta_restored();
1044            let data_restored = inner.state().data_restored();
1045
1046            let mut meta = meta_restored.then(|| Vec::with_capacity(len));
1047            let mut data = data_restored.then(|| Vec::with_capacity(len));
1048            for (key, value) in inner.iter_all() {
1049                if key.is_persistent() && value.is_persistent() {
1050                    match key.category() {
1051                        TaskDataCategory::Meta => {
1052                            if let Some(meta) = &mut meta {
1053                                meta.push(CachedDataItem::from_key_and_value_ref(key, value))
1054                            }
1055                        }
1056                        TaskDataCategory::Data => {
1057                            if let Some(data) = &mut data {
1058                                data.push(CachedDataItem::from_key_and_value_ref(key, value))
1059                            }
1060                        }
1061                        _ => {}
1062                    }
1063                }
1064            }
1065
1066            (meta, data)
1067        };
1068        let process = |task_id: TaskId, (meta, data): (Option<Vec<_>>, Option<Vec<_>>)| {
1069            // TODO: perf: Instead of returning a `Vec` of individually allocated `SmallVec`s, it'd
1070            // be better to append everything to a flat per-task or per-shard `Vec<u8>`, and have
1071            // each `serialize` call return `(start_idx, end_idx)`.
1072            (
1073                task_id,
1074                meta.map(|d| self.backing_storage.serialize(task_id, &d)),
1075                data.map(|d| self.backing_storage.serialize(task_id, &d)),
1076            )
1077        };
1078        let process_snapshot = |task_id: TaskId, inner: Box<InnerStorageSnapshot>| {
1079            if task_id.is_transient() {
1080                return (task_id, None, None);
1081            }
1082            let len = inner.len();
1083            let mut meta = inner.meta_modified.then(|| Vec::with_capacity(len));
1084            let mut data = inner.data_modified.then(|| Vec::with_capacity(len));
1085            for (key, value) in inner.iter_all() {
1086                if key.is_persistent() && value.is_persistent() {
1087                    match key.category() {
1088                        TaskDataCategory::Meta => {
1089                            if let Some(meta) = &mut meta {
1090                                meta.push(CachedDataItem::from_key_and_value_ref(key, value));
1091                            }
1092                        }
1093                        TaskDataCategory::Data => {
1094                            if let Some(data) = &mut data {
1095                                data.push(CachedDataItem::from_key_and_value_ref(key, value));
1096                            }
1097                        }
1098                        _ => {}
1099                    }
1100                }
1101            }
1102            (
1103                task_id,
1104                meta.map(|meta| self.backing_storage.serialize(task_id, &meta)),
1105                data.map(|data| self.backing_storage.serialize(task_id, &data)),
1106            )
1107        };
1108
1109        let snapshot = self
1110            .storage
1111            .take_snapshot(&preprocess, &process, &process_snapshot);
1112
1113        #[cfg(feature = "print_cache_item_size")]
1114        #[derive(Default)]
1115        struct TaskCacheStats {
1116            data: usize,
1117            data_count: usize,
1118            meta: usize,
1119            meta_count: usize,
1120        }
1121        #[cfg(feature = "print_cache_item_size")]
1122        impl TaskCacheStats {
1123            fn add_data(&mut self, len: usize) {
1124                self.data += len;
1125                self.data_count += 1;
1126            }
1127
1128            fn add_meta(&mut self, len: usize) {
1129                self.meta += len;
1130                self.meta_count += 1;
1131            }
1132        }
1133        #[cfg(feature = "print_cache_item_size")]
1134        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1135            Mutex::new(FxHashMap::default());
1136
1137        let task_snapshots = snapshot
1138            .into_iter()
1139            .filter_map(|iter| {
1140                let mut iter = iter
1141                    .filter_map(
1142                        |(task_id, meta, data): (
1143                            _,
1144                            Option<Result<SmallVec<_>>>,
1145                            Option<Result<SmallVec<_>>>,
1146                        )| {
1147                            let meta = match meta {
1148                                Some(Ok(meta)) => {
1149                                    #[cfg(feature = "print_cache_item_size")]
1150                                    task_cache_stats
1151                                        .lock()
1152                                        .entry(self.get_task_description(task_id))
1153                                        .or_default()
1154                                        .add_meta(meta.len());
1155                                    Some(meta)
1156                                }
1157                                None => None,
1158                                Some(Err(err)) => {
1159                                    println!(
1160                                        "Serializing task {} failed (meta): {:?}",
1161                                        self.get_task_description(task_id),
1162                                        err
1163                                    );
1164                                    None
1165                                }
1166                            };
1167                            let data = match data {
1168                                Some(Ok(data)) => {
1169                                    #[cfg(feature = "print_cache_item_size")]
1170                                    task_cache_stats
1171                                        .lock()
1172                                        .entry(self.get_task_description(task_id))
1173                                        .or_default()
1174                                        .add_data(data.len());
1175                                    Some(data)
1176                                }
1177                                None => None,
1178                                Some(Err(err)) => {
1179                                    println!(
1180                                        "Serializing task {} failed (data): {:?}",
1181                                        self.get_task_description(task_id),
1182                                        err
1183                                    );
1184                                    None
1185                                }
1186                            };
1187                            (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
1188                        },
1189                    )
1190                    .peekable();
1191                iter.peek().is_some().then_some(iter)
1192            })
1193            .collect::<Vec<_>>();
1194
1195        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
1196
1197        drop(snapshot_span);
1198
1199        if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
1200            return Some((snapshot_time, false));
1201        }
1202
1203        let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1204        {
1205            if let Err(err) = self.backing_storage.save_snapshot(
1206                suspended_operations,
1207                persisted_task_cache_log,
1208                task_snapshots,
1209            ) {
1210                println!("Persisting failed: {err:?}");
1211                return None;
1212            }
1213            #[cfg(feature = "print_cache_item_size")]
1214            {
1215                let mut task_cache_stats = task_cache_stats
1216                    .into_inner()
1217                    .into_iter()
1218                    .collect::<Vec<_>>();
1219                if !task_cache_stats.is_empty() {
1220                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1221                        (stats_b.data + stats_b.meta, key_b)
1222                            .cmp(&(stats_a.data + stats_a.meta, key_a))
1223                    });
1224                    println!("Task cache stats:");
1225                    for (task_desc, stats) in task_cache_stats {
1226                        use std::ops::Div;
1227
1228                        use turbo_tasks::util::FormatBytes;
1229
1230                        println!(
1231                            "  {} {task_desc} = {} meta ({} x {}), {} data ({} x {})",
1232                            FormatBytes(stats.data + stats.meta),
1233                            FormatBytes(stats.meta),
1234                            stats.meta_count,
1235                            FormatBytes(stats.meta.checked_div(stats.meta_count).unwrap_or(0)),
1236                            FormatBytes(stats.data),
1237                            stats.data_count,
1238                            FormatBytes(stats.data.checked_div(stats.data_count).unwrap_or(0)),
1239                        );
1240                    }
1241                }
1242            }
1243        }
1244
1245        let elapsed = start.elapsed();
1246        // avoid spamming the event queue with information about fast operations
1247        if elapsed > Duration::from_secs(10) {
1248            turbo_tasks().send_compilation_event(Arc::new(TimingEvent::new(
1249                "Finished writing to filesystem cache".to_string(),
1250                elapsed,
1251            )));
1252        }
1253
1254        Some((snapshot_time, true))
1255    }
1256
1257    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1258        if self.should_restore() {
1259            // Continue all uncompleted operations
1260            // They can't be interrupted by a snapshot since the snapshotting job has not been
1261            // scheduled yet.
1262            let uncompleted_operations = self
1263                .backing_storage
1264                .uncompleted_operations()
1265                .expect("Failed to get uncompleted operations");
1266            if !uncompleted_operations.is_empty() {
1267                let mut ctx = self.execute_context(turbo_tasks);
1268                for op in uncompleted_operations {
1269                    op.execute(&mut ctx);
1270                }
1271            }
1272        }
1273
        // The initial snapshot job is only scheduled when the backend should write to the storage
        // regularly.
1276        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1277            // Schedule the snapshot job
1278            let _span = trace_span!("persisting background job").entered();
1279            let _span = tracing::info_span!("thread").entered();
1280            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1281        }
1282    }
1283
1284    fn stopping(&self) {
1285        self.stopping.store(true, Ordering::Release);
1286        self.stopping_event.notify(usize::MAX);
1287    }
1288
1289    #[allow(unused_variables)]
1290    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1291        #[cfg(feature = "verify_aggregation_graph")]
1292        {
1293            self.is_idle.store(false, Ordering::Release);
1294            self.verify_aggregation_graph(turbo_tasks, false);
1295        }
1296        if self.should_persist() {
1297            self.snapshot_and_persist(Span::current().into(), "stop");
1298        }
1299        self.task_cache.drop_contents();
1300        drop_contents(&self.transient_tasks);
1301        self.storage.drop_contents();
1302        if let Err(err) = self.backing_storage.shutdown() {
1303            println!("Shutting down failed: {err}");
1304        }
1305    }
1306
1307    #[allow(unused_variables)]
1308    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1309        self.idle_start_event.notify(usize::MAX);
1310
1311        #[cfg(feature = "verify_aggregation_graph")]
1312        {
1313            use tokio::select;
1314
1315            self.is_idle.store(true, Ordering::Release);
1316            let this = self.clone();
1317            let turbo_tasks = turbo_tasks.pin();
1318            tokio::task::spawn(async move {
1319                select! {
1320                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1321                        // do nothing
1322                    }
1323                    _ = this.idle_end_event.listen() => {
1324                        return;
1325                    }
1326                }
1327                if !this.is_idle.load(Ordering::Relaxed) {
1328                    return;
1329                }
1330                this.verify_aggregation_graph(&*turbo_tasks, true);
1331            });
1332        }
1333    }
1334
1335    fn idle_end(&self) {
1336        #[cfg(feature = "verify_aggregation_graph")]
1337        self.is_idle.store(false, Ordering::Release);
1338        self.idle_end_event.notify(usize::MAX);
1339    }
1340
1341    fn get_or_create_persistent_task(
1342        &self,
1343        task_type: CachedTaskType,
1344        parent_task: Option<TaskId>,
1345        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1346    ) -> TaskId {
1347        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
1348            self.track_cache_hit(&task_type);
1349            self.connect_child(parent_task, task_id, turbo_tasks);
1350            return task_id;
1351        }
1352
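        // The in-memory cache missed. If we're restoring and the locally restored data may
        // still be partial, consult the backing storage for an existing task id before
        // allocating a new one.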
1353        let check_backing_storage =
1354            self.should_restore() && self.local_is_partial.load(Ordering::Acquire);
1355        let tx = check_backing_storage
1356            .then(|| self.backing_storage.start_read_transaction())
1357            .flatten();
1358        let task_id = {
1359            // Safety: `tx` is a valid transaction from `self.backing_storage`.
1360            if let Some(task_id) = unsafe {
1361                check_backing_storage
1362                    .then(|| {
1363                        self.backing_storage
1364                            .forward_lookup_task_cache(tx.as_ref(), &task_type)
1365                            .expect("Failed to lookup task id")
1366                    })
1367                    .flatten()
1368            } {
1369                self.track_cache_hit(&task_type);
1370                let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
1371                task_id
1372            } else {
1373                let task_type = Arc::new(task_type);
1374                let task_id = self.persisted_task_id_factory.get();
1375                let task_id = if let Err(existing_task_id) =
1376                    self.task_cache.try_insert(task_type.clone(), task_id)
1377                {
1378                    self.track_cache_hit(&task_type);
1379                    // Safety: We just created the id and failed to insert it.
1380                    unsafe {
1381                        self.persisted_task_id_factory.reuse(task_id);
1382                    }
1383                    existing_task_id
1384                } else {
1385                    self.track_cache_miss(&task_type);
1386                    task_id
1387                };
1388                if let Some(log) = &self.persisted_task_cache_log {
1389                    log.lock(task_id).push((task_type, task_id));
1390                }
1391                task_id
1392            }
1393        };
1394
1395        // Safety: `tx` is a valid transaction from `self.backing_storage`.
1396        unsafe { self.connect_child_with_tx(tx.as_ref(), parent_task, task_id, turbo_tasks) };
1397
1398        task_id
1399    }
1400
1401    fn get_or_create_transient_task(
1402        &self,
1403        task_type: CachedTaskType,
1404        parent_task: Option<TaskId>,
1405        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1406    ) -> TaskId {
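        // A persistent (cacheable) parent must not call a transient task: the parent's cached
        // result could be restored in a later session where the transient task's output no
        // longer exists.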
1407        if let Some(parent_task) = parent_task
1408            && !parent_task.is_transient()
1409        {
1410            self.panic_persistent_calling_transient(
1411                self.lookup_task_type(parent_task).as_deref(),
1412                Some(&task_type),
1413                /* cell_id */ None,
1414            );
1415        }
1416        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
1417            self.track_cache_hit(&task_type);
1418            self.connect_child(parent_task, task_id, turbo_tasks);
1419            return task_id;
1420        }
1421
1422        let task_type = Arc::new(task_type);
1423        let task_id = self.transient_task_id_factory.get();
1424        if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) {
1425            self.track_cache_hit(&task_type);
1426            // Safety: We just created the id and failed to insert it.
1427            unsafe {
1428                self.transient_task_id_factory.reuse(task_id);
1429            }
1430            self.connect_child(parent_task, existing_task_id, turbo_tasks);
1431            return existing_task_id;
1432        }
1433
1434        self.track_cache_miss(&task_type);
1435        self.connect_child(parent_task, task_id, turbo_tasks);
1436
1437        task_id
1438    }
1439
1440    /// Generate an object that implements [`fmt::Display`] explaining why the given
1441    /// [`CachedTaskType`] is transient.
1442    fn debug_trace_transient_task(
1443        &self,
1444        task_type: &CachedTaskType,
1445        cell_id: Option<CellId>,
1446    ) -> DebugTraceTransientTask {
1447        // It shouldn't be possible to have cycles between tasks, but tracing the same task many
1448        // times could cause an exponential blowup, so track already-visited tasks in a set.
1449        fn inner_id(
1450            backend: &TurboTasksBackendInner<impl BackingStorage>,
1451            task_id: TaskId,
1452            cell_type_id: Option<ValueTypeId>,
1453            visited_set: &mut FxHashSet<TaskId>,
1454        ) -> DebugTraceTransientTask {
1455            if let Some(task_type) = backend.lookup_task_type(task_id) {
1456                if visited_set.contains(&task_id) {
1457                    let task_name = task_type.get_name();
1458                    DebugTraceTransientTask::Collapsed {
1459                        task_name,
1460                        cell_type_id,
1461                    }
1462                } else {
1463                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1464                }
1465            } else {
1466                DebugTraceTransientTask::Uncached { cell_type_id }
1467            }
1468        }
1469        fn inner_cached(
1470            backend: &TurboTasksBackendInner<impl BackingStorage>,
1471            task_type: &CachedTaskType,
1472            cell_type_id: Option<ValueTypeId>,
1473            visited_set: &mut FxHashSet<TaskId>,
1474        ) -> DebugTraceTransientTask {
1475            let task_name = task_type.get_name();
1476
1477            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1478                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1479                    // `task_id` should never be `None` at this point, as that would imply a
1480                    // non-local task is returning a local `Vc`...
1481                    // Just ignore if it happens, as we're likely already panicking.
1482                    return None;
1483                };
1484                if task_id.is_transient() {
1485                    Some(Box::new(inner_id(
1486                        backend,
1487                        task_id,
1488                        cause_self_raw_vc.try_get_type_id(),
1489                        visited_set,
1490                    )))
1491                } else {
1492                    None
1493                }
1494            });
1495            let cause_args = task_type
1496                .arg
1497                .get_raw_vcs()
1498                .into_iter()
1499                .filter_map(|raw_vc| {
1500                    let Some(task_id) = raw_vc.try_get_task_id() else {
1501                        // `task_id` should never be `None` (see comment above)
1502                        return None;
1503                    };
1504                    if !task_id.is_transient() {
1505                        return None;
1506                    }
1507                    Some((task_id, raw_vc.try_get_type_id()))
1508                })
1509                .collect::<IndexSet<_>>() // dedupe
1510                .into_iter()
1511                .map(|(task_id, cell_type_id)| {
1512                    inner_id(backend, task_id, cell_type_id, visited_set)
1513                })
1514                .collect();
1515
1516            DebugTraceTransientTask::Cached {
1517                task_name,
1518                cell_type_id,
1519                cause_self,
1520                cause_args,
1521            }
1522        }
1523        inner_cached(
1524            self,
1525            task_type,
1526            cell_id.map(|c| c.type_id),
1527            &mut FxHashSet::default(),
1528        )
1529    }
1530
1531    fn invalidate_task(
1532        &self,
1533        task_id: TaskId,
1534        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1535    ) {
1536        if !self.should_track_dependencies() {
1537            panic!("Dependency tracking is disabled so invalidation is not allowed");
1538        }
1539        operation::InvalidateOperation::run(
1540            smallvec![task_id],
1541            #[cfg(feature = "trace_task_dirty")]
1542            TaskDirtyCause::Invalidator,
1543            self.execute_context(turbo_tasks),
1544        );
1545    }
1546
1547    fn invalidate_tasks(
1548        &self,
1549        tasks: &[TaskId],
1550        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1551    ) {
1552        if !self.should_track_dependencies() {
1553            panic!("Dependency tracking is disabled so invalidation is not allowed");
1554        }
1555        operation::InvalidateOperation::run(
1556            tasks.iter().copied().collect(),
1557            #[cfg(feature = "trace_task_dirty")]
1558            TaskDirtyCause::Unknown,
1559            self.execute_context(turbo_tasks),
1560        );
1561    }
1562
1563    fn invalidate_tasks_set(
1564        &self,
1565        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1566        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1567    ) {
1568        if !self.should_track_dependencies() {
1569            panic!("Dependency tracking is disabled so invalidation is not allowed");
1570        }
1571        operation::InvalidateOperation::run(
1572            tasks.iter().copied().collect(),
1573            #[cfg(feature = "trace_task_dirty")]
1574            TaskDirtyCause::Unknown,
1575            self.execute_context(turbo_tasks),
1576        );
1577    }
1578
1579    fn invalidate_serialization(
1580        &self,
1581        task_id: TaskId,
1582        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1583    ) {
1584        if task_id.is_transient() {
1585            return;
1586        }
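        // For persistent tasks, flag the stored data so its serialized form is refreshed with
        // the next snapshot.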
1587        let mut ctx = self.execute_context(turbo_tasks);
1588        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1589        task.invalidate_serialization();
1590    }
1591
1592    fn get_task_description(&self, task_id: TaskId) -> String {
1593        self.lookup_task_type(task_id).map_or_else(
1594            || format!("{task_id:?} transient"),
1595            |task_type| task_type.to_string(),
1596        )
1597    }
1598
1599    fn task_execution_canceled(
1600        &self,
1601        task_id: TaskId,
1602        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1603    ) {
1604        let mut ctx = self.execute_context(turbo_tasks);
1605        let mut task = ctx.task(task_id, TaskDataCategory::Data);
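        // Notify anyone waiting for this task to finish and record the task as canceled so
        // that a later completion of this execution is ignored.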
1606        if let Some(in_progress) = remove!(task, InProgress) {
1607            match in_progress {
1608                InProgressState::Scheduled {
1609                    done_event,
1610                    reason: _,
1611                } => done_event.notify(usize::MAX),
1612                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1613                    done_event.notify(usize::MAX)
1614                }
1615                InProgressState::Canceled => {}
1616            }
1617        }
1618        task.add_new(CachedDataItem::InProgress {
1619            value: InProgressState::Canceled,
1620        });
1621    }
1622
1623    fn try_start_task_execution(
1624        &self,
1625        task_id: TaskId,
1626        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1627    ) -> Option<TaskExecutionSpec<'_>> {
1628        enum TaskType {
1629            Cached(Arc<CachedTaskType>),
1630            Transient(Arc<TransientTask>),
1631        }
1632        let (task_type, once_task) = if let Some(task_type) = self.lookup_task_type(task_id) {
1633            (TaskType::Cached(task_type), false)
1634        } else if let Some(task_type) = self.transient_tasks.get(&task_id) {
1635            (
1636                TaskType::Transient(task_type.clone()),
1637                matches!(**task_type, TransientTask::Once(_)),
1638            )
1639        } else {
1640            return None;
1641        };
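        // Transition the task from `Scheduled` to `InProgress`. Existing collectibles and
        // (when dependency tracking is enabled) cell/output dependencies are marked as
        // outdated, so that anything not re-established by the new execution can be cleaned
        // up on completion.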
1642        let execution_reason;
1643        {
1644            let mut ctx = self.execute_context(turbo_tasks);
1645            let mut task = ctx.task(task_id, TaskDataCategory::All);
1646            let in_progress = remove!(task, InProgress)?;
1647            let InProgressState::Scheduled { done_event, reason } = in_progress else {
1648                task.add_new(CachedDataItem::InProgress { value: in_progress });
1649                return None;
1650            };
1651            execution_reason = reason;
1652            task.add_new(CachedDataItem::InProgress {
1653                value: InProgressState::InProgress(Box::new(InProgressStateInner {
1654                    stale: false,
1655                    once_task,
1656                    done_event,
1657                    session_dependent: false,
1658                    marked_as_completed: false,
1659                    new_children: Default::default(),
1660                })),
1661            });
1662
1663            // Make all current collectibles outdated (remove left-over outdated collectibles)
1664            enum Collectible {
1665                Current(CollectibleRef, i32),
1666                Outdated(CollectibleRef),
1667            }
1668            let collectibles = iter_many!(task, Collectible { collectible } value => Collectible::Current(collectible, *value))
1669                    .chain(iter_many!(task, OutdatedCollectible { collectible } => Collectible::Outdated(collectible)))
1670                    .collect::<Vec<_>>();
1671            for collectible in collectibles {
1672                match collectible {
1673                    Collectible::Current(collectible, value) => {
1674                        let _ =
1675                            task.insert(CachedDataItem::OutdatedCollectible { collectible, value });
1676                    }
1677                    Collectible::Outdated(collectible) => {
1678                        if !task.has_key(&CachedDataItemKey::Collectible { collectible }) {
1679                            task.remove(&CachedDataItemKey::OutdatedCollectible { collectible });
1680                        }
1681                    }
1682                }
1683            }
1684
1685            if self.should_track_dependencies() {
1686                // Make all dependencies outdated
1687                let outdated_cell_dependencies_to_add =
1688                    iter_many!(task, CellDependency { target } => target)
1689                        .collect::<SmallVec<[_; 8]>>();
1690                let outdated_cell_dependencies_to_remove =
1691                    iter_many!(task, OutdatedCellDependency { target } => target)
1692                        .filter(|&target| {
1693                            !task.has_key(&CachedDataItemKey::CellDependency { target })
1694                        })
1695                        .collect::<SmallVec<[_; 8]>>();
1696                task.extend(
1697                    CachedDataItemType::OutdatedCellDependency,
1698                    outdated_cell_dependencies_to_add
1699                        .into_iter()
1700                        .map(|target| CachedDataItem::OutdatedCellDependency { target, value: () }),
1701                );
1702                for target in outdated_cell_dependencies_to_remove {
1703                    task.remove(&CachedDataItemKey::OutdatedCellDependency { target });
1704                }
1705
1706                let outdated_output_dependencies_to_add =
1707                    iter_many!(task, OutputDependency { target } => target)
1708                        .collect::<SmallVec<[_; 8]>>();
1709                let outdated_output_dependencies_to_remove =
1710                    iter_many!(task, OutdatedOutputDependency { target } => target)
1711                        .filter(|&target| {
1712                            !task.has_key(&CachedDataItemKey::OutputDependency { target })
1713                        })
1714                        .collect::<SmallVec<[_; 8]>>();
1715                task.extend(
1716                    CachedDataItemType::OutdatedOutputDependency,
1717                    outdated_output_dependencies_to_add
1718                        .into_iter()
1719                        .map(|target| CachedDataItem::OutdatedOutputDependency {
1720                            target,
1721                            value: (),
1722                        }),
1723                );
1724                for target in outdated_output_dependencies_to_remove {
1725                    task.remove(&CachedDataItemKey::OutdatedOutputDependency { target });
1726                }
1727            }
1728        }
1729
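        // Build the future to execute: cached tasks call their native function, transient root
        // tasks invoke their closure again, and once tasks take their stored future (returning
        // `None` here if it has already been taken).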
1730        let (span, future) = match task_type {
1731            TaskType::Cached(task_type) => {
1732                let CachedTaskType {
1733                    native_fn,
1734                    this,
1735                    arg,
1736                } = &*task_type;
1737                (
1738                    native_fn.span(task_id.persistence(), execution_reason),
1739                    native_fn.execute(*this, &**arg),
1740                )
1741            }
1742            TaskType::Transient(task_type) => {
1743                let span = tracing::trace_span!("turbo_tasks::root_task");
1744                let future = match &*task_type {
1745                    TransientTask::Root(f) => f(),
1746                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
1747                };
1748                (span, future)
1749            }
1750        };
1751        Some(TaskExecutionSpec { future, span })
1752    }
1753
1754    fn task_execution_completed(
1755        &self,
1756        task_id: TaskId,
1757        result: Result<RawVc, TurboTasksExecutionError>,
1758        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1759        stateful: bool,
1760        has_invalidator: bool,
1761        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1762    ) -> bool {
1763        // Task completion is a 4 step process:
1764        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
1765        //    aggregation number of the task and the new children.
1766        // 2. Connect the new children to the task (and do the relevant aggregation updates).
1767        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
1768        // 4. Shrink the task memory to reduce footprint of the task.
1769
1770        // Due to persistence it is possible that the process is cancelled after any step. This is
1771        // ok, since the dirty flag won't be removed until step 3, and step 4 only affects the
1772        // in-memory representation.
1773
1774        // The task might be invalidated during this process, so we need to check the stale flag
1775        // at the start of every step.
1776
1777        #[cfg(not(feature = "trace_task_details"))]
1778        let _span = tracing::trace_span!("task execution completed").entered();
1779        #[cfg(feature = "trace_task_details")]
1780        let span = tracing::trace_span!(
1781            "task execution completed",
1782            task_id = display(task_id),
1783            result = match result.as_ref() {
1784                Ok(value) => display(either::Either::Left(value)),
1785                Err(err) => display(either::Either::Right(err)),
1786            },
1787            immutable = tracing::field::Empty,
1788            new_output = tracing::field::Empty,
1789            output_dependents = tracing::field::Empty,
1790            stale = tracing::field::Empty,
1791        )
1792        .entered();
1793        let mut ctx = self.execute_context(turbo_tasks);
1794
1795        let Some(TaskExecutionCompletePrepareResult {
1796            new_children,
1797            mut removed_data,
1798            is_now_immutable,
1799            #[cfg(feature = "verify_determinism")]
1800            no_output_set,
1801            new_output,
1802            output_dependent_tasks,
1803        }) = self.task_execution_completed_prepare(
1804            &mut ctx,
1805            #[cfg(feature = "trace_task_details")]
1806            &span,
1807            task_id,
1808            result,
1809            cell_counters,
1810            stateful,
1811            has_invalidator,
1812        )
1813        else {
1814            // Task was stale and has been rescheduled
1815            #[cfg(feature = "trace_task_details")]
1816            span.record("stale", "true");
1817            return true;
1818        };
1819
1820        #[cfg(feature = "trace_task_details")]
1821        span.record("new_output", new_output.is_some());
1822        #[cfg(feature = "trace_task_details")]
1823        span.record("output_dependents", output_dependent_tasks.len());
1824
1825        // When restoring from filesystem cache the following might not be executed (since we can
1826        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
1827        // would be executed again.
1828
1829        if !output_dependent_tasks.is_empty() {
1830            self.task_execution_completed_invalidate_output_dependent(
1831                &mut ctx,
1832                task_id,
1833                output_dependent_tasks,
1834            );
1835        }
1836
1837        let has_new_children = !new_children.is_empty();
1838
1839        if has_new_children {
1840            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
1841        }
1842
1843        if has_new_children
1844            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
1845        {
1846            // Task was stale and has been rescheduled
1847            #[cfg(feature = "trace_task_details")]
1848            span.record("stale", "true");
1849            return true;
1850        }
1851
1852        if self.task_execution_completed_finish(
1853            &mut ctx,
1854            task_id,
1855            #[cfg(feature = "verify_determinism")]
1856            no_output_set,
1857            new_output,
1858            &mut removed_data,
1859            is_now_immutable,
1860        ) {
1861            // Task was stale and has been rescheduled
1862            #[cfg(feature = "trace_task_details")]
1863            span.record("stale", "true");
1864            return true;
1865        }
1866
1867        drop(removed_data);
1868
1869        self.task_execution_completed_cleanup(&mut ctx, task_id);
1870
1871        false
1872    }
1873
1874    fn task_execution_completed_prepare(
1875        &self,
1876        ctx: &mut impl ExecuteContext<'_>,
1877        #[cfg(feature = "trace_task_details")] span: &Span,
1878        task_id: TaskId,
1879        result: Result<RawVc, TurboTasksExecutionError>,
1880        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1881        stateful: bool,
1882        has_invalidator: bool,
1883    ) -> Option<TaskExecutionCompletePrepareResult> {
1884        let mut task = ctx.task(task_id, TaskDataCategory::All);
1885        let Some(in_progress) = get_mut!(task, InProgress) else {
1886            panic!("Task execution completed, but task is not in progress: {task:#?}");
1887        };
1888        if matches!(in_progress, InProgressState::Canceled) {
1889            return Some(TaskExecutionCompletePrepareResult {
1890                new_children: Default::default(),
1891                removed_data: Default::default(),
1892                is_now_immutable: false,
1893                #[cfg(feature = "verify_determinism")]
1894                no_output_set: false,
1895                new_output: None,
1896                output_dependent_tasks: Default::default(),
1897            });
1898        }
1899        let &mut InProgressState::InProgress(box InProgressStateInner {
1900            stale,
1901            ref mut new_children,
1902            session_dependent,
1903            ..
1904        }) = in_progress
1905        else {
1906            panic!("Task execution completed, but task is not in progress: {task:#?}");
1907        };
1908
1909        // If the task is stale, reschedule it
1910        #[cfg(not(feature = "no_fast_stale"))]
1911        if stale {
1912            let Some(InProgressState::InProgress(box InProgressStateInner {
1913                done_event,
1914                mut new_children,
1915                ..
1916            })) = remove!(task, InProgress)
1917            else {
1918                unreachable!();
1919            };
1920            task.add_new(CachedDataItem::InProgress {
1921                value: InProgressState::Scheduled {
1922                    done_event,
1923                    reason: TaskExecutionReason::Stale,
1924                },
1925            });
1926            // Remove old children from new_children to leave only the children that had their
1927            // active count increased
1928            for task in iter_many!(task, Child { task } => task) {
1929                new_children.remove(&task);
1930            }
1931            drop(task);
1932
1933            // We need to undo the active count increase for the children since we throw away the
1934            // new_children list now.
1935            AggregationUpdateQueue::run(
1936                AggregationUpdateJob::DecreaseActiveCounts {
1937                    task_ids: new_children.into_iter().collect(),
1938                },
1939                ctx,
1940            );
1941            return None;
1942        }
1943
1944        // take the children from the task to process them
1945        let mut new_children = take(new_children);
1946
1947        // handle stateful
1948        if stateful {
1949            let _ = task.add(CachedDataItem::Stateful { value: () });
1950        }
1951
1952        // handle has_invalidator
1953        if has_invalidator {
1954            let _ = task.add(CachedDataItem::HasInvalidator { value: () });
1955        }
1956
1957        // handle cell counters: update max index and remove cells that are no longer used
1958        let old_counters: FxHashMap<_, _> =
1959            get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, *max_index));
1960        let mut counters_to_remove = old_counters.clone();
1961
1962        task.extend(
1963            CachedDataItemType::CellTypeMaxIndex,
1964            cell_counters.iter().filter_map(|(&cell_type, &max_index)| {
1965                if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
1966                    if old_max_index != max_index {
1967                        Some(CachedDataItem::CellTypeMaxIndex {
1968                            cell_type,
1969                            value: max_index,
1970                        })
1971                    } else {
1972                        None
1973                    }
1974                } else {
1975                    Some(CachedDataItem::CellTypeMaxIndex {
1976                        cell_type,
1977                        value: max_index,
1978                    })
1979                }
1980            }),
1981        );
1982        for (cell_type, _) in counters_to_remove {
1983            task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type });
1984        }
1985
1986        let mut queue = AggregationUpdateQueue::new();
1987
1988        let mut removed_data = Vec::new();
1989        let mut old_edges = Vec::new();
1990
1991        let has_children = !new_children.is_empty();
1992        let is_immutable = task.is_immutable();
1993        let task_dependencies_for_immutable =
1994            // Task was not already marked as immutable
1995            if !is_immutable
1996            // Task is not session dependent (session dependent tasks can change between sessions)
1997            && !session_dependent
1998            // Task has no invalidator
1999            && !task.has_key(&CachedDataItemKey::HasInvalidator {})
2000            // Task has no dependencies on collectibles
2001            && count!(task, CollectiblesDependency) == 0
2002        {
2003            Some(
2004                // Collect all dependencies on tasks to check if all dependencies are immutable
2005                iter_many!(task, OutputDependency { target } => target)
2006                    .chain(iter_many!(task, CellDependency { target } => target.task))
2007                    .collect::<FxHashSet<_>>(),
2008            )
2009        } else {
2010            None
2011        };
2012
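        // Diff the new children against the existing `Child` edges: children that are no longer
        // referenced become outdated edges to clean up, while children present in both sets are
        // removed from `new_children` since they are already connected.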
2013        if has_children {
2014            // Prepare all new children
2015            prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2016
2017            // Filter actual new children
2018            old_edges.extend(
2019                iter_many!(task, Child { task } => task)
2020                    .filter(|task| !new_children.remove(task))
2021                    .map(OutdatedEdge::Child),
2022            );
2023        } else {
2024            old_edges.extend(iter_many!(task, Child { task } => task).map(OutdatedEdge::Child));
2025        }
2026
2027        // Remove no longer existing cells and
2028        // find all outdated data items (removed cells, outdated edges)
2029        // Note: For persistent tasks we only want to call extract_if when there are actual cells to
2030        // remove, to avoid tracking that as a modification.
2031        if task_id.is_transient() || iter_many!(task, CellData { cell }
2032            if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index) => cell
2033        ).count() > 0 {
2034            removed_data.extend(task.extract_if(CachedDataItemType::CellData, |key, _| {
2035                matches!(key, CachedDataItemKey::CellData { cell } if cell_counters
2036                            .get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
2037            }));
2038        }
2039        if task_id.is_transient() || iter_many!(task, TransientCellData { cell }
2040            if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index) => cell
2041        ).count() > 0 {
2042            removed_data.extend(task.extract_if(CachedDataItemType::TransientCellData, |key, _| {
2043                matches!(key, CachedDataItemKey::TransientCellData { cell } if cell_counters
2044                            .get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
2045            }));
2046        }
2047
2048        old_edges.extend(
2049            task.iter(CachedDataItemType::OutdatedCollectible)
2050                .filter_map(|(key, value)| match (key, value) {
2051                    (
2052                        CachedDataItemKey::OutdatedCollectible { collectible },
2053                        CachedDataItemValueRef::OutdatedCollectible { value },
2054                    ) => Some(OutdatedEdge::Collectible(collectible, *value)),
2055                    _ => None,
2056                }),
2057        );
2058
2059        if self.should_track_dependencies() {
2060            old_edges.extend(iter_many!(task, OutdatedCellDependency { target } => OutdatedEdge::CellDependency(target)));
2061            old_edges.extend(iter_many!(task, OutdatedOutputDependency { target } => OutdatedEdge::OutputDependency(target)));
2062            old_edges.extend(
2063                iter_many!(task, CellDependent { cell, task } => (cell, task)).filter_map(
2064                    |(cell, task)| {
2065                        if cell_counters
2066                            .get(&cell.type_id)
2067                            .is_none_or(|start_index| cell.index >= *start_index)
2068                            && let Some(old_counter) = old_counters.get(&cell.type_id)
2069                            && cell.index < *old_counter
2070                        {
2071                            return Some(OutdatedEdge::RemovedCellDependent {
2072                                task_id: task,
2073                                #[cfg(feature = "trace_task_dirty")]
2074                                value_type_id: cell.type_id,
2075                            });
2076                        }
2077                        None
2078                    },
2079                ),
2080            );
2081        }
2082
2083        // Check if the output needs to be updated
2084        let current_output = get!(task, Output);
2085        #[cfg(feature = "verify_determinism")]
2086        let no_output_set = current_output.is_none();
2087        let new_output = match result {
2088            Ok(RawVc::TaskOutput(output_task_id)) => {
2089                if let Some(OutputValue::Output(current_task_id)) = current_output
2090                    && *current_task_id == output_task_id
2091                {
2092                    None
2093                } else {
2094                    Some(OutputValue::Output(output_task_id))
2095                }
2096            }
2097            Ok(RawVc::TaskCell(output_task_id, cell)) => {
2098                if let Some(OutputValue::Cell(CellRef {
2099                    task: current_task_id,
2100                    cell: current_cell,
2101                })) = current_output
2102                    && *current_task_id == output_task_id
2103                    && *current_cell == cell
2104                {
2105                    None
2106                } else {
2107                    Some(OutputValue::Cell(CellRef {
2108                        task: output_task_id,
2109                        cell,
2110                    }))
2111                }
2112            }
2113            Ok(RawVc::LocalOutput(..)) => {
2114                panic!("Non-local tasks must not return a local Vc");
2115            }
2116            Err(err) => {
2117                if let Some(OutputValue::Error(old_error)) = current_output
2118                    && old_error == &err
2119                {
2120                    None
2121                } else {
2122                    Some(OutputValue::Error(err))
2123                }
2124            }
2125        };
2126        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2127        // When the output has changed, grab the dependent tasks so they can be invalidated
2128        if new_output.is_some() && ctx.should_track_dependencies() {
2129            output_dependent_tasks = get_many!(task, OutputDependent { task } => task);
2130        }
2131
2132        drop(task);
2133
2134        // Check if the task can be marked as immutable
2135        let mut is_now_immutable = false;
2136        if let Some(dependencies) = task_dependencies_for_immutable
2137            && dependencies
2138                .iter()
2139                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).is_immutable())
2140        {
2141            is_now_immutable = true;
2142        }
2143        #[cfg(feature = "trace_task_details")]
2144        span.record("immutable", is_immutable || is_now_immutable);
2145
2146        if !queue.is_empty() || !old_edges.is_empty() {
2147            #[cfg(feature = "trace_task_completion")]
2148            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
2149            // Remove outdated edges first, before removing the in-progress + dirty flags.
2150            // We need to make sure all outdated edges are removed before the task can potentially
2151            // be scheduled and executed again.
2152            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2153        }
2154
2155        Some(TaskExecutionCompletePrepareResult {
2156            new_children,
2157            removed_data,
2158            is_now_immutable,
2159            #[cfg(feature = "verify_determinism")]
2160            no_output_set,
2161            new_output,
2162            output_dependent_tasks,
2163        })
2164    }
2165
2166    fn task_execution_completed_invalidate_output_dependent(
2167        &self,
2168        ctx: &mut impl ExecuteContext<'_>,
2169        task_id: TaskId,
2170        output_dependent_tasks: SmallVec<[TaskId; 4]>,
2171    ) {
2172        debug_assert!(!output_dependent_tasks.is_empty());
2173
2174        let mut queue = AggregationUpdateQueue::new();
2175        for dependent_task_id in output_dependent_tasks {
2176            #[cfg(feature = "trace_task_output_dependencies")]
2177            let span = tracing::trace_span!(
2178                "invalidate output dependency",
2179                task = %task_id,
2180                dependent_task = %dependent_task_id,
2181                result = tracing::field::Empty,
2182            )
2183            .entered();
2184            if ctx.is_once_task(dependent_task_id) {
2185                // once tasks are never invalidated
2186                #[cfg(feature = "trace_task_output_dependencies")]
2187                span.record("result", "once task");
2188                continue;
2189            }
2190            let mut make_stale = true;
2191            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2192            if dependent.has_key(&CachedDataItemKey::OutdatedOutputDependency { target: task_id }) {
2193                #[cfg(feature = "trace_task_output_dependencies")]
2194                span.record("result", "outdated dependency");
2195                // The output dependency is outdated, so the dependent hasn't read the output yet
2196                // and doesn't need to be invalidated.
2197                // But importantly we still need to make the task dirty, as it should no longer
2198                // be considered a "recomputation".
2199                make_stale = false;
2200            } else if !dependent.has_key(&CachedDataItemKey::OutputDependency { target: task_id }) {
2201                // The output dependency has been removed, so the task no longer depends on the
2202                // output and doesn't need to be invalidated.
2203                #[cfg(feature = "trace_task_output_dependencies")]
2204                span.record("result", "no backward dependency");
2205                continue;
2206            }
2207            make_task_dirty_internal(
2208                dependent,
2209                dependent_task_id,
2210                make_stale,
2211                #[cfg(feature = "trace_task_dirty")]
2212                TaskDirtyCause::OutputChange { task_id },
2213                &mut queue,
2214                ctx,
2215            );
2216            #[cfg(feature = "trace_task_output_dependencies")]
2217            span.record("result", "marked dirty");
2218        }
2219
2220        queue.execute(ctx);
2221    }
2222
2223    fn task_execution_completed_unfinished_children_dirty(
2224        &self,
2225        ctx: &mut impl ExecuteContext<'_>,
2226        new_children: &FxHashSet<TaskId>,
2227    ) {
2228        debug_assert!(!new_children.is_empty());
2229
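        // A new child without an `Output` item has never completed an execution yet; mark it
        // dirty so the aggregated dirty state accounts for the still-pending work.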
2230        let mut queue = AggregationUpdateQueue::new();
2231        for &child_id in new_children {
2232            let child_task = ctx.task(child_id, TaskDataCategory::Meta);
2233            if !child_task.has_key(&CachedDataItemKey::Output {}) {
2234                make_task_dirty_internal(
2235                    child_task,
2236                    child_id,
2237                    false,
2238                    #[cfg(feature = "trace_task_dirty")]
2239                    TaskDirtyCause::InitialDirty,
2240                    &mut queue,
2241                    ctx,
2242                );
2243            }
2244        }
2245
2246        queue.execute(ctx);
2247    }
2248
2249    fn task_execution_completed_connect(
2250        &self,
2251        ctx: &mut impl ExecuteContext<'_>,
2252        task_id: TaskId,
2253        new_children: FxHashSet<TaskId>,
2254    ) -> bool {
2255        debug_assert!(!new_children.is_empty());
2256
2257        let mut task = ctx.task(task_id, TaskDataCategory::All);
2258        let Some(in_progress) = get!(task, InProgress) else {
2259            panic!("Task execution completed, but task is not in progress: {task:#?}");
2260        };
2261        if matches!(in_progress, InProgressState::Canceled) {
2262            // Task was canceled in the meantime, so we don't connect the children
2263            return false;
2264        }
2265        let InProgressState::InProgress(box InProgressStateInner {
2266            #[cfg(not(feature = "no_fast_stale"))]
2267            stale,
2268            ..
2269        }) = in_progress
2270        else {
2271            panic!("Task execution completed, but task is not in progress: {task:#?}");
2272        };
2273
2274        // If the task is stale, reschedule it
2275        #[cfg(not(feature = "no_fast_stale"))]
2276        if *stale {
2277            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2278                remove!(task, InProgress)
2279            else {
2280                unreachable!();
2281            };
2282            task.add_new(CachedDataItem::InProgress {
2283                value: InProgressState::Scheduled {
2284                    done_event,
2285                    reason: TaskExecutionReason::Stale,
2286                },
2287            });
2288            drop(task);
2289
2290            // All `new_children` are currently held active with an active count and we need to
2291            // undo that. (We already filtered out the old children from that list.)
2292            AggregationUpdateQueue::run(
2293                AggregationUpdateJob::DecreaseActiveCounts {
2294                    task_ids: new_children.into_iter().collect(),
2295                },
2296                ctx,
2297            );
2298            return true;
2299        }
2300
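        // The task is not stale, so connect the new children for real, passing along whether
        // this task currently holds an active count so activeness can be propagated.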
2301        let has_active_count = ctx.should_track_activeness()
2302            && get!(task, Activeness).map_or(false, |activeness| activeness.active_counter > 0);
2303        connect_children(
2304            ctx,
2305            task_id,
2306            task,
2307            new_children,
2308            has_active_count,
2309            ctx.should_track_activeness(),
2310        );
2311
2312        false
2313    }
2314
2315    fn task_execution_completed_finish(
2316        &self,
2317        ctx: &mut impl ExecuteContext<'_>,
2318        task_id: TaskId,
2319        #[cfg(feature = "verify_determinism")] no_output_set: bool,
2320        new_output: Option<OutputValue>,
2321        removed_data: &mut Vec<CachedDataItem>,
2322        is_now_immutable: bool,
2323    ) -> bool {
2324        let mut task = ctx.task(task_id, TaskDataCategory::All);
2325        let Some(in_progress) = remove!(task, InProgress) else {
2326            panic!("Task execution completed, but task is not in progress: {task:#?}");
2327        };
2328        if matches!(in_progress, InProgressState::Canceled) {
2329            // Task was canceled in the meantime, so we don't finish it
2330            return false;
2331        }
2332        let InProgressState::InProgress(box InProgressStateInner {
2333            done_event,
2334            once_task: _,
2335            stale,
2336            session_dependent,
2337            marked_as_completed: _,
2338            new_children,
2339        }) = in_progress
2340        else {
2341            panic!("Task execution completed, but task is not in progress: {task:#?}");
2342        };
2343        debug_assert!(new_children.is_empty());
2344
2345        // If the task is stale, reschedule it
2346        if stale {
2347            task.add_new(CachedDataItem::InProgress {
2348                value: InProgressState::Scheduled {
2349                    done_event,
2350                    reason: TaskExecutionReason::Stale,
2351                },
2352            });
2353            return true;
2354        }
2355
2356        // Set the output if it has changed
2357        let mut old_content = None;
2358        if let Some(value) = new_output {
2359            old_content = task.insert(CachedDataItem::Output { value });
2360        }
2361
2362        // If the task has no way to be invalidated (no invalidator, not session dependent, and
2363        // only immutable dependencies, as determined in the prepare step), mark it as immutable.
2364        if is_now_immutable {
2365            let _ = task.add(CachedDataItem::Immutable { value: () });
2366        }
2367
2368        // Notify in progress cells
2369        removed_data.extend(task.extract_if(
2370            CachedDataItemType::InProgressCell,
2371            |key, value| match (key, value) {
2372                (
2373                    CachedDataItemKey::InProgressCell { .. },
2374                    CachedDataItemValueRef::InProgressCell { value },
2375                ) => {
2376                    value.event.notify(usize::MAX);
2377                    true
2378                }
2379                _ => false,
2380            },
2381        ));
2382
2383        // Grab the old dirty state
2384        let old_dirtyness = get!(task, Dirty).cloned();
2385        let (old_self_dirty, old_current_session_self_clean) = match old_dirtyness {
2386            None => (false, false),
2387            Some(Dirtyness::Dirty) => (true, false),
2388            Some(Dirtyness::SessionDependent) => {
2389                let clean_in_current_session = get!(task, CurrentSessionClean).is_some();
2390                (true, clean_in_current_session)
2391            }
2392        };
2393
2394        // Compute the new dirty state
2395        let (new_dirtyness, new_self_dirty, new_current_session_self_clean) = if session_dependent {
2396            (Some(Dirtyness::SessionDependent), true, true)
2397        } else {
2398            (None, false, false)
2399        };
2400
2401        // Update the dirty state
2402        if old_dirtyness != new_dirtyness {
2403            if let Some(value) = new_dirtyness {
2404                task.insert(CachedDataItem::Dirty { value });
2405            } else if old_dirtyness.is_some() {
2406                task.remove(&CachedDataItemKey::Dirty {});
2407            }
2408        }
2409        if old_current_session_self_clean != new_current_session_self_clean {
2410            if new_current_session_self_clean {
2411                task.insert(CachedDataItem::CurrentSessionClean { value: () });
2412            } else if old_current_session_self_clean {
2413                task.remove(&CachedDataItemKey::CurrentSessionClean {});
2414            }
2415        }
2416
2417        // Propagate dirtyness changes
2418        let data_update = if old_self_dirty != new_self_dirty
2419            || old_current_session_self_clean != new_current_session_self_clean
2420        {
2421            let dirty_container_count = get!(task, AggregatedDirtyContainerCount)
2422                .cloned()
2423                .unwrap_or_default();
2424            let current_session_clean_container_count =
2425                get!(task, AggregatedCurrentSessionCleanContainerCount)
2426                    .copied()
2427                    .unwrap_or_default();
2428            let result = ComputeDirtyAndCleanUpdate {
2429                old_dirty_container_count: dirty_container_count,
2430                new_dirty_container_count: dirty_container_count,
2431                old_current_session_clean_container_count: current_session_clean_container_count,
2432                new_current_session_clean_container_count: current_session_clean_container_count,
2433                old_self_dirty,
2434                new_self_dirty,
2435                old_current_session_self_clean,
2436                new_current_session_self_clean,
2437            }
2438            .compute();
2439            if result.dirty_count_update - result.current_session_clean_update < 0 {
2440                // The task is clean now
2441                if let Some(activeness_state) = get_mut!(task, Activeness) {
2442                    activeness_state.all_clean_event.notify(usize::MAX);
2443                    activeness_state.unset_active_until_clean();
2444                    if activeness_state.is_empty() {
2445                        task.remove(&CachedDataItemKey::Activeness {});
2446                    }
2447                }
2448            }
2449            result
2450                .aggregated_update(task_id)
2451                .and_then(|aggregated_update| {
2452                    AggregationUpdateJob::data_update(&mut task, aggregated_update)
2453                })
2454        } else {
2455            None
2456        };
2457
2458        #[cfg(feature = "verify_determinism")]
2459        let reschedule = (old_self_dirty != new_self_dirty || no_output_set) && !task_id.is_transient(); // assumes the undefined `dirty_changed` meant the self-dirty state changed
2460        #[cfg(not(feature = "verify_determinism"))]
2461        let reschedule = false;
2462        if reschedule {
2463            task.add_new(CachedDataItem::InProgress {
2464                value: InProgressState::Scheduled {
2465                    done_event,
2466                    reason: TaskExecutionReason::Stale,
2467                },
2468            });
2469            drop(task);
2470        } else {
2471            drop(task);
2472
2473            // Notify dependent tasks that are waiting for this task to finish
2474            done_event.notify(usize::MAX);
2475        }
2476
2477        drop(old_content);
2478
2479        if let Some(data_update) = data_update {
2480            AggregationUpdateQueue::run(data_update, ctx);
2481        }
2482
2483        reschedule
2484    }
2485
2486    fn task_execution_completed_cleanup(&self, ctx: &mut impl ExecuteContext<'_>, task_id: TaskId) {
2487        let mut task = ctx.task(task_id, TaskDataCategory::All);
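        // Shrink the per-type storage maps now that the final set of cells and dependencies for
        // this execution is known, reducing the in-memory footprint (step 4 of completion).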
2488        task.shrink_to_fit(CachedDataItemType::CellData);
2489        task.shrink_to_fit(CachedDataItemType::TransientCellData);
2490        task.shrink_to_fit(CachedDataItemType::CellTypeMaxIndex);
2491        task.shrink_to_fit(CachedDataItemType::CellDependency);
2492        task.shrink_to_fit(CachedDataItemType::OutputDependency);
2493        task.shrink_to_fit(CachedDataItemType::CollectiblesDependency);
2494        drop(task);
2495    }
2496
2497    fn run_backend_job<'a>(
2498        self: &'a Arc<Self>,
2499        job: TurboTasksBackendJob,
2500        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2501    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2502        Box::pin(async move {
2503            match job {
2504                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
2505                    debug_assert!(self.should_persist());
2506
2507                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
2508                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
2509                    let mut idle_start_listener = self.idle_start_event.listen();
2510                    let mut idle_end_listener = self.idle_end_event.listen();
2511                    let mut fresh_idle = true;
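                    // Wait until the snapshot interval elapses (longer before the very first
                    // snapshot), until the system has been idle for the configured idle timeout,
                    // or until the backend starts stopping; then persist a snapshot.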
2512                    loop {
2513                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2514                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2515                        let idle_timeout = *IDLE_TIMEOUT;
2516                        let (time, mut reason) =
2517                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
2518                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2519                            } else {
2520                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
2521                            };
2522
2523                        let until = last_snapshot + time;
2524                        if until > Instant::now() {
2525                            let mut stop_listener = self.stopping_event.listen();
2526                            if self.stopping.load(Ordering::Acquire) {
2527                                return;
2528                            }
2529                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2530                                Instant::now() + idle_timeout
2531                            } else {
2532                                far_future()
2533                            };
2534                            loop {
2535                                tokio::select! {
2536                                    _ = &mut stop_listener => {
2537                                        return;
2538                                    },
2539                                    _ = &mut idle_start_listener => {
2540                                        fresh_idle = true;
2541                                        idle_time = Instant::now() + idle_timeout;
2542                                        idle_start_listener = self.idle_start_event.listen()
2543                                    },
2544                                    _ = &mut idle_end_listener => {
2545                                        idle_time = until + idle_timeout;
2546                                        idle_end_listener = self.idle_end_event.listen()
2547                                    },
2548                                    _ = tokio::time::sleep_until(until) => {
2549                                        break;
2550                                    },
2551                                    _ = tokio::time::sleep_until(idle_time) => {
2552                                        if turbo_tasks.is_idle() {
2553                                            reason = "idle timeout";
2554                                            break;
2555                                        }
2556                                    },
2557                                }
2558                            }
2559                        }
2560
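                        // Take the snapshot. If it produced no new data, keep waiting in this
                        // loop; otherwise record its start time and hand off to a follow-up
                        // snapshot job.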
2561                        let this = self.clone();
2562                        let snapshot = this.snapshot_and_persist(None, reason);
2563                        if let Some((snapshot_start, new_data)) = snapshot {
2564                            last_snapshot = snapshot_start;
2565                            if !new_data {
2566                                fresh_idle = false;
2567                                continue;
2568                            }
2569                            let last_snapshot = last_snapshot.duration_since(self.start_time);
2570                            self.last_snapshot.store(
2571                                last_snapshot.as_millis().try_into().unwrap(),
2572                                Ordering::Relaxed,
2573                            );
2574
2575                            turbo_tasks.schedule_backend_background_job(
2576                                TurboTasksBackendJob::FollowUpSnapshot,
2577                            );
2578                            return;
2579                        }
2580                    }
2581                }
2582                TurboTasksBackendJob::Prefetch { data, range } => {
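                    // Without an explicit range, large prefetch sets are split into chunks that
                    // are rescheduled as separate background jobs; otherwise the selected tasks
                    // are loaded so their data is resident in memory.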
2583                    let range: Range<usize> = if let Some(range) = range {
2584                        range
2585                    } else {
2586                        if data.len() > 128 {
2587                            let chunk_size = good_chunk_size(data.len());
2588                            let chunks = data.len().div_ceil(chunk_size);
2589                            for i in 0..chunks {
2590                                turbo_tasks.schedule_backend_background_job(
2591                                    TurboTasksBackendJob::Prefetch {
2592                                        data: data.clone(),
2593                                        range: Some(
2594                                            (i * chunk_size)..min(data.len(), (i + 1) * chunk_size),
2595                                        ),
2596                                    },
2597                                );
2598                            }
2599                            return;
2600                        }
2601                        0..data.len()
2602                    };
2603
2604                    let _span = trace_span!("prefetching").entered();
2605                    let mut ctx = self.execute_context(turbo_tasks);
2606                    for i in range {
2607                        let (&task, &with_data) = data.get_index(i).unwrap();
2608                        let category = if with_data {
2609                            TaskDataCategory::All
2610                        } else {
2611                            TaskDataCategory::Meta
2612                        };
2613                        // Prefetch the task
2614                        drop(ctx.task(task, category));
2615                    }
2616                }
2617            }
2618        })
2619    }
2620
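    /// Reads a cell of the given task directly from storage, without registering a read
    /// dependency. Returns empty typed content when the cell has no data.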
2621    fn try_read_own_task_cell(
2622        &self,
2623        task_id: TaskId,
2624        cell: CellId,
2625        options: ReadCellOptions,
2626        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2627    ) -> Result<TypedCellContent> {
2628        let mut ctx = self.execute_context(turbo_tasks);
2629        let task = ctx.task(task_id, TaskDataCategory::Data);
2630        if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2631            debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2632            Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2633        } else {
2634            Ok(CellContent(None).into_typed(cell.type_id))
2635        }
2636    }
2637
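    /// Collects all collectibles of `collectible_type` that are aggregated in `task_id`. The
    /// task is first promoted to a root aggregation node so its aggregated collectibles are
    /// complete, and the optional reader is registered as a collectibles dependent.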
2638    fn read_task_collectibles(
2639        &self,
2640        task_id: TaskId,
2641        collectible_type: TraitTypeId,
2642        reader_id: Option<TaskId>,
2643        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2644    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2645        let mut ctx = self.execute_context(turbo_tasks);
2646        let mut collectibles = AutoMap::default();
2647        {
2648            let mut task = ctx.task(task_id, TaskDataCategory::All);
2649            // Ensure it's a root node
2650            loop {
2651                let aggregation_number = get_aggregation_number(&task);
2652                if is_root_node(aggregation_number) {
2653                    break;
2654                }
2655                drop(task);
2656                AggregationUpdateQueue::run(
2657                    AggregationUpdateJob::UpdateAggregationNumber {
2658                        task_id,
2659                        base_aggregation_number: u32::MAX,
2660                        distance: None,
2661                    },
2662                    &mut ctx,
2663                );
2664                task = ctx.task(task_id, TaskDataCategory::All);
2665            }
2666            for collectible in iter_many!(
2667                task,
2668                AggregatedCollectible {
2669                    collectible
2670                } count if collectible.collectible_type == collectible_type && *count > 0 => {
2671                    collectible.cell
2672                }
2673            ) {
2674                *collectibles
2675                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
2676                    .or_insert(0) += 1;
2677            }
2678            for (collectible, count) in iter_many!(
2679                task,
2680                Collectible {
2681                    collectible
2682                } count if collectible.collectible_type == collectible_type => {
2683                    (collectible.cell, *count)
2684                }
2685            ) {
2686                *collectibles
2687                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
2688                    .or_insert(0) += count;
2689            }
2690            if let Some(reader_id) = reader_id {
2691                let _ = task.add(CachedDataItem::CollectiblesDependent {
2692                    collectible_type,
2693                    task: reader_id,
2694                    value: (),
2695                });
2696            }
2697        }
2698        if let Some(reader_id) = reader_id {
2699            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
2700            let target = CollectiblesRef {
2701                task: task_id,
2702                collectible_type,
2703            };
2704            if reader
2705                .remove(&CachedDataItemKey::OutdatedCollectiblesDependency { target })
2706                .is_none()
2707            {
2708                let _ = reader.add(CachedDataItem::CollectiblesDependency { target, value: () });
2709            }
2710        }
2711        collectibles
2712    }
2713
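    /// Emits `collectible` of `collectible_type` from `task_id` (count +1). The collectible
    /// must be a resolved task cell.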
2714    fn emit_collectible(
2715        &self,
2716        collectible_type: TraitTypeId,
2717        collectible: RawVc,
2718        task_id: TaskId,
2719        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2720    ) {
2721        self.assert_valid_collectible(task_id, collectible);
2722
2723        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2724            panic!("Collectibles need to be resolved");
2725        };
2726        let cell = CellRef {
2727            task: collectible_task,
2728            cell,
2729        };
2730        operation::UpdateCollectibleOperation::run(
2731            task_id,
2732            CollectibleRef {
2733                collectible_type,
2734                cell,
2735            },
2736            1,
2737            self.execute_context(turbo_tasks),
2738        );
2739    }
2740
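    /// Reverts `count` previous emissions of `collectible` from `task_id`. The collectible
    /// must be a resolved task cell.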
2741    fn unemit_collectible(
2742        &self,
2743        collectible_type: TraitTypeId,
2744        collectible: RawVc,
2745        count: u32,
2746        task_id: TaskId,
2747        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2748    ) {
2749        self.assert_valid_collectible(task_id, collectible);
2750
2751        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2752            panic!("Collectibles need to be resolved");
2753        };
2754        let cell = CellRef {
2755            task: collectible_task,
2756            cell,
2757        };
2758        operation::UpdateCollectibleOperation::run(
2759            task_id,
2760            CollectibleRef {
2761                collectible_type,
2762                cell,
2763            },
2764            -(i32::try_from(count).unwrap()),
2765            self.execute_context(turbo_tasks),
2766        );
2767    }
2768
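    /// Writes `content` into the given cell of `task_id` via an `UpdateCellOperation`.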
2769    fn update_task_cell(
2770        &self,
2771        task_id: TaskId,
2772        cell: CellId,
2773        is_serializable_cell_content: bool,
2774        content: CellContent,
2775        verification_mode: VerificationMode,
2776        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2777    ) {
2778        operation::UpdateCellOperation::run(
2779            task_id,
2780            cell,
2781            content,
2782            is_serializable_cell_content,
2783            verification_mode,
2784            self.execute_context(turbo_tasks),
2785        );
2786    }
2787
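    /// Marks the currently executing task as session dependent. Such tasks change on every
    /// run, so they get a high aggregation number and the flag is recorded in the task's
    /// in-progress state. This is a no-op when dependency tracking is disabled.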
2788    fn mark_own_task_as_session_dependent(
2789        &self,
2790        task_id: TaskId,
2791        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2792    ) {
2793        if !self.should_track_dependencies() {
2794            // Without dependency tracking we don't need session dependent tasks
2795            return;
2796        }
2797        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
2798        let mut ctx = self.execute_context(turbo_tasks);
2799        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
2800        let aggregation_number = get_aggregation_number(&task);
2801        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
2802            drop(task);
2803            // We want to use a high aggregation number to avoid large aggregation chains for
2804            // session dependent tasks (which change on every run)
2805            AggregationUpdateQueue::run(
2806                AggregationUpdateJob::UpdateAggregationNumber {
2807                    task_id,
2808                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
2809                    distance: None,
2810                },
2811                &mut ctx,
2812            );
2813            task = ctx.task(task_id, TaskDataCategory::Meta);
2814        }
2815        if let Some(InProgressState::InProgress(box InProgressStateInner {
2816            session_dependent,
2817            ..
2818        })) = get_mut!(task, InProgress)
2819        {
2820            *session_dependent = true;
2821        }
2822    }
2823
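    /// Marks the currently executing task as completed in its in-progress state.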
2824    fn mark_own_task_as_finished(
2825        &self,
2826        task: TaskId,
2827        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2828    ) {
2829        let mut ctx = self.execute_context(turbo_tasks);
2830        let mut task = ctx.task(task, TaskDataCategory::Data);
2831        if let Some(InProgressState::InProgress(box InProgressStateInner {
2832            marked_as_completed,
2833            ..
2834        })) = get_mut!(task, InProgress)
2835        {
2836            *marked_as_completed = true;
2837            // TODO this should remove the dirty state (also check session_dependent)
2838            // but this would break some assumptions for strongly consistent reads.
2839            // Client tasks are not connected yet, so we wouldn't wait for them.
2840            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
2841        }
2842    }
2843
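    /// Sets the base aggregation number of the given task via the aggregation update queue.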
2844    fn set_own_task_aggregation_number(
2845        &self,
2846        task: TaskId,
2847        aggregation_number: u32,
2848        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2849    ) {
2850        let mut ctx = self.execute_context(turbo_tasks);
2851        AggregationUpdateQueue::run(
2852            AggregationUpdateJob::UpdateAggregationNumber {
2853                task_id: task,
2854                base_aggregation_number: aggregation_number,
2855                distance: None,
2856            },
2857            &mut ctx,
2858        );
2859    }
2860
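    /// Connects `task` as a child of `parent_task`, after asserting that a persistent parent
    /// is not connecting to a transient child.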
2861    fn connect_task(
2862        &self,
2863        task: TaskId,
2864        parent_task: Option<TaskId>,
2865        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2866    ) {
2867        self.assert_not_persistent_calling_transient(parent_task, task, None);
2868        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
2869    }
2870
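    /// Creates a transient root or once task: it gets a root aggregation number, an
    /// activeness state (when activeness tracking is enabled) and an initial scheduling.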
2871    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
2872        let task_id = self.transient_task_id_factory.get();
2873        let root_type = match task_type {
2874            TransientTaskType::Root(_) => RootType::RootTask,
2875            TransientTaskType::Once(_) => RootType::OnceTask,
2876        };
2877        self.transient_tasks.insert(
2878            task_id,
2879            Arc::new(match task_type {
2880                TransientTaskType::Root(f) => TransientTask::Root(f),
2881                TransientTaskType::Once(f) => TransientTask::Once(Mutex::new(Some(f))),
2882            }),
2883        );
2884        {
2885            let mut task = self.storage.access_mut(task_id);
2886            task.add(CachedDataItem::AggregationNumber {
2887                value: AggregationNumber {
2888                    base: u32::MAX,
2889                    distance: 0,
2890                    effective: u32::MAX,
2891                },
2892            });
2893            if self.should_track_activeness() {
2894                task.add(CachedDataItem::Activeness {
2895                    value: ActivenessState::new_root(root_type, task_id),
2896                });
2897            }
2898            task.add(CachedDataItem::new_scheduled(
2899                TaskExecutionReason::Initial,
2900                move || {
2901                    move || match root_type {
2902                        RootType::RootTask => "Root Task".to_string(),
2903                        RootType::OnceTask => "Once Task".to_string(),
2904                    }
2905                },
2906            ));
2907        }
2908        #[cfg(feature = "verify_aggregation_graph")]
2909        self.root_tasks.lock().insert(task_id);
2910        task_id
2911    }
2912
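    /// Disposes a root task. If the task or one of its containers is still dirty, it stays
    /// active until it becomes clean; otherwise its activeness state is removed right away.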
2913    fn dispose_root_task(
2914        &self,
2915        task_id: TaskId,
2916        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2917    ) {
2918        #[cfg(feature = "verify_aggregation_graph")]
2919        self.root_tasks.lock().remove(&task_id);
2920
2921        let mut ctx = self.execute_context(turbo_tasks);
2922        let mut task = ctx.task(task_id, TaskDataCategory::All);
2923        let is_dirty = task.is_dirty();
2924        let has_dirty_containers = task.has_dirty_containers();
2925        if is_dirty || has_dirty_containers {
2926            if let Some(activeness_state) = get_mut!(task, Activeness) {
2927                // Let the task finish; its activeness state will be removed once it is clean
2928                activeness_state.unset_root_type();
2929                activeness_state.set_active_until_clean();
2930            }
2931        } else if let Some(activeness_state) = remove!(task, Activeness) {
2932            // Technically nobody should be listening to this event, but just in case
2933            // we notify it anyway
2934            activeness_state.all_clean_event.notify(usize::MAX);
2935        }
2936    }
2937
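    /// Debug check that walks the task graph from every root task and verifies upper edges,
    /// aggregated dirty containers and aggregated collectibles. Set
    /// `TURBO_ENGINE_VERIFY_GRAPH=0` to skip it.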
2938    #[cfg(feature = "verify_aggregation_graph")]
2939    fn verify_aggregation_graph(
2940        &self,
2941        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2942        idle: bool,
2943    ) {
2944        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
2945            return;
2946        }
2947        use std::{collections::VecDeque, env, io::stdout};
2948
2949        use crate::backend::operation::{get_uppers, is_aggregating_node};
2950
2951        let mut ctx = self.execute_context(turbo_tasks);
2952        let root_tasks = self.root_tasks.lock().clone();
2953
2954        for task_id in root_tasks.into_iter() {
2955            let mut queue = VecDeque::new();
2956            let mut visited = FxHashSet::default();
2957            let mut aggregated_nodes = FxHashSet::default();
2958            let mut collectibles = FxHashMap::default();
2959            let root_task_id = task_id;
2960            visited.insert(task_id);
2961            aggregated_nodes.insert(task_id);
2962            queue.push_back(task_id);
2963            let mut counter = 0;
2964            while let Some(task_id) = queue.pop_front() {
2965                counter += 1;
2966                if counter % 100000 == 0 {
2967                    println!(
2968                        "queue={}, visited={}, aggregated_nodes={}",
2969                        queue.len(),
2970                        visited.len(),
2971                        aggregated_nodes.len()
2972                    );
2973                }
2974                let task = ctx.task(task_id, TaskDataCategory::All);
2975                if idle && !self.is_idle.load(Ordering::Relaxed) {
2976                    return;
2977                }
2978
2979                let uppers = get_uppers(&task);
2980                if task_id != root_task_id
2981                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
2982                {
2983                    panic!(
2984                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
2985                         {:?})",
2986                        task_id,
2987                        ctx.get_task_description(task_id),
2988                        uppers
2989                    );
2990                }
2991
2992                let aggregated_collectibles: Vec<_> = get_many!(task, AggregatedCollectible { collectible } value if *value > 0 => {collectible});
2993                for collectible in aggregated_collectibles {
2994                    collectibles
2995                        .entry(collectible)
2996                        .or_insert_with(|| (false, Vec::new()))
2997                        .1
2998                        .push(task_id);
2999                }
3000
3001                let own_collectibles: Vec<_> = get_many!(task, Collectible { collectible } value if *value > 0 => {collectible});
3002                for collectible in own_collectibles {
3003                    if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3004                        *flag = true
3005                    } else {
3006                        panic!(
3007                            "Task {} has a collectible {:?} that is not in any upper task",
3008                            task_id, collectible
3009                        );
3010                    }
3011                }
3012
3013                let is_dirty = get!(task, Dirty).is_some();
3014                let has_dirty_container = task.has_dirty_containers();
3015                let should_be_in_upper = is_dirty || has_dirty_container;
3016
3017                let aggregation_number = get_aggregation_number(&task);
3018                if is_aggregating_node(aggregation_number) {
3019                    aggregated_nodes.insert(task_id);
3020                }
3021                // println!(
3022                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
3023                //     ctx.get_task_description(task_id),
3024                //     uppers
3025                // );
3026
3027                for child_id in iter_many!(task, Child { task } => task) {
3028                    // println!("{task_id}: child -> {child_id}");
3029                    if visited.insert(child_id) {
3030                        queue.push_back(child_id);
3031                    }
3032                }
3033                drop(task);
3034
3035                if should_be_in_upper {
3036                    for upper_id in uppers {
3037                        let task = ctx.task(upper_id, TaskDataCategory::All);
3038                        let in_upper = get!(task, AggregatedDirtyContainer { task: task_id })
3039                            .is_some_and(|&dirty| dirty > 0);
3040                        if !in_upper {
3041                            let containers: Vec<_> = get_many!(task, AggregatedDirtyContainer { task: task_id } value => (task_id, *value));
3042                            panic!(
3043                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
3044                                 ({})\nThese dirty containers are present:\n{:#?}",
3045                                task_id,
3046                                ctx.get_task_description(task_id),
3047                                upper_id,
3048                                ctx.get_task_description(upper_id),
3049                                containers,
3050                            );
3051                        }
3052                    }
3053                }
3054            }
3055
3056            for (collectible, (flag, task_ids)) in collectibles {
3057                if !flag {
3058                    use std::io::Write;
3059                    let mut stdout = stdout().lock();
3060                    writeln!(
3061                        stdout,
3062                        "Collectible {:?} is not emitted in any child task but is aggregated \
3063                         in these tasks: {:#?}",
3064                        collectible,
3065                        task_ids
3066                            .iter()
3067                            .map(|t| format!("{t} {}", ctx.get_task_description(*t)))
3068                            .collect::<Vec<_>>()
3069                    )
3070                    .unwrap();
3071
3072                    let task_id = collectible.cell.task;
3073                    let mut queue = {
3074                        let task = ctx.task(task_id, TaskDataCategory::All);
3075                        get_uppers(&task)
3076                    };
3077                    let mut visited = FxHashSet::default();
3078                    for &upper_id in queue.iter() {
3079                        visited.insert(upper_id);
3080                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3081                    }
3082                    while let Some(task_id) = queue.pop() {
3083                        let desc = ctx.get_task_description(task_id);
3084                        let task = ctx.task(task_id, TaskDataCategory::All);
3085                        let aggregated_collectible =
3086                            get!(task, AggregatedCollectible { collectible })
3087                                .copied()
3088                                .unwrap_or_default();
3089                        let uppers = get_uppers(&task);
3090                        drop(task);
3091                        writeln!(
3092                            stdout,
3093                            "upper {task_id} {desc} collectible={aggregated_collectible}"
3094                        )
3095                        .unwrap();
3096                        if task_ids.contains(&task_id) {
3097                            writeln!(
3098                                stdout,
3099                                "Task has an upper connection to an aggregated task that doesn't \
3100                                 reference it. Upper connection is invalid!"
3101                            )
3102                            .unwrap();
3103                        }
3104                        for upper_id in uppers {
3105                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3106                            if !visited.contains(&upper_id) {
3107                                queue.push(upper_id);
3108                            }
3109                        }
3110                    }
3111                    panic!("See stdout for more details");
3112                }
3113            }
3114        }
3115    }
3116
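    /// Panics when a persistent parent task calls, reads, or connects to a transient child
    /// task.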
3117    fn assert_not_persistent_calling_transient(
3118        &self,
3119        parent_id: Option<TaskId>,
3120        child_id: TaskId,
3121        cell_id: Option<CellId>,
3122    ) {
3123        if !parent_id.is_none_or(|id| id.is_transient()) && child_id.is_transient() {
3124            self.panic_persistent_calling_transient(
3125                parent_id
3126                    .and_then(|id| self.lookup_task_type(id))
3127                    .as_deref(),
3128                self.lookup_task_type(child_id).as_deref(),
3129                cell_id,
3130            );
3131        }
3132    }
3133
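    /// Builds the panic message for a persistent task depending on a transient one, including
    /// a trace of why the callee is transient when that is known.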
3134    fn panic_persistent_calling_transient(
3135        &self,
3136        parent: Option<&CachedTaskType>,
3137        child: Option<&CachedTaskType>,
3138        cell_id: Option<CellId>,
3139    ) {
3140        let transient_reason = if let Some(child) = child {
3141            Cow::Owned(format!(
3142                " The callee is transient because it depends on:\n{}",
3143                self.debug_trace_transient_task(child, cell_id),
3144            ))
3145        } else {
3146            Cow::Borrowed("")
3147        };
3148        panic!(
3149            "Persistent task {} is not allowed to call, read, or connect to transient task {}.{}",
3150            parent.map_or("unknown", |t| t.get_name()),
3151            child.map_or("unknown", |t| t.get_name()),
3152            transient_reason,
3153        );
3154    }
3155
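    /// Asserts that a collectible is a resolved task cell and that a transient collectible is
    /// not emitted from a persistent task.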
3156    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3157        // these checks occur in a potentially hot codepath, but they're cheap
3158        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3159            // This should never happen: The collectible APIs use ResolvedVc
3160            let task_info = if let Some(col_task_ty) = collectible
3161                .try_get_task_id()
3162                .and_then(|t| self.lookup_task_type(t))
3163            {
3164                Cow::Owned(format!(" (return type of {col_task_ty})"))
3165            } else {
3166                Cow::Borrowed("")
3167            };
3168            panic!("Collectible{task_info} must be a ResolvedVc")
3169        };
3170        if col_task_id.is_transient() && !task_id.is_transient() {
3171            let transient_reason = if let Some(col_task_ty) = self.lookup_task_type(col_task_id) {
3172                Cow::Owned(format!(
3173                    ". The collectible is transient because it depends on:\n{}",
3174                    self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3175                ))
3176            } else {
3177                Cow::Borrowed("")
3178            };
3179            // This should never happen: How would a persistent function get a transient Vc?
3180            panic!(
3181                "Collectible is transient, transient collectibles cannot be emitted from \
3182                 persistent tasks{transient_reason}",
3183            )
3184        }
3185    }
3186}
3187
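// All `Backend` trait methods below delegate to the inner implementation behind `self.0`.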
3188impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
3189    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3190        self.0.startup(turbo_tasks);
3191    }
3192
3193    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3194        self.0.stopping();
3195    }
3196
3197    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3198        self.0.stop(turbo_tasks);
3199    }
3200
3201    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3202        self.0.idle_start(turbo_tasks);
3203    }
3204
3205    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3206        self.0.idle_end();
3207    }
3208
3209    fn get_or_create_persistent_task(
3210        &self,
3211        task_type: CachedTaskType,
3212        parent_task: Option<TaskId>,
3213        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3214    ) -> TaskId {
3215        self.0
3216            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
3217    }
3218
3219    fn get_or_create_transient_task(
3220        &self,
3221        task_type: CachedTaskType,
3222        parent_task: Option<TaskId>,
3223        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3224    ) -> TaskId {
3225        self.0
3226            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
3227    }
3228
3229    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3230        self.0.invalidate_task(task_id, turbo_tasks);
3231    }
3232
3233    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3234        self.0.invalidate_tasks(tasks, turbo_tasks);
3235    }
3236
3237    fn invalidate_tasks_set(
3238        &self,
3239        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3240        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3241    ) {
3242        self.0.invalidate_tasks_set(tasks, turbo_tasks);
3243    }
3244
3245    fn invalidate_serialization(
3246        &self,
3247        task_id: TaskId,
3248        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3249    ) {
3250        self.0.invalidate_serialization(task_id, turbo_tasks);
3251    }
3252
3253    fn get_task_description(&self, task: TaskId) -> String {
3254        self.0.get_task_description(task)
3255    }
3256
3257    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3258        self.0.task_execution_canceled(task, turbo_tasks)
3259    }
3260
3261    fn try_start_task_execution(
3262        &self,
3263        task_id: TaskId,
3264        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3265    ) -> Option<TaskExecutionSpec<'_>> {
3266        self.0.try_start_task_execution(task_id, turbo_tasks)
3267    }
3268
3269    fn task_execution_completed(
3270        &self,
3271        task_id: TaskId,
3272        result: Result<RawVc, TurboTasksExecutionError>,
3273        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3274        stateful: bool,
3275        has_invalidator: bool,
3276        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3277    ) -> bool {
3278        self.0.task_execution_completed(
3279            task_id,
3280            result,
3281            cell_counters,
3282            stateful,
3283            has_invalidator,
3284            turbo_tasks,
3285        )
3286    }
3287
3288    type BackendJob = TurboTasksBackendJob;
3289
3290    fn run_backend_job<'a>(
3291        &'a self,
3292        job: Self::BackendJob,
3293        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3294    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3295        self.0.run_backend_job(job, turbo_tasks)
3296    }
3297
3298    fn try_read_task_output(
3299        &self,
3300        task_id: TaskId,
3301        reader: Option<TaskId>,
3302        options: ReadOutputOptions,
3303        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3304    ) -> Result<Result<RawVc, EventListener>> {
3305        self.0
3306            .try_read_task_output(task_id, reader, options, turbo_tasks)
3307    }
3308
3309    fn try_read_task_cell(
3310        &self,
3311        task_id: TaskId,
3312        cell: CellId,
3313        reader: Option<TaskId>,
3314        options: ReadCellOptions,
3315        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3316    ) -> Result<Result<TypedCellContent, EventListener>> {
3317        self.0
3318            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3319    }
3320
3321    fn try_read_own_task_cell(
3322        &self,
3323        task_id: TaskId,
3324        cell: CellId,
3325        options: ReadCellOptions,
3326        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3327    ) -> Result<TypedCellContent> {
3328        self.0
3329            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
3330    }
3331
3332    fn read_task_collectibles(
3333        &self,
3334        task_id: TaskId,
3335        collectible_type: TraitTypeId,
3336        reader: Option<TaskId>,
3337        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3338    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3339        self.0
3340            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3341    }
3342
3343    fn emit_collectible(
3344        &self,
3345        collectible_type: TraitTypeId,
3346        collectible: RawVc,
3347        task_id: TaskId,
3348        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3349    ) {
3350        self.0
3351            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3352    }
3353
3354    fn unemit_collectible(
3355        &self,
3356        collectible_type: TraitTypeId,
3357        collectible: RawVc,
3358        count: u32,
3359        task_id: TaskId,
3360        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3361    ) {
3362        self.0
3363            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3364    }
3365
3366    fn update_task_cell(
3367        &self,
3368        task_id: TaskId,
3369        cell: CellId,
3370        is_serializable_cell_content: bool,
3371        content: CellContent,
3372        verification_mode: VerificationMode,
3373        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3374    ) {
3375        self.0.update_task_cell(
3376            task_id,
3377            cell,
3378            is_serializable_cell_content,
3379            content,
3380            verification_mode,
3381            turbo_tasks,
3382        );
3383    }
3384
3385    fn mark_own_task_as_finished(
3386        &self,
3387        task_id: TaskId,
3388        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3389    ) {
3390        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3391    }
3392
3393    fn set_own_task_aggregation_number(
3394        &self,
3395        task: TaskId,
3396        aggregation_number: u32,
3397        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3398    ) {
3399        self.0
3400            .set_own_task_aggregation_number(task, aggregation_number, turbo_tasks);
3401    }
3402
3403    fn mark_own_task_as_session_dependent(
3404        &self,
3405        task: TaskId,
3406        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3407    ) {
3408        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
3409    }
3410
3411    fn connect_task(
3412        &self,
3413        task: TaskId,
3414        parent_task: Option<TaskId>,
3415        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3416    ) {
3417        self.0.connect_task(task, parent_task, turbo_tasks);
3418    }
3419
3420    fn create_transient_task(
3421        &self,
3422        task_type: TransientTaskType,
3423        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3424    ) -> TaskId {
3425        self.0.create_transient_task(task_type)
3426    }
3427
3428    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3429        self.0.dispose_root_task(task_id, turbo_tasks);
3430    }
3431
3432    fn task_statistics(&self) -> &TaskStatisticsApi {
3433        &self.0.task_statistics
3434    }
3435
3436    fn is_tracking_dependencies(&self) -> bool {
3437        self.0.options.dependency_tracking
3438    }
3439}
3440
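/// Explanation tree for why a task is transient, used in panic messages when a persistent
/// task illegally depends on a transient one.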
3441enum DebugTraceTransientTask {
3442    Cached {
3443        task_name: &'static str,
3444        cell_type_id: Option<ValueTypeId>,
3445        cause_self: Option<Box<DebugTraceTransientTask>>,
3446        cause_args: Vec<DebugTraceTransientTask>,
3447    },
3448    /// This representation is used when this task is a duplicate of one previously shown
3449    Collapsed {
3450        task_name: &'static str,
3451        cell_type_id: Option<ValueTypeId>,
3452    },
3453    Uncached {
3454        cell_type_id: Option<ValueTypeId>,
3455    },
3456}
3457
3458impl DebugTraceTransientTask {
3459    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3460        let indent = "    ".repeat(level);
3461        f.write_str(&indent)?;
3462
3463        fn fmt_cell_type_id(
3464            f: &mut fmt::Formatter<'_>,
3465            cell_type_id: Option<ValueTypeId>,
3466        ) -> fmt::Result {
3467            if let Some(ty) = cell_type_id {
3468                write!(f, " (read cell of type {})", get_value_type(ty).global_name)
3469            } else {
3470                Ok(())
3471            }
3472        }
3473
3474        // write the name and type
3475        match self {
3476            Self::Cached {
3477                task_name,
3478                cell_type_id,
3479                ..
3480            }
3481            | Self::Collapsed {
3482                task_name,
3483                cell_type_id,
3484                ..
3485            } => {
3486                f.write_str(task_name)?;
3487                fmt_cell_type_id(f, *cell_type_id)?;
3488                if matches!(self, Self::Collapsed { .. }) {
3489                    f.write_str(" (collapsed)")?;
3490                }
3491            }
3492            Self::Uncached { cell_type_id } => {
3493                f.write_str("unknown transient task")?;
3494                fmt_cell_type_id(f, *cell_type_id)?;
3495            }
3496        }
3497        f.write_char('\n')?;
3498
3499        // write any extra "cause" information we might have
3500        if let Self::Cached {
3501            cause_self,
3502            cause_args,
3503            ..
3504        } = self
3505        {
3506            if let Some(c) = cause_self {
3507                writeln!(f, "{indent}  self:")?;
3508                c.fmt_indented(f, level + 1)?;
3509            }
3510            if !cause_args.is_empty() {
3511                writeln!(f, "{indent}  args:")?;
3512                for c in cause_args {
3513                    c.fmt_indented(f, level + 1)?;
3514                }
3515            }
3516        }
3517        Ok(())
3518    }
3519}
3520
3521impl fmt::Display for DebugTraceTransientTask {
3522    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3523        self.fmt_indented(f, 0)
3524    }
3525}
3526
3527// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
3528fn far_future() -> Instant {
3529    // Roughly 30 years from now.
3530    // API does not provide a way to obtain max `Instant`
3531    // or convert specific date in the future to instant.
3532    // 1000 years overflows on macOS, 100 years overflows on FreeBSD.
3533    Instant::now() + Duration::from_secs(86400 * 365 * 30)
3534}