turbo_tasks_backend/backend/
mod.rs

1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7    borrow::Cow,
8    fmt::{self, Write},
9    future::Future,
10    hash::BuildHasherDefault,
11    mem::take,
12    pin::Pin,
13    sync::{
14        Arc, LazyLock,
15        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16    },
17};
18
19use anyhow::{Result, bail};
20use auto_hash_map::{AutoMap, AutoSet};
21use indexmap::IndexSet;
22use parking_lot::{Condvar, Mutex};
23use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
24use smallvec::{SmallVec, smallvec};
25use tokio::time::{Duration, Instant};
26use tracing::{Span, trace_span};
27use turbo_tasks::{
28    CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
29    ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
30    TaskId, TaskPriority, TraitTypeId, TurboTasksBackendApi, ValueTypeId,
31    backend::{
32        Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
33        TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
34    },
35    event::{Event, EventListener},
36    message_queue::TimingEvent,
37    registry::get_value_type,
38    scope::scope_and_block,
39    task_statistics::TaskStatisticsApi,
40    trace::TraceRawVcs,
41    turbo_tasks,
42    util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
43};
44
45pub use self::{
46    operation::AnyOperation,
47    storage::{SpecificTaskDataCategory, TaskDataCategory},
48};
49#[cfg(feature = "trace_task_dirty")]
50use crate::backend::operation::TaskDirtyCause;
51use crate::{
52    backend::{
53        operation::{
54            AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
55            CleanupOldEdgesOperation, ComputeDirtyAndCleanUpdate, ConnectChildOperation,
56            ExecuteContext, ExecuteContextImpl, LeafDistanceUpdateQueue, Operation, OutdatedEdge,
57            TaskGuard, connect_children, get_aggregation_number, get_uppers, is_root_node,
58            make_task_dirty_internal, prepare_new_children,
59        },
60        storage::Storage,
61        storage_schema::{TaskStorage, TaskStorageAccessors},
62    },
63    backing_storage::BackingStorage,
64    data::{
65        ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
66        InProgressState, InProgressStateInner, OutputValue, RootType,
67    },
68    utils::{
69        bi_map::BiMap, chunked_vec::ChunkedVec, dash_map_drop_contents::drop_contents,
70        ptr_eq_arc::PtrEqArc, shard_amount::compute_shard_amount, sharded::Sharded, swap_retain,
71    },
72};
73
74/// Threshold for parallelizing making dependent tasks dirty.
75/// If the number of dependent tasks exceeds this threshold,
76/// the operation will be parallelized.
77const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;
78
79const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
80
81/// Configurable idle timeout for snapshot persistence.
82/// Defaults to 2 seconds if not set or if the value is invalid.
83static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
84    std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
85        .ok()
86        .and_then(|v| v.parse::<u64>().ok())
87        .map(Duration::from_millis)
88        .unwrap_or(Duration::from_secs(2))
89});
90
91struct SnapshotRequest {
92    snapshot_requested: bool,
93    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
94}
95
96impl SnapshotRequest {
97    fn new() -> Self {
98        Self {
99            snapshot_requested: false,
100            suspended_operations: FxHashSet::default(),
101        }
102    }
103}
104
105type TransientTaskOnce =
106    Mutex<Option<Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>>>;
107
108pub enum TransientTask {
109    /// A root task that will track dependencies and re-execute when
110    /// dependencies change. Task will eventually settle to the correct
111    /// execution.
112    ///
113    /// Always active. Automatically scheduled.
114    Root(TransientTaskRoot),
115
116    // TODO implement these strongly consistency
117    /// A single root task execution. It won't track dependencies.
118    /// Task will definitely include all invalidations that happened before the
119    /// start of the task. It may or may not include invalidations that
120    /// happened after that. It may see these invalidations partially
121    /// applied.
122    ///
123    /// Active until done. Automatically scheduled.
124    Once(TransientTaskOnce),
125}
126
127pub enum StorageMode {
128    /// Queries the storage for cache entries that don't exist locally.
129    ReadOnly,
130    /// Queries the storage for cache entries that don't exist locally.
131    /// Regularly pushes changes to the backing storage.
132    ReadWrite,
133    /// Queries the storage for cache entries that don't exist locally.
134    /// On shutdown, pushes all changes to the backing storage.
135    ReadWriteOnShutdown,
136}
137
138pub struct BackendOptions {
139    /// Enables dependency tracking.
140    ///
141    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
142    /// forever.
143    pub dependency_tracking: bool,
144
145    /// Enables active tracking.
146    ///
147    /// Automatically disabled when `dependency_tracking` is disabled.
148    ///
149    /// When disabled: All tasks are considered as active.
150    pub active_tracking: bool,
151
152    /// Enables the backing storage.
153    pub storage_mode: Option<StorageMode>,
154
155    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
156    /// datastructures. If `None`, it will use the available parallelism.
157    pub num_workers: Option<usize>,
158
159    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
160    pub small_preallocation: bool,
161}
162
163impl Default for BackendOptions {
164    fn default() -> Self {
165        Self {
166            dependency_tracking: true,
167            active_tracking: true,
168            storage_mode: Some(StorageMode::ReadWrite),
169            num_workers: None,
170            small_preallocation: false,
171        }
172    }
173}
174
175pub enum TurboTasksBackendJob {
176    InitialSnapshot,
177    FollowUpSnapshot,
178}
179
180pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
181
182type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
183
184struct TurboTasksBackendInner<B: BackingStorage> {
185    options: BackendOptions,
186
187    start_time: Instant,
188
189    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
190    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
191
192    persisted_task_cache_log: Option<TaskCacheLog>,
193    task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
194    transient_tasks: FxDashMap<TaskId, Arc<TransientTask>>,
195
196    storage: Storage,
197
198    /// When true, the backing_storage has data that is not in the local storage.
199    local_is_partial: AtomicBool,
200
201    /// Number of executing operations + Highest bit is set when snapshot is
202    /// requested. When that bit is set, operations should pause until the
203    /// snapshot is completed. When the bit is set and in progress counter
204    /// reaches zero, `operations_completed_when_snapshot_requested` is
205    /// triggered.
206    in_progress_operations: AtomicUsize,
207
208    snapshot_request: Mutex<SnapshotRequest>,
209    /// Condition Variable that is triggered when `in_progress_operations`
210    /// reaches zero while snapshot is requested. All operations are either
211    /// completed or suspended.
212    operations_suspended: Condvar,
213    /// Condition Variable that is triggered when a snapshot is completed and
214    /// operations can continue.
215    snapshot_completed: Condvar,
216    /// The timestamp of the last started snapshot since [`Self::start_time`].
217    last_snapshot: AtomicU64,
218
219    stopping: AtomicBool,
220    stopping_event: Event,
221    idle_start_event: Event,
222    idle_end_event: Event,
223    #[cfg(feature = "verify_aggregation_graph")]
224    is_idle: AtomicBool,
225
226    task_statistics: TaskStatisticsApi,
227
228    backing_storage: B,
229
230    #[cfg(feature = "verify_aggregation_graph")]
231    root_tasks: Mutex<FxHashSet<TaskId>>,
232}
233
234impl<B: BackingStorage> TurboTasksBackend<B> {
235    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
236        Self(Arc::new(TurboTasksBackendInner::new(
237            options,
238            backing_storage,
239        )))
240    }
241
242    pub fn backing_storage(&self) -> &B {
243        &self.0.backing_storage
244    }
245}
246
247impl<B: BackingStorage> TurboTasksBackendInner<B> {
248    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
249        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
250        let need_log = matches!(
251            options.storage_mode,
252            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
253        );
254        if !options.dependency_tracking {
255            options.active_tracking = false;
256        }
257        let small_preallocation = options.small_preallocation;
258        let next_task_id = backing_storage
259            .next_free_task_id()
260            .expect("Failed to get task id");
261        Self {
262            options,
263            start_time: Instant::now(),
264            persisted_task_id_factory: IdFactoryWithReuse::new(
265                next_task_id,
266                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
267            ),
268            transient_task_id_factory: IdFactoryWithReuse::new(
269                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
270                TaskId::MAX,
271            ),
272            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
273            task_cache: BiMap::new(),
274            transient_tasks: FxDashMap::default(),
275            local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
276            storage: Storage::new(shard_amount, small_preallocation),
277            in_progress_operations: AtomicUsize::new(0),
278            snapshot_request: Mutex::new(SnapshotRequest::new()),
279            operations_suspended: Condvar::new(),
280            snapshot_completed: Condvar::new(),
281            last_snapshot: AtomicU64::new(0),
282            stopping: AtomicBool::new(false),
283            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
284            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
285            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
286            #[cfg(feature = "verify_aggregation_graph")]
287            is_idle: AtomicBool::new(false),
288            task_statistics: TaskStatisticsApi::default(),
289            backing_storage,
290            #[cfg(feature = "verify_aggregation_graph")]
291            root_tasks: Default::default(),
292        }
293    }
294
295    fn execute_context<'a>(
296        &'a self,
297        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
298    ) -> impl ExecuteContext<'a> {
299        ExecuteContextImpl::new(self, turbo_tasks)
300    }
301
302    /// # Safety
303    ///
304    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
305    unsafe fn execute_context_with_tx<'e, 'tx>(
306        &'e self,
307        tx: Option<&'e B::ReadTransaction<'tx>>,
308        turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
309    ) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
310    where
311        'tx: 'e,
312    {
313        // Safety: `tx` is from `self`.
314        unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
315    }
316
317    fn suspending_requested(&self) -> bool {
318        self.should_persist()
319            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
320    }
321
322    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
323        #[cold]
324        fn operation_suspend_point_cold<B: BackingStorage>(
325            this: &TurboTasksBackendInner<B>,
326            suspend: impl FnOnce() -> AnyOperation,
327        ) {
328            let operation = Arc::new(suspend());
329            let mut snapshot_request = this.snapshot_request.lock();
330            if snapshot_request.snapshot_requested {
331                snapshot_request
332                    .suspended_operations
333                    .insert(operation.clone().into());
334                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
335                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
336                if value == SNAPSHOT_REQUESTED_BIT {
337                    this.operations_suspended.notify_all();
338                }
339                this.snapshot_completed
340                    .wait_while(&mut snapshot_request, |snapshot_request| {
341                        snapshot_request.snapshot_requested
342                    });
343                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
344                snapshot_request
345                    .suspended_operations
346                    .remove(&operation.into());
347            }
348        }
349
350        if self.suspending_requested() {
351            operation_suspend_point_cold(self, suspend);
352        }
353    }
354
355    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
356        if !self.should_persist() {
357            return OperationGuard { backend: None };
358        }
359        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
360        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
361            let mut snapshot_request = self.snapshot_request.lock();
362            if snapshot_request.snapshot_requested {
363                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
364                if value == SNAPSHOT_REQUESTED_BIT {
365                    self.operations_suspended.notify_all();
366                }
367                self.snapshot_completed
368                    .wait_while(&mut snapshot_request, |snapshot_request| {
369                        snapshot_request.snapshot_requested
370                    });
371                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
372            }
373        }
374        OperationGuard {
375            backend: Some(self),
376        }
377    }
378
379    fn should_persist(&self) -> bool {
380        matches!(
381            self.options.storage_mode,
382            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
383        )
384    }
385
386    fn should_restore(&self) -> bool {
387        self.options.storage_mode.is_some()
388    }
389
390    fn should_track_dependencies(&self) -> bool {
391        self.options.dependency_tracking
392    }
393
394    fn should_track_activeness(&self) -> bool {
395        self.options.active_tracking
396    }
397
398    fn track_cache_hit(&self, task_type: &CachedTaskType) {
399        self.task_statistics
400            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
401    }
402
403    fn track_cache_miss(&self, task_type: &CachedTaskType) {
404        self.task_statistics
405            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
406    }
407}
408
409pub(crate) struct OperationGuard<'a, B: BackingStorage> {
410    backend: Option<&'a TurboTasksBackendInner<B>>,
411}
412
413impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
414    fn drop(&mut self) {
415        if let Some(backend) = self.backend {
416            let fetch_sub = backend
417                .in_progress_operations
418                .fetch_sub(1, Ordering::AcqRel);
419            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
420                backend.operations_suspended.notify_all();
421            }
422        }
423    }
424}
425
426/// Intermediate result of step 1 of task execution completion.
427struct TaskExecutionCompletePrepareResult {
428    pub new_children: FxHashSet<TaskId>,
429    pub is_now_immutable: bool,
430    #[cfg(feature = "verify_determinism")]
431    pub no_output_set: bool,
432    pub new_output: Option<OutputValue>,
433    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
434}
435
436// Operations
437impl<B: BackingStorage> TurboTasksBackendInner<B> {
438    /// # Safety
439    ///
440    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
441    unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
442        &'l self,
443        tx: Option<&'l B::ReadTransaction<'tx>>,
444        parent_task: Option<TaskId>,
445        child_task: TaskId,
446        turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
447    ) {
448        operation::ConnectChildOperation::run(parent_task, child_task, unsafe {
449            self.execute_context_with_tx(tx, turbo_tasks)
450        });
451    }
452
453    fn connect_child(
454        &self,
455        parent_task: Option<TaskId>,
456        child_task: TaskId,
457        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
458    ) {
459        operation::ConnectChildOperation::run(
460            parent_task,
461            child_task,
462            self.execute_context(turbo_tasks),
463        );
464    }
465
466    fn try_read_task_output(
467        self: &Arc<Self>,
468        task_id: TaskId,
469        reader: Option<TaskId>,
470        options: ReadOutputOptions,
471        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
472    ) -> Result<Result<RawVc, EventListener>> {
473        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
474
475        let mut ctx = self.execute_context(turbo_tasks);
476        let need_reader_task = if self.should_track_dependencies()
477            && !matches!(options.tracking, ReadTracking::Untracked)
478            && reader.is_some_and(|reader_id| reader_id != task_id)
479            && let Some(reader_id) = reader
480            && reader_id != task_id
481        {
482            Some(reader_id)
483        } else {
484            None
485        };
486        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
487            // Having a task_pair here is not optimal, but otherwise this would lead to a race
488            // condition. See below.
489            // TODO(sokra): solve that in a more performant way.
490            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
491            (task, Some(reader))
492        } else {
493            (ctx.task(task_id, TaskDataCategory::All), None)
494        };
495
496        fn listen_to_done_event<B: BackingStorage>(
497            this: &TurboTasksBackendInner<B>,
498            reader: Option<TaskId>,
499            tracking: ReadTracking,
500            done_event: &Event,
501        ) -> EventListener {
502            done_event.listen_with_note(move || {
503                let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
504                move || {
505                    if let Some(reader_desc) = reader_desc.as_ref() {
506                        format!("try_read_task_output from {} ({})", reader_desc(), tracking)
507                    } else {
508                        format!("try_read_task_output ({})", tracking)
509                    }
510                }
511            })
512        }
513
514        fn check_in_progress<B: BackingStorage>(
515            this: &TurboTasksBackendInner<B>,
516            task: &impl TaskGuard,
517            reader: Option<TaskId>,
518            tracking: ReadTracking,
519            ctx: &impl ExecuteContext<'_>,
520        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
521        {
522            match task.get_in_progress() {
523                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
524                    listen_to_done_event(this, reader, tracking, done_event),
525                ))),
526                Some(InProgressState::InProgress(box InProgressStateInner {
527                    done_event, ..
528                })) => Some(Ok(Err(listen_to_done_event(
529                    this, reader, tracking, done_event,
530                )))),
531                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
532                    "{} was canceled",
533                    ctx.get_task_description(task.id())
534                ))),
535                None => None,
536            }
537        }
538
539        if matches!(options.consistency, ReadConsistency::Strong) {
540            // Ensure it's an root node
541            loop {
542                let aggregation_number = get_aggregation_number(&task);
543                if is_root_node(aggregation_number) {
544                    break;
545                }
546                drop(task);
547                drop(reader_task);
548                {
549                    let _span = tracing::trace_span!(
550                        "make root node for strongly consistent read",
551                        %task_id
552                    )
553                    .entered();
554                    AggregationUpdateQueue::run(
555                        AggregationUpdateJob::UpdateAggregationNumber {
556                            task_id,
557                            base_aggregation_number: u32::MAX,
558                            distance: None,
559                        },
560                        &mut ctx,
561                    );
562                }
563                (task, reader_task) = if let Some(reader_id) = need_reader_task {
564                    // TODO(sokra): see comment above
565                    let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
566                    (task, Some(reader))
567                } else {
568                    (ctx.task(task_id, TaskDataCategory::All), None)
569                }
570            }
571
572            let is_dirty = task.is_dirty();
573
574            // Check the dirty count of the root node
575            let has_dirty_containers = task.has_dirty_containers();
576            if has_dirty_containers || is_dirty.is_some() {
577                let activeness = task.get_activeness_mut();
578                let mut task_ids_to_schedule: Vec<_> = Vec::new();
579                // When there are dirty task, subscribe to the all_clean_event
580                let activeness = if let Some(activeness) = activeness {
581                    // This makes sure all tasks stay active and this task won't stale.
582                    // active_until_clean is automatically removed when this
583                    // task is clean.
584                    activeness.set_active_until_clean();
585                    activeness
586                } else {
587                    // If we don't have a root state, add one. This also makes sure all tasks stay
588                    // active and this task won't stale. active_until_clean
589                    // is automatically removed when this task is clean.
590                    if ctx.should_track_activeness() {
591                        // A newly added Activeness need to make sure to schedule the tasks
592                        task_ids_to_schedule = task.dirty_containers().collect();
593                        task_ids_to_schedule.push(task_id);
594                    }
595                    let activeness =
596                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
597                    activeness.set_active_until_clean();
598                    activeness
599                };
600                let listener = activeness.all_clean_event.listen_with_note(move || {
601                    let this = self.clone();
602                    let tt = turbo_tasks.pin();
603                    move || {
604                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
605                        let mut ctx = this.execute_context(tt);
606                        let mut visited = FxHashSet::default();
607                        fn indent(s: &str) -> String {
608                            s.split_inclusive('\n')
609                                .flat_map(|line: &str| ["  ", line].into_iter())
610                                .collect::<String>()
611                        }
612                        fn get_info(
613                            ctx: &mut impl ExecuteContext<'_>,
614                            task_id: TaskId,
615                            parent_and_count: Option<(TaskId, i32)>,
616                            visited: &mut FxHashSet<TaskId>,
617                        ) -> String {
618                            let task = ctx.task(task_id, TaskDataCategory::All);
619                            let is_dirty = task.is_dirty();
620                            let in_progress =
621                                task.get_in_progress()
622                                    .map_or("not in progress", |p| match p {
623                                        InProgressState::InProgress(_) => "in progress",
624                                        InProgressState::Scheduled { .. } => "scheduled",
625                                        InProgressState::Canceled => "canceled",
626                                    });
627                            let activeness = task.get_activeness().map_or_else(
628                                || "not active".to_string(),
629                                |activeness| format!("{activeness:?}"),
630                            );
631                            let aggregation_number = get_aggregation_number(&task);
632                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
633                            {
634                                let uppers = get_uppers(&task);
635                                !uppers.contains(&parent_task_id)
636                            } else {
637                                false
638                            };
639
640                            // Check the dirty count of the root node
641                            let has_dirty_containers = task.has_dirty_containers();
642
643                            let task_description = ctx.get_task_description(task_id);
644                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
645                                format!(", dirty({parent_priority})")
646                            } else {
647                                String::new()
648                            };
649                            let has_dirty_containers_label = if has_dirty_containers {
650                                ", dirty containers"
651                            } else {
652                                ""
653                            };
654                            let count = if let Some((_, count)) = parent_and_count {
655                                format!(" {count}")
656                            } else {
657                                String::new()
658                            };
659                            let mut info = format!(
660                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
661                                 {in_progress}, \
662                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
663                            );
664                            let children: Vec<_> = task.dirty_containers_with_count().collect();
665                            drop(task);
666
667                            if missing_upper {
668                                info.push_str("\n  ERROR: missing upper connection");
669                            }
670
671                            if has_dirty_containers || !children.is_empty() {
672                                writeln!(info, "\n  dirty tasks:").unwrap();
673
674                                for (child_task_id, count) in children {
675                                    let task_description = ctx.get_task_description(child_task_id);
676                                    if visited.insert(child_task_id) {
677                                        let child_info = get_info(
678                                            ctx,
679                                            child_task_id,
680                                            Some((task_id, count)),
681                                            visited,
682                                        );
683                                        info.push_str(&indent(&child_info));
684                                        if !info.ends_with('\n') {
685                                            info.push('\n');
686                                        }
687                                    } else {
688                                        writeln!(
689                                            info,
690                                            "  {child_task_id} {task_description} {count} \
691                                             (already visited)"
692                                        )
693                                        .unwrap();
694                                    }
695                                }
696                            }
697                            info
698                        }
699                        let info = get_info(&mut ctx, task_id, None, &mut visited);
700                        format!(
701                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
702                        )
703                    }
704                });
705                drop(reader_task);
706                drop(task);
707                if !task_ids_to_schedule.is_empty() {
708                    let mut queue = AggregationUpdateQueue::new();
709                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
710                    queue.execute(&mut ctx);
711                }
712
713                return Ok(Err(listener));
714            }
715        }
716
717        if let Some(value) = check_in_progress(self, &task, reader, options.tracking, &ctx) {
718            return value;
719        }
720
721        if let Some(output) = task.get_output() {
722            let result = match output {
723                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
724                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
725                OutputValue::Error(error) => Err(error
726                    .with_task_context(ctx.get_task_description(task_id), Some(task_id))
727                    .into()),
728            };
729            if let Some(mut reader_task) = reader_task
730                && options.tracking.should_track(result.is_err())
731                && (!task.immutable() || cfg!(feature = "verify_immutable"))
732            {
733                #[cfg(feature = "trace_task_output_dependencies")]
734                let _span = tracing::trace_span!(
735                    "add output dependency",
736                    task = %task_id,
737                    dependent_task = ?reader
738                )
739                .entered();
740                let mut queue = LeafDistanceUpdateQueue::new();
741                let reader = reader.unwrap();
742                if task.add_output_dependent(reader) {
743                    // Ensure that dependent leaf distance is strictly monotonic increasing
744                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
745                    let reader_leaf_distance =
746                        reader_task.get_leaf_distance().copied().unwrap_or_default();
747                    if reader_leaf_distance.distance <= leaf_distance.distance {
748                        queue.push(
749                            reader,
750                            leaf_distance.distance,
751                            leaf_distance.max_distance_in_buffer,
752                        );
753                    }
754                }
755
756                drop(task);
757
758                // Note: We use `task_pair` earlier to lock the task and its reader at the same
759                // time. If we didn't and just locked the reader here, an invalidation could occur
760                // between grabbing the locks. If that happened, and if the task is "outdated" or
761                // doesn't have the dependency edge yet, the invalidation would be lost.
762
763                if !reader_task.remove_outdated_output_dependencies(&task_id) {
764                    let _ = reader_task.add_output_dependencies(task_id);
765                }
766                drop(reader_task);
767
768                queue.execute(&mut ctx);
769            }
770
771            return result;
772        }
773        drop(reader_task);
774
775        let note = move || {
776            let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
777            move || {
778                if let Some(reader_desc) = reader_desc.as_ref() {
779                    format!("try_read_task_output (recompute) from {}", (reader_desc)())
780                } else {
781                    "try_read_task_output (recompute, untracked)".to_string()
782                }
783            }
784        };
785
786        // Output doesn't exist. We need to schedule the task to compute it.
787        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
788            TaskExecutionReason::OutputNotAvailable,
789            || self.get_task_desc_fn(task_id),
790            note,
791        );
792        // It's not possible that the task is InProgress at this point. If it is InProgress {
793        // done: true } it must have Output and would early return.
794        let old = task.set_in_progress(in_progress_state);
795        debug_assert!(old.is_none(), "InProgress already exists");
796        ctx.schedule_task(task, TaskPriority::Initial);
797
798        Ok(Err(listener))
799    }
800
801    fn try_read_task_cell(
802        &self,
803        task_id: TaskId,
804        reader: Option<TaskId>,
805        cell: CellId,
806        options: ReadCellOptions,
807        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
808    ) -> Result<Result<TypedCellContent, EventListener>> {
809        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
810
811        fn add_cell_dependency(
812            task_id: TaskId,
813            mut task: impl TaskGuard,
814            reader: Option<TaskId>,
815            reader_task: Option<impl TaskGuard>,
816            cell: CellId,
817            key: Option<u64>,
818        ) {
819            if let Some(mut reader_task) = reader_task
820                && (!task.immutable() || cfg!(feature = "verify_immutable"))
821            {
822                let reader = reader.unwrap();
823                let _ = task.add_cell_dependents((cell, key, reader));
824                drop(task);
825
826                // Note: We use `task_pair` earlier to lock the task and its reader at the same
827                // time. If we didn't and just locked the reader here, an invalidation could occur
828                // between grabbing the locks. If that happened, and if the task is "outdated" or
829                // doesn't have the dependency edge yet, the invalidation would be lost.
830
831                let target = CellRef {
832                    task: task_id,
833                    cell,
834                };
835                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
836                    let _ = reader_task.add_cell_dependencies((target, key));
837                }
838                drop(reader_task);
839            }
840        }
841
842        let ReadCellOptions {
843            is_serializable_cell_content,
844            tracking,
845            final_read_hint,
846        } = options;
847
848        let mut ctx = self.execute_context(turbo_tasks);
849        let (mut task, reader_task) = if self.should_track_dependencies()
850            && !matches!(tracking, ReadCellTracking::Untracked)
851            && let Some(reader_id) = reader
852            && reader_id != task_id
853        {
854            // Having a task_pair here is not optimal, but otherwise this would lead to a race
855            // condition. See below.
856            // TODO(sokra): solve that in a more performant way.
857            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
858            (task, Some(reader))
859        } else {
860            (ctx.task(task_id, TaskDataCategory::All), None)
861        };
862
863        let content = if final_read_hint {
864            task.remove_cell_data(is_serializable_cell_content, cell)
865        } else {
866            task.get_cell_data(is_serializable_cell_content, cell)
867        };
868        if let Some(content) = content {
869            if tracking.should_track(false) {
870                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
871            }
872            return Ok(Ok(TypedCellContent(
873                cell.type_id,
874                CellContent(Some(content.reference)),
875            )));
876        }
877
878        let in_progress = task.get_in_progress();
879        if matches!(
880            in_progress,
881            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
882        ) {
883            return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0));
884        }
885        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
886
887        // Check cell index range (cell might not exist at all)
888        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
889        let Some(max_id) = max_id else {
890            if tracking.should_track(true) {
891                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
892            }
893            bail!(
894                "Cell {cell:?} no longer exists in task {} (no cell of this type exists)",
895                ctx.get_task_description(task_id)
896            );
897        };
898        if cell.index >= max_id {
899            if tracking.should_track(true) {
900                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
901            }
902            bail!(
903                "Cell {cell:?} no longer exists in task {} (index out of bounds)",
904                ctx.get_task_description(task_id)
905            );
906        }
907        drop(reader_task);
908
909        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
910        // task the get the cell content.
911
912        // Listen to the cell and potentially schedule the task
913        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, reader, cell);
914        if !new_listener {
915            return Ok(Err(listener));
916        }
917
918        let _span = tracing::trace_span!(
919            "recomputation",
920            cell_type = get_value_type(cell.type_id).global_name,
921            cell_index = cell.index
922        )
923        .entered();
924
925        // Schedule the task, if not already scheduled
926        if is_cancelled {
927            bail!("{} was canceled", ctx.get_task_description(task_id));
928        }
929        let _ = task.add_scheduled(TaskExecutionReason::CellNotAvailable, || {
930            self.get_task_desc_fn(task_id)
931        });
932        ctx.schedule_task(task, TaskPriority::initial());
933
934        Ok(Err(listener))
935    }
936
937    fn listen_to_cell(
938        &self,
939        task: &mut impl TaskGuard,
940        task_id: TaskId,
941        reader: Option<TaskId>,
942        cell: CellId,
943    ) -> (EventListener, bool) {
944        let note = move || {
945            let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
946            move || {
947                if let Some(reader_desc) = reader_desc.as_ref() {
948                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
949                } else {
950                    "try_read_task_cell (in progress, untracked)".to_string()
951                }
952            }
953        };
954        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
955            // Someone else is already computing the cell
956            let listener = in_progress.event.listen_with_note(note);
957            return (listener, false);
958        }
959        let in_progress = InProgressCellState::new(task_id, cell);
960        let listener = in_progress.event.listen_with_note(note);
961        let old = task.insert_in_progress_cells(cell, in_progress);
962        debug_assert!(old.is_none(), "InProgressCell already exists");
963        (listener, true)
964    }
965
966    fn lookup_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
967        if let Some(task_type) = self.task_cache.lookup_reverse(&task_id) {
968            return Some(task_type);
969        }
970        if self.should_restore()
971            && self.local_is_partial.load(Ordering::Acquire)
972            && !task_id.is_transient()
973            && let Some(task_type) = unsafe {
974                self.backing_storage
975                    .reverse_lookup_task_cache(None, task_id)
976                    .expect("Failed to lookup task type")
977            }
978        {
979            let _ = self.task_cache.try_insert(task_type.clone(), task_id);
980            return Some(task_type);
981        }
982        None
983    }
984
985    fn get_task_desc_fn(&self, task_id: TaskId) -> impl Fn() -> String + Send + Sync + 'static {
986        let task_type = self.lookup_task_type(task_id);
987        move || {
988            task_type.as_ref().map_or_else(
989                || format!("{task_id:?} transient"),
990                |task_type| format!("{task_id:?} {task_type}"),
991            )
992        }
993    }
994
995    fn snapshot_and_persist(
996        &self,
997        parent_span: Option<tracing::Id>,
998        reason: &str,
999    ) -> Option<(Instant, bool)> {
1000        let snapshot_span =
1001            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
1002                .entered();
1003        let start = Instant::now();
1004        debug_assert!(self.should_persist());
1005
1006        let suspended_operations;
1007        {
1008            let _span = tracing::info_span!("blocking").entered();
1009            let mut snapshot_request = self.snapshot_request.lock();
1010            snapshot_request.snapshot_requested = true;
1011            let active_operations = self
1012                .in_progress_operations
1013                .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1014            if active_operations != 0 {
1015                self.operations_suspended
1016                    .wait_while(&mut snapshot_request, |_| {
1017                        self.in_progress_operations.load(Ordering::Relaxed)
1018                            != SNAPSHOT_REQUESTED_BIT
1019                    });
1020            }
1021            suspended_operations = snapshot_request
1022                .suspended_operations
1023                .iter()
1024                .map(|op| op.arc().clone())
1025                .collect::<Vec<_>>();
1026        }
1027        self.storage.start_snapshot();
1028        let mut persisted_task_cache_log = self
1029            .persisted_task_cache_log
1030            .as_ref()
1031            .map(|l| l.take(|i| i))
1032            .unwrap_or_default();
1033        let mut snapshot_request = self.snapshot_request.lock();
1034        snapshot_request.snapshot_requested = false;
1035        self.in_progress_operations
1036            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1037        self.snapshot_completed.notify_all();
1038        let snapshot_time = Instant::now();
1039        drop(snapshot_request);
1040
1041        let preprocess = |task_id: TaskId, inner: &TaskStorage| {
1042            if task_id.is_transient() {
1043                return (None, None);
1044            }
1045
1046            let meta_restored = inner.flags.meta_restored();
1047            let data_restored = inner.flags.data_restored();
1048
1049            // Encode meta/data directly from TaskStorage
1050            let meta = meta_restored.then(|| inner.clone_meta_snapshot());
1051            let data = data_restored.then(|| inner.clone_data_snapshot());
1052
1053            (meta, data)
1054        };
1055        let process =
1056            |task_id: TaskId, (meta, data): (Option<TaskStorage>, Option<TaskStorage>)| {
1057                // TODO: perf: Instead of returning a `Vec` of individually allocated `SmallVec`s,
1058                // it'd be better to append everything to a flat per-task or
1059                // per-shard `Vec<u8>`, and have each `serialize` call return
1060                // `(start_idx, end_idx)`.
1061                (
1062                    task_id,
1063                    meta.map(|d| {
1064                        self.backing_storage
1065                            .serialize(task_id, &d, SpecificTaskDataCategory::Meta)
1066                    }),
1067                    data.map(|d| {
1068                        self.backing_storage
1069                            .serialize(task_id, &d, SpecificTaskDataCategory::Data)
1070                    }),
1071                )
1072            };
1073        let process_snapshot = |task_id: TaskId, inner: Box<TaskStorage>| {
1074            if task_id.is_transient() {
1075                return (task_id, None, None);
1076            }
1077
1078            // Encode meta/data directly from TaskStorage snapshot
1079            (
1080                task_id,
1081                inner.flags.meta_modified().then(|| {
1082                    self.backing_storage
1083                        .serialize(task_id, &inner, SpecificTaskDataCategory::Meta)
1084                }),
1085                inner.flags.data_modified().then(|| {
1086                    self.backing_storage
1087                        .serialize(task_id, &inner, SpecificTaskDataCategory::Data)
1088                }),
1089            )
1090        };
1091
1092        let snapshot = self
1093            .storage
1094            .take_snapshot(&preprocess, &process, &process_snapshot);
1095
1096        #[cfg(feature = "print_cache_item_size")]
1097        #[derive(Default)]
1098        struct TaskCacheStats {
1099            data: usize,
1100            data_compressed: usize,
1101            data_count: usize,
1102            meta: usize,
1103            meta_compressed: usize,
1104            meta_count: usize,
1105        }
1106        #[cfg(feature = "print_cache_item_size")]
1107        impl TaskCacheStats {
1108            fn compressed_size(data: &[u8]) -> Result<usize> {
1109                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1110                    data,
1111                    &mut Vec::new(),
1112                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
1113                )?)
1114            }
1115
1116            fn add_data(&mut self, data: &[u8]) {
1117                self.data += data.len();
1118                self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1119                self.data_count += 1;
1120            }
1121
1122            fn add_meta(&mut self, data: &[u8]) {
1123                self.meta += data.len();
1124                self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1125                self.meta_count += 1;
1126            }
1127        }
1128        #[cfg(feature = "print_cache_item_size")]
1129        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1130            Mutex::new(FxHashMap::default());
1131
1132        let task_snapshots = snapshot
1133            .into_iter()
1134            .filter_map(|iter| {
1135                let mut iter = iter
1136                    .filter_map(
1137                        |(task_id, meta, data): (
1138                            _,
1139                            Option<Result<SmallVec<_>>>,
1140                            Option<Result<SmallVec<_>>>,
1141                        )| {
1142                            let meta = match meta {
1143                                Some(Ok(meta)) => {
1144                                    #[cfg(feature = "print_cache_item_size")]
1145                                    task_cache_stats
1146                                        .lock()
1147                                        .entry(self.get_task_description(task_id))
1148                                        .or_default()
1149                                        .add_meta(&meta);
1150                                    Some(meta)
1151                                }
1152                                None => None,
1153                                Some(Err(err)) => {
1154                                    println!(
1155                                        "Serializing task {} failed (meta): {:?}",
1156                                        self.get_task_description(task_id),
1157                                        err
1158                                    );
1159                                    None
1160                                }
1161                            };
1162                            let data = match data {
1163                                Some(Ok(data)) => {
1164                                    #[cfg(feature = "print_cache_item_size")]
1165                                    task_cache_stats
1166                                        .lock()
1167                                        .entry(self.get_task_description(task_id))
1168                                        .or_default()
1169                                        .add_data(&data);
1170                                    Some(data)
1171                                }
1172                                None => None,
1173                                Some(Err(err)) => {
1174                                    println!(
1175                                        "Serializing task {} failed (data): {:?}",
1176                                        self.get_task_description(task_id),
1177                                        err
1178                                    );
1179                                    None
1180                                }
1181                            };
1182                            (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
1183                        },
1184                    )
1185                    .peekable();
1186                iter.peek().is_some().then_some(iter)
1187            })
1188            .collect::<Vec<_>>();
1189
1190        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
1191
1192        drop(snapshot_span);
1193
1194        if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
1195            return Some((snapshot_time, false));
1196        }
1197
1198        let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1199        {
1200            if let Err(err) = self.backing_storage.save_snapshot(
1201                suspended_operations,
1202                persisted_task_cache_log,
1203                task_snapshots,
1204            ) {
1205                println!("Persisting failed: {err:?}");
1206                return None;
1207            }
1208            #[cfg(feature = "print_cache_item_size")]
1209            {
1210                let mut task_cache_stats = task_cache_stats
1211                    .into_inner()
1212                    .into_iter()
1213                    .collect::<Vec<_>>();
1214                if !task_cache_stats.is_empty() {
1215                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1216                        (stats_b.data_compressed + stats_b.meta_compressed, key_b)
1217                            .cmp(&(stats_a.data_compressed + stats_a.meta_compressed, key_a))
1218                    });
1219                    println!("Task cache stats:");
1220                    for (task_desc, stats) in task_cache_stats {
1221                        use turbo_tasks::util::FormatBytes;
1222
1223                        println!(
1224                            "  {} ({}) {task_desc} = {} ({}) meta {} x {} ({}), {} ({}) data {} x \
1225                             {} ({})",
1226                            FormatBytes(stats.data_compressed + stats.meta_compressed),
1227                            FormatBytes(stats.data + stats.meta),
1228                            FormatBytes(stats.meta_compressed),
1229                            FormatBytes(stats.meta),
1230                            stats.meta_count,
1231                            FormatBytes(
1232                                stats
1233                                    .meta_compressed
1234                                    .checked_div(stats.meta_count)
1235                                    .unwrap_or(0)
1236                            ),
1237                            FormatBytes(stats.meta.checked_div(stats.meta_count).unwrap_or(0)),
1238                            FormatBytes(stats.data_compressed),
1239                            FormatBytes(stats.data),
1240                            stats.data_count,
1241                            FormatBytes(
1242                                stats
1243                                    .data_compressed
1244                                    .checked_div(stats.data_count)
1245                                    .unwrap_or(0)
1246                            ),
1247                            FormatBytes(stats.data.checked_div(stats.data_count).unwrap_or(0)),
1248                        );
1249                    }
1250                }
1251            }
1252        }
1253
1254        let elapsed = start.elapsed();
1255        // avoid spamming the event queue with information about fast operations
1256        if elapsed > Duration::from_secs(10) {
1257            turbo_tasks().send_compilation_event(Arc::new(TimingEvent::new(
1258                "Finished writing to filesystem cache".to_string(),
1259                elapsed,
1260            )));
1261        }
1262
1263        Some((snapshot_time, true))
1264    }
1265
1266    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1267        if self.should_restore() {
1268            // Continue all uncompleted operations
1269            // They can't be interrupted by a snapshot since the snapshotting job has not been
1270            // scheduled yet.
1271            let uncompleted_operations = self
1272                .backing_storage
1273                .uncompleted_operations()
1274                .expect("Failed to get uncompleted operations");
1275            if !uncompleted_operations.is_empty() {
1276                let mut ctx = self.execute_context(turbo_tasks);
1277                for op in uncompleted_operations {
1278                    op.execute(&mut ctx);
1279                }
1280            }
1281        }
1282
1283        // Only when it should write regularly to the storage, we schedule the initial snapshot
1284        // job.
1285        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1286            // Schedule the snapshot job
1287            let _span = trace_span!("persisting background job").entered();
1288            let _span = tracing::info_span!("thread").entered();
1289            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1290        }
1291    }
1292
1293    fn stopping(&self) {
1294        self.stopping.store(true, Ordering::Release);
1295        self.stopping_event.notify(usize::MAX);
1296    }
1297
1298    #[allow(unused_variables)]
1299    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1300        #[cfg(feature = "verify_aggregation_graph")]
1301        {
1302            self.is_idle.store(false, Ordering::Release);
1303            self.verify_aggregation_graph(turbo_tasks, false);
1304        }
1305        if self.should_persist() {
1306            self.snapshot_and_persist(Span::current().into(), "stop");
1307        }
1308        self.task_cache.drop_contents();
1309        drop_contents(&self.transient_tasks);
1310        self.storage.drop_contents();
1311        if let Err(err) = self.backing_storage.shutdown() {
1312            println!("Shutting down failed: {err}");
1313        }
1314    }
1315
1316    #[allow(unused_variables)]
1317    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1318        self.idle_start_event.notify(usize::MAX);
1319
1320        #[cfg(feature = "verify_aggregation_graph")]
1321        {
1322            use tokio::select;
1323
1324            self.is_idle.store(true, Ordering::Release);
1325            let this = self.clone();
1326            let turbo_tasks = turbo_tasks.pin();
1327            tokio::task::spawn(async move {
1328                select! {
1329                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1330                        // do nothing
1331                    }
1332                    _ = this.idle_end_event.listen() => {
1333                        return;
1334                    }
1335                }
1336                if !this.is_idle.load(Ordering::Relaxed) {
1337                    return;
1338                }
1339                this.verify_aggregation_graph(&*turbo_tasks, true);
1340            });
1341        }
1342    }
1343
1344    fn idle_end(&self) {
1345        #[cfg(feature = "verify_aggregation_graph")]
1346        self.is_idle.store(false, Ordering::Release);
1347        self.idle_end_event.notify(usize::MAX);
1348    }
1349
1350    fn get_or_create_persistent_task(
1351        &self,
1352        task_type: CachedTaskType,
1353        parent_task: Option<TaskId>,
1354        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1355    ) -> TaskId {
1356        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
1357            self.track_cache_hit(&task_type);
1358            self.connect_child(parent_task, task_id, turbo_tasks);
1359            return task_id;
1360        }
1361
1362        let check_backing_storage =
1363            self.should_restore() && self.local_is_partial.load(Ordering::Acquire);
1364        let tx = check_backing_storage
1365            .then(|| self.backing_storage.start_read_transaction())
1366            .flatten();
1367        let task_id = {
1368            // Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
1369            if let Some(task_id) = unsafe {
1370                check_backing_storage
1371                    .then(|| {
1372                        self.backing_storage
1373                            .forward_lookup_task_cache(tx.as_ref(), &task_type)
1374                            .expect("Failed to lookup task id")
1375                    })
1376                    .flatten()
1377            } {
1378                self.track_cache_hit(&task_type);
1379                let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
1380                task_id
1381            } else {
1382                let task_type = Arc::new(task_type);
1383                let task_id = self.persisted_task_id_factory.get();
1384                let task_id = if let Err(existing_task_id) =
1385                    self.task_cache.try_insert(task_type.clone(), task_id)
1386                {
1387                    self.track_cache_hit(&task_type);
1388                    // Safety: We just created the id and failed to insert it.
1389                    unsafe {
1390                        self.persisted_task_id_factory.reuse(task_id);
1391                    }
1392                    existing_task_id
1393                } else {
1394                    self.track_cache_miss(&task_type);
1395                    task_id
1396                };
1397                if let Some(log) = &self.persisted_task_cache_log {
1398                    log.lock(task_id).push((task_type, task_id));
1399                }
1400                task_id
1401            }
1402        };
1403
1404        // Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
1405        unsafe { self.connect_child_with_tx(tx.as_ref(), parent_task, task_id, turbo_tasks) };
1406
1407        task_id
1408    }
1409
1410    fn get_or_create_transient_task(
1411        &self,
1412        task_type: CachedTaskType,
1413        parent_task: Option<TaskId>,
1414        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1415    ) -> TaskId {
1416        if let Some(parent_task) = parent_task
1417            && !parent_task.is_transient()
1418        {
1419            self.panic_persistent_calling_transient(
1420                self.lookup_task_type(parent_task).as_deref(),
1421                Some(&task_type),
1422                /* cell_id */ None,
1423            );
1424        }
1425        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
1426            self.track_cache_hit(&task_type);
1427            self.connect_child(parent_task, task_id, turbo_tasks);
1428            return task_id;
1429        }
1430
1431        let task_type = Arc::new(task_type);
1432        let task_id = self.transient_task_id_factory.get();
1433        if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) {
1434            self.track_cache_hit(&task_type);
1435            // Safety: We just created the id and failed to insert it.
1436            unsafe {
1437                self.transient_task_id_factory.reuse(task_id);
1438            }
1439            self.connect_child(parent_task, existing_task_id, turbo_tasks);
1440            return existing_task_id;
1441        }
1442
1443        self.track_cache_miss(&task_type);
1444        self.connect_child(parent_task, task_id, turbo_tasks);
1445
1446        task_id
1447    }
1448
1449    /// Generate an object that implements [`fmt::Display`] explaining why the given
1450    /// [`CachedTaskType`] is transient.
1451    fn debug_trace_transient_task(
1452        &self,
1453        task_type: &CachedTaskType,
1454        cell_id: Option<CellId>,
1455    ) -> DebugTraceTransientTask {
1456        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1457        // from tracing the same task many times, so use a visited_set
1458        fn inner_id(
1459            backend: &TurboTasksBackendInner<impl BackingStorage>,
1460            task_id: TaskId,
1461            cell_type_id: Option<ValueTypeId>,
1462            visited_set: &mut FxHashSet<TaskId>,
1463        ) -> DebugTraceTransientTask {
1464            if let Some(task_type) = backend.lookup_task_type(task_id) {
1465                if visited_set.contains(&task_id) {
1466                    let task_name = task_type.get_name();
1467                    DebugTraceTransientTask::Collapsed {
1468                        task_name,
1469                        cell_type_id,
1470                    }
1471                } else {
1472                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1473                }
1474            } else {
1475                DebugTraceTransientTask::Uncached { cell_type_id }
1476            }
1477        }
1478        fn inner_cached(
1479            backend: &TurboTasksBackendInner<impl BackingStorage>,
1480            task_type: &CachedTaskType,
1481            cell_type_id: Option<ValueTypeId>,
1482            visited_set: &mut FxHashSet<TaskId>,
1483        ) -> DebugTraceTransientTask {
1484            let task_name = task_type.get_name();
1485
1486            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1487                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1488                    // `task_id` should never be `None` at this point, as that would imply a
1489                    // non-local task is returning a local `Vc`...
1490                    // Just ignore if it happens, as we're likely already panicking.
1491                    return None;
1492                };
1493                if task_id.is_transient() {
1494                    Some(Box::new(inner_id(
1495                        backend,
1496                        task_id,
1497                        cause_self_raw_vc.try_get_type_id(),
1498                        visited_set,
1499                    )))
1500                } else {
1501                    None
1502                }
1503            });
1504            let cause_args = task_type
1505                .arg
1506                .get_raw_vcs()
1507                .into_iter()
1508                .filter_map(|raw_vc| {
1509                    let Some(task_id) = raw_vc.try_get_task_id() else {
1510                        // `task_id` should never be `None` (see comment above)
1511                        return None;
1512                    };
1513                    if !task_id.is_transient() {
1514                        return None;
1515                    }
1516                    Some((task_id, raw_vc.try_get_type_id()))
1517                })
1518                .collect::<IndexSet<_>>() // dedupe
1519                .into_iter()
1520                .map(|(task_id, cell_type_id)| {
1521                    inner_id(backend, task_id, cell_type_id, visited_set)
1522                })
1523                .collect();
1524
1525            DebugTraceTransientTask::Cached {
1526                task_name,
1527                cell_type_id,
1528                cause_self,
1529                cause_args,
1530            }
1531        }
1532        inner_cached(
1533            self,
1534            task_type,
1535            cell_id.map(|c| c.type_id),
1536            &mut FxHashSet::default(),
1537        )
1538    }
1539
1540    fn invalidate_task(
1541        &self,
1542        task_id: TaskId,
1543        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1544    ) {
1545        if !self.should_track_dependencies() {
1546            panic!("Dependency tracking is disabled so invalidation is not allowed");
1547        }
1548        operation::InvalidateOperation::run(
1549            smallvec![task_id],
1550            #[cfg(feature = "trace_task_dirty")]
1551            TaskDirtyCause::Invalidator,
1552            self.execute_context(turbo_tasks),
1553        );
1554    }
1555
1556    fn invalidate_tasks(
1557        &self,
1558        tasks: &[TaskId],
1559        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1560    ) {
1561        if !self.should_track_dependencies() {
1562            panic!("Dependency tracking is disabled so invalidation is not allowed");
1563        }
1564        operation::InvalidateOperation::run(
1565            tasks.iter().copied().collect(),
1566            #[cfg(feature = "trace_task_dirty")]
1567            TaskDirtyCause::Unknown,
1568            self.execute_context(turbo_tasks),
1569        );
1570    }
1571
1572    fn invalidate_tasks_set(
1573        &self,
1574        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1575        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1576    ) {
1577        if !self.should_track_dependencies() {
1578            panic!("Dependency tracking is disabled so invalidation is not allowed");
1579        }
1580        operation::InvalidateOperation::run(
1581            tasks.iter().copied().collect(),
1582            #[cfg(feature = "trace_task_dirty")]
1583            TaskDirtyCause::Unknown,
1584            self.execute_context(turbo_tasks),
1585        );
1586    }
1587
1588    fn invalidate_serialization(
1589        &self,
1590        task_id: TaskId,
1591        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1592    ) {
1593        if task_id.is_transient() {
1594            return;
1595        }
1596        let mut ctx = self.execute_context(turbo_tasks);
1597        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1598        task.invalidate_serialization();
1599    }
1600
1601    fn get_task_description(&self, task_id: TaskId) -> String {
1602        self.lookup_task_type(task_id).map_or_else(
1603            || format!("{task_id:?} transient"),
1604            |task_type| task_type.to_string(),
1605        )
1606    }
1607
1608    fn task_execution_canceled(
1609        &self,
1610        task_id: TaskId,
1611        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1612    ) {
1613        let mut ctx = self.execute_context(turbo_tasks);
1614        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1615        if let Some(in_progress) = task.take_in_progress() {
1616            match in_progress {
1617                InProgressState::Scheduled {
1618                    done_event,
1619                    reason: _,
1620                } => done_event.notify(usize::MAX),
1621                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1622                    done_event.notify(usize::MAX)
1623                }
1624                InProgressState::Canceled => {}
1625            }
1626        }
1627        let old = task.set_in_progress(InProgressState::Canceled);
1628        debug_assert!(old.is_none(), "InProgress already exists");
1629    }
1630
1631    fn try_start_task_execution(
1632        &self,
1633        task_id: TaskId,
1634        priority: TaskPriority,
1635        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1636    ) -> Option<TaskExecutionSpec<'_>> {
1637        enum TaskType {
1638            Cached(Arc<CachedTaskType>),
1639            Transient(Arc<TransientTask>),
1640        }
1641        let (task_type, once_task) = if let Some(task_type) = self.lookup_task_type(task_id) {
1642            (TaskType::Cached(task_type), false)
1643        } else if let Some(task_type) = self.transient_tasks.get(&task_id) {
1644            (
1645                TaskType::Transient(task_type.clone()),
1646                matches!(**task_type, TransientTask::Once(_)),
1647            )
1648        } else {
1649            return None;
1650        };
1651        let execution_reason;
1652        {
1653            let mut ctx = self.execute_context(turbo_tasks);
1654            let mut task = ctx.task(task_id, TaskDataCategory::All);
1655            if let Some(tasks) = task.prefetch() {
1656                drop(task);
1657                ctx.prepare_tasks(tasks);
1658                task = ctx.task(task_id, TaskDataCategory::All);
1659            }
1660            let in_progress = task.take_in_progress()?;
1661            let InProgressState::Scheduled { done_event, reason } = in_progress else {
1662                let old = task.set_in_progress(in_progress);
1663                debug_assert!(old.is_none(), "InProgress already exists");
1664                return None;
1665            };
1666            execution_reason = reason;
1667            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
1668                InProgressStateInner {
1669                    stale: false,
1670                    once_task,
1671                    done_event,
1672                    session_dependent: false,
1673                    marked_as_completed: false,
1674                    new_children: Default::default(),
1675                },
1676            )));
1677            debug_assert!(old.is_none(), "InProgress already exists");
1678
1679            // Make all current collectibles outdated (remove left-over outdated collectibles)
1680            enum Collectible {
1681                Current(CollectibleRef, i32),
1682                Outdated(CollectibleRef),
1683            }
1684            let collectibles = task
1685                .iter_collectibles()
1686                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
1687                .chain(
1688                    task.iter_outdated_collectibles()
1689                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
1690                )
1691                .collect::<Vec<_>>();
1692            for collectible in collectibles {
1693                match collectible {
1694                    Collectible::Current(collectible, value) => {
1695                        let _ = task.insert_outdated_collectible(collectible, value);
1696                    }
1697                    Collectible::Outdated(collectible) => {
1698                        if task
1699                            .collectibles()
1700                            .is_none_or(|m| m.get(&collectible).is_none())
1701                        {
1702                            task.remove_outdated_collectibles(&collectible);
1703                        }
1704                    }
1705                }
1706            }
1707
1708            if self.should_track_dependencies() {
1709                // Make all dependencies outdated
1710                let cell_dependencies = task.iter_cell_dependencies().collect();
1711                task.set_outdated_cell_dependencies(cell_dependencies);
1712
1713                let outdated_output_dependencies = task.iter_output_dependencies().collect();
1714                task.set_outdated_output_dependencies(outdated_output_dependencies);
1715            }
1716        }
1717
1718        let (span, future) = match task_type {
1719            TaskType::Cached(task_type) => {
1720                let CachedTaskType {
1721                    native_fn,
1722                    this,
1723                    arg,
1724                } = &*task_type;
1725                (
1726                    native_fn.span(task_id.persistence(), execution_reason, priority),
1727                    native_fn.execute(*this, &**arg),
1728                )
1729            }
1730            TaskType::Transient(task_type) => {
1731                let span = tracing::trace_span!("turbo_tasks::root_task");
1732                let future = match &*task_type {
1733                    TransientTask::Root(f) => f(),
1734                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
1735                };
1736                (span, future)
1737            }
1738        };
1739        Some(TaskExecutionSpec { future, span })
1740    }
1741
1742    fn task_execution_completed(
1743        &self,
1744        task_id: TaskId,
1745        result: Result<RawVc, TurboTasksExecutionError>,
1746        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1747        has_invalidator: bool,
1748        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1749    ) -> bool {
1750        // Task completion is a 4 step process:
1751        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
1752        //    aggregation number of the task and the new children.
1753        // 2. Connect the new children to the task (and do the relevant aggregation updates).
1754        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
1755        // 4. Shrink the task memory to reduce footprint of the task.
1756
1757        // Due to persistence it is possible that the process is cancelled after any step. This is
1758        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
1759        // in-memory representation.
1760
1761        // The task might be invalidated during this process, so we need to check the stale flag
1762        // at the start of every step.
1763
1764        #[cfg(not(feature = "trace_task_details"))]
1765        let span = tracing::trace_span!(
1766            "task execution completed",
1767            new_children = tracing::field::Empty
1768        )
1769        .entered();
1770        #[cfg(feature = "trace_task_details")]
1771        let span = tracing::trace_span!(
1772            "task execution completed",
1773            task_id = display(task_id),
1774            result = match result.as_ref() {
1775                Ok(value) => display(either::Either::Left(value)),
1776                Err(err) => display(either::Either::Right(err)),
1777            },
1778            new_children = tracing::field::Empty,
1779            immutable = tracing::field::Empty,
1780            new_output = tracing::field::Empty,
1781            output_dependents = tracing::field::Empty,
1782            stale = tracing::field::Empty,
1783        )
1784        .entered();
1785
1786        let is_error = result.is_err();
1787
1788        let mut ctx = self.execute_context(turbo_tasks);
1789
1790        let Some(TaskExecutionCompletePrepareResult {
1791            new_children,
1792            is_now_immutable,
1793            #[cfg(feature = "verify_determinism")]
1794            no_output_set,
1795            new_output,
1796            output_dependent_tasks,
1797        }) = self.task_execution_completed_prepare(
1798            &mut ctx,
1799            #[cfg(feature = "trace_task_details")]
1800            &span,
1801            task_id,
1802            result,
1803            cell_counters,
1804            has_invalidator,
1805        )
1806        else {
1807            // Task was stale and has been rescheduled
1808            #[cfg(feature = "trace_task_details")]
1809            span.record("stale", "prepare");
1810            return true;
1811        };
1812
1813        #[cfg(feature = "trace_task_details")]
1814        span.record("new_output", new_output.is_some());
1815        #[cfg(feature = "trace_task_details")]
1816        span.record("output_dependents", output_dependent_tasks.len());
1817
1818        // When restoring from filesystem cache the following might not be executed (since we can
1819        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
1820        // would be executed again.
1821
1822        if !output_dependent_tasks.is_empty() {
1823            self.task_execution_completed_invalidate_output_dependent(
1824                &mut ctx,
1825                task_id,
1826                output_dependent_tasks,
1827            );
1828        }
1829
1830        let has_new_children = !new_children.is_empty();
1831        span.record("new_children", new_children.len());
1832
1833        if has_new_children {
1834            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
1835        }
1836
1837        if has_new_children
1838            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
1839        {
1840            // Task was stale and has been rescheduled
1841            #[cfg(feature = "trace_task_details")]
1842            span.record("stale", "connect");
1843            return true;
1844        }
1845
1846        let (stale, in_progress_cells) = self.task_execution_completed_finish(
1847            &mut ctx,
1848            task_id,
1849            #[cfg(feature = "verify_determinism")]
1850            no_output_set,
1851            new_output,
1852            is_now_immutable,
1853        );
1854        if stale {
1855            // Task was stale and has been rescheduled
1856            #[cfg(feature = "trace_task_details")]
1857            span.record("stale", "finish");
1858            return true;
1859        }
1860
1861        let removed_data =
1862            self.task_execution_completed_cleanup(&mut ctx, task_id, cell_counters, is_error);
1863
1864        // Drop data outside of critical sections
1865        drop(removed_data);
1866        drop(in_progress_cells);
1867
1868        false
1869    }
1870
1871    fn task_execution_completed_prepare(
1872        &self,
1873        ctx: &mut impl ExecuteContext<'_>,
1874        #[cfg(feature = "trace_task_details")] span: &Span,
1875        task_id: TaskId,
1876        result: Result<RawVc, TurboTasksExecutionError>,
1877        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1878        has_invalidator: bool,
1879    ) -> Option<TaskExecutionCompletePrepareResult> {
1880        let mut task = ctx.task(task_id, TaskDataCategory::All);
1881        let Some(in_progress) = task.get_in_progress_mut() else {
1882            panic!("Task execution completed, but task is not in progress: {task:#?}");
1883        };
1884        if matches!(in_progress, InProgressState::Canceled) {
1885            return Some(TaskExecutionCompletePrepareResult {
1886                new_children: Default::default(),
1887                is_now_immutable: false,
1888                #[cfg(feature = "verify_determinism")]
1889                no_output_set: false,
1890                new_output: None,
1891                output_dependent_tasks: Default::default(),
1892            });
1893        }
1894        let &mut InProgressState::InProgress(box InProgressStateInner {
1895            stale,
1896            ref mut new_children,
1897            session_dependent,
1898            ..
1899        }) = in_progress
1900        else {
1901            panic!("Task execution completed, but task is not in progress: {task:#?}");
1902        };
1903
1904        // If the task is stale, reschedule it
1905        #[cfg(not(feature = "no_fast_stale"))]
1906        if stale {
1907            let Some(InProgressState::InProgress(box InProgressStateInner {
1908                done_event,
1909                mut new_children,
1910                ..
1911            })) = task.take_in_progress()
1912            else {
1913                unreachable!();
1914            };
1915            let old = task.set_in_progress(InProgressState::Scheduled {
1916                done_event,
1917                reason: TaskExecutionReason::Stale,
1918            });
1919            debug_assert!(old.is_none(), "InProgress already exists");
1920            // Remove old children from new_children to leave only the children that had their
1921            // active count increased
1922            for task in task.iter_children() {
1923                new_children.remove(&task);
1924            }
1925            drop(task);
1926
1927            // We need to undo the active count increase for the children since we throw away the
1928            // new_children list now.
1929            AggregationUpdateQueue::run(
1930                AggregationUpdateJob::DecreaseActiveCounts {
1931                    task_ids: new_children.into_iter().collect(),
1932                },
1933                ctx,
1934            );
1935            return None;
1936        }
1937
1938        // take the children from the task to process them
1939        let mut new_children = take(new_children);
1940
1941        // handle has_invalidator
1942        if has_invalidator {
1943            task.set_invalidator(true);
1944        }
1945
1946        // handle cell counters: update max index and remove cells that are no longer used
1947        let old_counters: FxHashMap<_, _> = task
1948            .iter_cell_type_max_index()
1949            .map(|(&k, &v)| (k, v))
1950            .collect();
1951        let mut counters_to_remove = old_counters.clone();
1952
1953        for (&cell_type, &max_index) in cell_counters.iter() {
1954            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
1955                if old_max_index != max_index {
1956                    task.insert_cell_type_max_index(cell_type, max_index);
1957                }
1958            } else {
1959                task.insert_cell_type_max_index(cell_type, max_index);
1960            }
1961        }
1962        for (cell_type, _) in counters_to_remove {
1963            task.remove_cell_type_max_index(&cell_type);
1964        }
1965
1966        let mut queue = AggregationUpdateQueue::new();
1967
1968        let mut old_edges = Vec::new();
1969
1970        let has_children = !new_children.is_empty();
1971        let is_immutable = task.immutable();
1972        let task_dependencies_for_immutable =
1973            // Task was previously marked as immutable
1974            if !is_immutable
1975            // Task is not session dependent (session dependent tasks can change between sessions)
1976            && !session_dependent
1977            // Task has no invalidator
1978            && !task.invalidator()
1979            // Task has no dependencies on collectibles
1980            && task.is_collectibles_dependencies_empty()
1981        {
1982            Some(
1983                // Collect all dependencies on tasks to check if all dependencies are immutable
1984                task.iter_output_dependencies()
1985                    .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
1986                    .collect::<FxHashSet<_>>(),
1987            )
1988        } else {
1989            None
1990        };
1991
1992        if has_children {
1993            // Prepare all new children
1994            prepare_new_children(task_id, &mut task, &new_children, &mut queue);
1995
1996            // Filter actual new children
1997            old_edges.extend(
1998                task.iter_children()
1999                    .filter(|task| !new_children.remove(task))
2000                    .map(OutdatedEdge::Child),
2001            );
2002        } else {
2003            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2004        }
2005
2006        old_edges.extend(
2007            task.iter_outdated_collectibles()
2008                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2009        );
2010
2011        if self.should_track_dependencies() {
2012            // IMPORTANT: Use iter_outdated_* here, NOT iter_* (active dependencies).
2013            // At execution start, active deps are copied to outdated as a "before" snapshot.
2014            // During execution, new deps are added to active.
2015            // Here at completion, we clean up only the OUTDATED deps (the "before" snapshot).
2016            // Using iter_* (active) instead would incorrectly clean up deps that are still valid,
2017            // breaking dependency tracking.
2018            old_edges.extend(
2019                task.iter_outdated_cell_dependencies()
2020                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
2021            );
2022            old_edges.extend(
2023                task.iter_outdated_output_dependencies()
2024                    .map(OutdatedEdge::OutputDependency),
2025            );
2026        }
2027
2028        // Check if output need to be updated
2029        let current_output = task.get_output();
2030        #[cfg(feature = "verify_determinism")]
2031        let no_output_set = current_output.is_none();
2032        let new_output = match result {
2033            Ok(RawVc::TaskOutput(output_task_id)) => {
2034                if let Some(OutputValue::Output(current_task_id)) = current_output
2035                    && *current_task_id == output_task_id
2036                {
2037                    None
2038                } else {
2039                    Some(OutputValue::Output(output_task_id))
2040                }
2041            }
2042            Ok(RawVc::TaskCell(output_task_id, cell)) => {
2043                if let Some(OutputValue::Cell(CellRef {
2044                    task: current_task_id,
2045                    cell: current_cell,
2046                })) = current_output
2047                    && *current_task_id == output_task_id
2048                    && *current_cell == cell
2049                {
2050                    None
2051                } else {
2052                    Some(OutputValue::Cell(CellRef {
2053                        task: output_task_id,
2054                        cell,
2055                    }))
2056                }
2057            }
2058            Ok(RawVc::LocalOutput(..)) => {
2059                panic!("Non-local tasks must not return a local Vc");
2060            }
2061            Err(err) => {
2062                if let Some(OutputValue::Error(old_error)) = current_output
2063                    && old_error == &err
2064                {
2065                    None
2066                } else {
2067                    Some(OutputValue::Error(err))
2068                }
2069            }
2070        };
2071        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2072        // When output has changed, grab the dependent tasks
2073        if new_output.is_some() && ctx.should_track_dependencies() {
2074            output_dependent_tasks = task.iter_output_dependent().collect();
2075        }
2076
2077        drop(task);
2078
2079        // Check if the task can be marked as immutable
2080        let mut is_now_immutable = false;
2081        if let Some(dependencies) = task_dependencies_for_immutable
2082            && dependencies
2083                .iter()
2084                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Meta).immutable())
2085        {
2086            is_now_immutable = true;
2087        }
2088        #[cfg(feature = "trace_task_details")]
2089        span.record("immutable", is_immutable || is_now_immutable);
2090
2091        if !queue.is_empty() || !old_edges.is_empty() {
2092            #[cfg(feature = "trace_task_completion")]
2093            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
2094            // Remove outdated edges first, before removing in_progress+dirty flag.
2095            // We need to make sure all outdated edges are removed before the task can potentially
2096            // be scheduled and executed again
2097            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2098        }
2099
2100        Some(TaskExecutionCompletePrepareResult {
2101            new_children,
2102            is_now_immutable,
2103            #[cfg(feature = "verify_determinism")]
2104            no_output_set,
2105            new_output,
2106            output_dependent_tasks,
2107        })
2108    }
2109
2110    fn task_execution_completed_invalidate_output_dependent(
2111        &self,
2112        ctx: &mut impl ExecuteContext<'_>,
2113        task_id: TaskId,
2114        output_dependent_tasks: SmallVec<[TaskId; 4]>,
2115    ) {
2116        debug_assert!(!output_dependent_tasks.is_empty());
2117
2118        if output_dependent_tasks.len() > 1 {
2119            ctx.prepare_tasks(
2120                output_dependent_tasks
2121                    .iter()
2122                    .map(|&id| (id, TaskDataCategory::All)),
2123            );
2124        }
2125
2126        fn process_output_dependents(
2127            ctx: &mut impl ExecuteContext<'_>,
2128            task_id: TaskId,
2129            dependent_task_id: TaskId,
2130            queue: &mut AggregationUpdateQueue,
2131        ) {
2132            #[cfg(feature = "trace_task_output_dependencies")]
2133            let span = tracing::trace_span!(
2134                "invalidate output dependency",
2135                task = %task_id,
2136                dependent_task = %dependent_task_id,
2137                result = tracing::field::Empty,
2138            )
2139            .entered();
2140            if ctx.is_once_task(dependent_task_id) {
2141                // once tasks are never invalidated
2142                #[cfg(feature = "trace_task_output_dependencies")]
2143                span.record("result", "once task");
2144                return;
2145            }
2146            let mut make_stale = true;
2147            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2148            if dependent.outdated_output_dependencies_contains(&task_id) {
2149                #[cfg(feature = "trace_task_output_dependencies")]
2150                span.record("result", "outdated dependency");
2151                // output dependency is outdated, so it hasn't read the output yet
2152                // and doesn't need to be invalidated
2153                // But importantly we still need to make the task dirty as it should no longer
2154                // be considered as "recomputation".
2155                make_stale = false;
2156            } else if !dependent.output_dependencies_contains(&task_id) {
2157                // output dependency has been removed, so the task doesn't depend on the
2158                // output anymore and doesn't need to be invalidated
2159                #[cfg(feature = "trace_task_output_dependencies")]
2160                span.record("result", "no backward dependency");
2161                return;
2162            }
2163            make_task_dirty_internal(
2164                dependent,
2165                dependent_task_id,
2166                make_stale,
2167                #[cfg(feature = "trace_task_dirty")]
2168                TaskDirtyCause::OutputChange { task_id },
2169                queue,
2170                ctx,
2171            );
2172            #[cfg(feature = "trace_task_output_dependencies")]
2173            span.record("result", "marked dirty");
2174        }
2175
2176        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
2177            let chunk_size = good_chunk_size(output_dependent_tasks.len());
2178            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2179            let _ = scope_and_block(chunks.len(), |scope| {
2180                for chunk in chunks {
2181                    let child_ctx = ctx.child_context();
2182                    scope.spawn(move || {
2183                        let mut ctx = child_ctx.create();
2184                        let mut queue = AggregationUpdateQueue::new();
2185                        for dependent_task_id in chunk {
2186                            process_output_dependents(
2187                                &mut ctx,
2188                                task_id,
2189                                dependent_task_id,
2190                                &mut queue,
2191                            )
2192                        }
2193                        queue.execute(&mut ctx);
2194                    });
2195                }
2196            });
2197        } else {
2198            let mut queue = AggregationUpdateQueue::new();
2199            for dependent_task_id in output_dependent_tasks {
2200                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
2201            }
2202            queue.execute(ctx);
2203        }
2204    }
2205
2206    fn task_execution_completed_unfinished_children_dirty(
2207        &self,
2208        ctx: &mut impl ExecuteContext<'_>,
2209        new_children: &FxHashSet<TaskId>,
2210    ) {
2211        debug_assert!(!new_children.is_empty());
2212
2213        let mut queue = AggregationUpdateQueue::new();
2214        ctx.for_each_task_meta(new_children.iter().copied(), |child_task, ctx| {
2215            if !child_task.has_output() {
2216                let child_id = child_task.id();
2217                make_task_dirty_internal(
2218                    child_task,
2219                    child_id,
2220                    false,
2221                    #[cfg(feature = "trace_task_dirty")]
2222                    TaskDirtyCause::InitialDirty,
2223                    &mut queue,
2224                    ctx,
2225                );
2226            }
2227        });
2228
2229        queue.execute(ctx);
2230    }
2231
2232    fn task_execution_completed_connect(
2233        &self,
2234        ctx: &mut impl ExecuteContext<'_>,
2235        task_id: TaskId,
2236        new_children: FxHashSet<TaskId>,
2237    ) -> bool {
2238        debug_assert!(!new_children.is_empty());
2239
2240        let mut task = ctx.task(task_id, TaskDataCategory::All);
2241        let Some(in_progress) = task.get_in_progress() else {
2242            panic!("Task execution completed, but task is not in progress: {task:#?}");
2243        };
2244        if matches!(in_progress, InProgressState::Canceled) {
2245            // Task was canceled in the meantime, so we don't connect the children
2246            return false;
2247        }
2248        let InProgressState::InProgress(box InProgressStateInner {
2249            #[cfg(not(feature = "no_fast_stale"))]
2250            stale,
2251            ..
2252        }) = in_progress
2253        else {
2254            panic!("Task execution completed, but task is not in progress: {task:#?}");
2255        };
2256
2257        // If the task is stale, reschedule it
2258        #[cfg(not(feature = "no_fast_stale"))]
2259        if *stale {
2260            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2261                task.take_in_progress()
2262            else {
2263                unreachable!();
2264            };
2265            let old = task.set_in_progress(InProgressState::Scheduled {
2266                done_event,
2267                reason: TaskExecutionReason::Stale,
2268            });
2269            debug_assert!(old.is_none(), "InProgress already exists");
2270            drop(task);
2271
2272            // All `new_children` are currently hold active with an active count and we need to undo
2273            // that. (We already filtered out the old children from that list)
2274            AggregationUpdateQueue::run(
2275                AggregationUpdateJob::DecreaseActiveCounts {
2276                    task_ids: new_children.into_iter().collect(),
2277                },
2278                ctx,
2279            );
2280            return true;
2281        }
2282
2283        let has_active_count = ctx.should_track_activeness()
2284            && task
2285                .get_activeness()
2286                .is_some_and(|activeness| activeness.active_counter > 0);
2287        connect_children(
2288            ctx,
2289            task_id,
2290            task,
2291            new_children,
2292            has_active_count,
2293            ctx.should_track_activeness(),
2294        );
2295
2296        false
2297    }
2298
2299    fn task_execution_completed_finish(
2300        &self,
2301        ctx: &mut impl ExecuteContext<'_>,
2302        task_id: TaskId,
2303        #[cfg(feature = "verify_determinism")] no_output_set: bool,
2304        new_output: Option<OutputValue>,
2305        is_now_immutable: bool,
2306    ) -> (
2307        bool,
2308        Option<
2309            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2310        >,
2311    ) {
2312        let mut task = ctx.task(task_id, TaskDataCategory::All);
2313        let Some(in_progress) = task.take_in_progress() else {
2314            panic!("Task execution completed, but task is not in progress: {task:#?}");
2315        };
2316        if matches!(in_progress, InProgressState::Canceled) {
2317            // Task was canceled in the meantime, so we don't finish it
2318            return (false, None);
2319        }
2320        let InProgressState::InProgress(box InProgressStateInner {
2321            done_event,
2322            once_task: _,
2323            stale,
2324            session_dependent,
2325            marked_as_completed: _,
2326            new_children,
2327        }) = in_progress
2328        else {
2329            panic!("Task execution completed, but task is not in progress: {task:#?}");
2330        };
2331        debug_assert!(new_children.is_empty());
2332
2333        // If the task is stale, reschedule it
2334        if stale {
2335            let old = task.set_in_progress(InProgressState::Scheduled {
2336                done_event,
2337                reason: TaskExecutionReason::Stale,
2338            });
2339            debug_assert!(old.is_none(), "InProgress already exists");
2340            return (true, None);
2341        }
2342
2343        // Set the output if it has changed
2344        let mut old_content = None;
2345        if let Some(value) = new_output {
2346            old_content = task.set_output(value);
2347        }
2348
2349        // If the task has no invalidator and has no mutable dependencies, it does not have a way
2350        // to be invalidated and we can mark it as immutable.
2351        if is_now_immutable {
2352            task.set_immutable(true);
2353        }
2354
2355        // Notify in progress cells and remove all of them
2356        let in_progress_cells = task.take_in_progress_cells();
2357        if let Some(ref cells) = in_progress_cells {
2358            for state in cells.values() {
2359                state.event.notify(usize::MAX);
2360            }
2361        }
2362
2363        // Grab the old dirty state
2364        let old_dirtyness = task.get_dirty().cloned();
2365        let (old_self_dirty, old_current_session_self_clean) = match old_dirtyness {
2366            None => (false, false),
2367            Some(Dirtyness::Dirty(_)) => (true, false),
2368            Some(Dirtyness::SessionDependent) => {
2369                let clean_in_current_session = task.current_session_clean();
2370                (true, clean_in_current_session)
2371            }
2372        };
2373
2374        // Compute the new dirty state
2375        let (new_dirtyness, new_self_dirty, new_current_session_self_clean) = if session_dependent {
2376            (Some(Dirtyness::SessionDependent), true, true)
2377        } else {
2378            (None, false, false)
2379        };
2380
2381        // Update the dirty state
2382        if old_dirtyness != new_dirtyness {
2383            if let Some(value) = new_dirtyness {
2384                task.set_dirty(value);
2385            } else if old_dirtyness.is_some() {
2386                task.take_dirty();
2387            }
2388        }
2389        if old_current_session_self_clean != new_current_session_self_clean {
2390            if new_current_session_self_clean {
2391                task.set_current_session_clean(true);
2392            } else if old_current_session_self_clean {
2393                task.set_current_session_clean(false);
2394            }
2395        }
2396
2397        // Propagate dirtyness changes
2398        let data_update = if old_self_dirty != new_self_dirty
2399            || old_current_session_self_clean != new_current_session_self_clean
2400        {
2401            let dirty_container_count = task
2402                .get_aggregated_dirty_container_count()
2403                .cloned()
2404                .unwrap_or_default();
2405            let current_session_clean_container_count = task
2406                .get_aggregated_current_session_clean_container_count()
2407                .copied()
2408                .unwrap_or_default();
2409            let result = ComputeDirtyAndCleanUpdate {
2410                old_dirty_container_count: dirty_container_count,
2411                new_dirty_container_count: dirty_container_count,
2412                old_current_session_clean_container_count: current_session_clean_container_count,
2413                new_current_session_clean_container_count: current_session_clean_container_count,
2414                old_self_dirty,
2415                new_self_dirty,
2416                old_current_session_self_clean,
2417                new_current_session_self_clean,
2418            }
2419            .compute();
2420            if result.dirty_count_update - result.current_session_clean_update < 0 {
2421                // The task is clean now
2422                if let Some(activeness_state) = task.get_activeness_mut() {
2423                    activeness_state.all_clean_event.notify(usize::MAX);
2424                    activeness_state.unset_active_until_clean();
2425                    if activeness_state.is_empty() {
2426                        task.take_activeness();
2427                    }
2428                }
2429            }
2430            result
2431                .aggregated_update(task_id)
2432                .and_then(|aggregated_update| {
2433                    AggregationUpdateJob::data_update(&mut task, aggregated_update)
2434                })
2435        } else {
2436            None
2437        };
2438
2439        #[cfg(feature = "verify_determinism")]
2440        let reschedule = (dirty_changed || no_output_set) && !task_id.is_transient();
2441        #[cfg(not(feature = "verify_determinism"))]
2442        let reschedule = false;
2443        if reschedule {
2444            let old = task.set_in_progress(InProgressState::Scheduled {
2445                done_event,
2446                reason: TaskExecutionReason::Stale,
2447            });
2448            debug_assert!(old.is_none(), "InProgress already exists");
2449            drop(task);
2450        } else {
2451            drop(task);
2452
2453            // Notify dependent tasks that are waiting for this task to finish
2454            done_event.notify(usize::MAX);
2455        }
2456
2457        drop(old_content);
2458
2459        if let Some(data_update) = data_update {
2460            AggregationUpdateQueue::run(data_update, ctx);
2461        }
2462
2463        // We return so the data can be dropped outside of critical sections
2464        (reschedule, in_progress_cells)
2465    }
2466
2467    fn task_execution_completed_cleanup(
2468        &self,
2469        ctx: &mut impl ExecuteContext<'_>,
2470        task_id: TaskId,
2471        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2472        is_error: bool,
2473    ) -> Vec<SharedReference> {
2474        let mut task = ctx.task(task_id, TaskDataCategory::All);
2475        let mut removed_cell_data = Vec::new();
2476        // An error is potentially caused by a eventual consistency, so we avoid updating cells
2477        // after an error as it is likely transient and we want to keep the dependent tasks
2478        // clean to avoid re-executions.
2479        if !is_error {
2480            // Remove no longer existing cells and
2481            // find all outdated data items (removed cells, outdated edges)
2482            // Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
2483            // anyway and we want to avoid needless re-executions. When the cells become
2484            // used again, they are invalidated from the update cell operation.
2485            // Remove cell data for cells that no longer exist
2486            let to_remove_persistent: Vec<_> = task
2487                .iter_persistent_cell_data()
2488                .filter_map(|(cell, _)| {
2489                    cell_counters
2490                        .get(&cell.type_id)
2491                        .is_none_or(|start_index| cell.index >= *start_index)
2492                        .then_some(*cell)
2493                })
2494                .collect();
2495
2496            // Remove transient cell data for cells that no longer exist
2497            let to_remove_transient: Vec<_> = task
2498                .iter_transient_cell_data()
2499                .filter_map(|(cell, _)| {
2500                    cell_counters
2501                        .get(&cell.type_id)
2502                        .is_none_or(|start_index| cell.index >= *start_index)
2503                        .then_some(*cell)
2504                })
2505                .collect();
2506            removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
2507            for cell in to_remove_persistent {
2508                if let Some(data) = task.remove_persistent_cell_data(&cell) {
2509                    removed_cell_data.push(data.into_untyped());
2510                }
2511            }
2512            for cell in to_remove_transient {
2513                if let Some(data) = task.remove_transient_cell_data(&cell) {
2514                    removed_cell_data.push(data);
2515                }
2516            }
2517        }
2518
2519        // Shrink memory usage for collections that are only mutated during/after execution
2520        task.shrink_persistent_cell_data();
2521        task.shrink_transient_cell_data();
2522        task.shrink_cell_type_max_index();
2523        task.shrink_cell_dependencies();
2524        task.shrink_output_dependencies();
2525        task.shrink_collectibles_dependencies();
2526        task.shrink_outdated_cell_dependencies();
2527        task.shrink_outdated_output_dependencies();
2528        task.shrink_outdated_collectibles();
2529        task.shrink_outdated_collectibles_dependencies();
2530        task.shrink_children();
2531        task.shrink_collectibles();
2532        task.shrink_in_progress_cells();
2533
2534        drop(task);
2535
2536        // Return so we can drop outside of critical sections
2537        removed_cell_data
2538    }
2539
2540    fn run_backend_job<'a>(
2541        self: &'a Arc<Self>,
2542        job: TurboTasksBackendJob,
2543        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2544    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2545        Box::pin(async move {
2546            match job {
2547                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
2548                    debug_assert!(self.should_persist());
2549
2550                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
2551                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
2552                    let mut idle_start_listener = self.idle_start_event.listen();
2553                    let mut idle_end_listener = self.idle_end_event.listen();
2554                    let mut fresh_idle = true;
2555                    loop {
2556                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2557                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2558                        let idle_timeout = *IDLE_TIMEOUT;
2559                        let (time, mut reason) =
2560                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
2561                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2562                            } else {
2563                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
2564                            };
2565
2566                        let until = last_snapshot + time;
2567                        if until > Instant::now() {
2568                            let mut stop_listener = self.stopping_event.listen();
2569                            if self.stopping.load(Ordering::Acquire) {
2570                                return;
2571                            }
2572                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2573                                Instant::now() + idle_timeout
2574                            } else {
2575                                far_future()
2576                            };
2577                            loop {
2578                                tokio::select! {
2579                                    _ = &mut stop_listener => {
2580                                        return;
2581                                    },
2582                                    _ = &mut idle_start_listener => {
2583                                        fresh_idle = true;
2584                                        idle_time = Instant::now() + idle_timeout;
2585                                        idle_start_listener = self.idle_start_event.listen()
2586                                    },
2587                                    _ = &mut idle_end_listener => {
2588                                        idle_time = until + idle_timeout;
2589                                        idle_end_listener = self.idle_end_event.listen()
2590                                    },
2591                                    _ = tokio::time::sleep_until(until) => {
2592                                        break;
2593                                    },
2594                                    _ = tokio::time::sleep_until(idle_time) => {
2595                                        if turbo_tasks.is_idle() {
2596                                            reason = "idle timeout";
2597                                            break;
2598                                        }
2599                                    },
2600                                }
2601                            }
2602                        }
2603
2604                        let this = self.clone();
2605                        let snapshot = this.snapshot_and_persist(None, reason);
2606                        if let Some((snapshot_start, new_data)) = snapshot {
2607                            last_snapshot = snapshot_start;
2608                            if !new_data {
2609                                fresh_idle = false;
2610                                continue;
2611                            }
2612                            let last_snapshot = last_snapshot.duration_since(self.start_time);
2613                            self.last_snapshot.store(
2614                                last_snapshot.as_millis().try_into().unwrap(),
2615                                Ordering::Relaxed,
2616                            );
2617
2618                            turbo_tasks.schedule_backend_background_job(
2619                                TurboTasksBackendJob::FollowUpSnapshot,
2620                            );
2621                            return;
2622                        }
2623                    }
2624                }
2625            }
2626        })
2627    }
2628
2629    fn try_read_own_task_cell(
2630        &self,
2631        task_id: TaskId,
2632        cell: CellId,
2633        options: ReadCellOptions,
2634        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2635    ) -> Result<TypedCellContent> {
2636        let mut ctx = self.execute_context(turbo_tasks);
2637        let task = ctx.task(task_id, TaskDataCategory::Data);
2638        if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2639            debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2640            Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2641        } else {
2642            Ok(CellContent(None).into_typed(cell.type_id))
2643        }
2644    }
2645
2646    fn read_task_collectibles(
2647        &self,
2648        task_id: TaskId,
2649        collectible_type: TraitTypeId,
2650        reader_id: Option<TaskId>,
2651        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2652    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2653        let mut ctx = self.execute_context(turbo_tasks);
2654        let mut collectibles = AutoMap::default();
2655        {
2656            let mut task = ctx.task(task_id, TaskDataCategory::All);
2657            // Ensure it's an root node
2658            loop {
2659                let aggregation_number = get_aggregation_number(&task);
2660                if is_root_node(aggregation_number) {
2661                    break;
2662                }
2663                drop(task);
2664                AggregationUpdateQueue::run(
2665                    AggregationUpdateJob::UpdateAggregationNumber {
2666                        task_id,
2667                        base_aggregation_number: u32::MAX,
2668                        distance: None,
2669                    },
2670                    &mut ctx,
2671                );
2672                task = ctx.task(task_id, TaskDataCategory::All);
2673            }
2674            for (collectible, count) in task.iter_aggregated_collectibles() {
2675                if *count > 0 && collectible.collectible_type == collectible_type {
2676                    *collectibles
2677                        .entry(RawVc::TaskCell(
2678                            collectible.cell.task,
2679                            collectible.cell.cell,
2680                        ))
2681                        .or_insert(0) += 1;
2682                }
2683            }
2684            for (&collectible, &count) in task.iter_collectibles() {
2685                if collectible.collectible_type == collectible_type {
2686                    *collectibles
2687                        .entry(RawVc::TaskCell(
2688                            collectible.cell.task,
2689                            collectible.cell.cell,
2690                        ))
2691                        .or_insert(0) += count;
2692                }
2693            }
2694            if let Some(reader_id) = reader_id {
2695                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
2696            }
2697        }
2698        if let Some(reader_id) = reader_id {
2699            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
2700            let target = CollectiblesRef {
2701                task: task_id,
2702                collectible_type,
2703            };
2704            if !reader.remove_outdated_collectibles_dependencies(&target) {
2705                let _ = reader.add_collectibles_dependencies(target);
2706            }
2707        }
2708        collectibles
2709    }
2710
2711    fn emit_collectible(
2712        &self,
2713        collectible_type: TraitTypeId,
2714        collectible: RawVc,
2715        task_id: TaskId,
2716        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2717    ) {
2718        self.assert_valid_collectible(task_id, collectible);
2719
2720        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2721            panic!("Collectibles need to be resolved");
2722        };
2723        let cell = CellRef {
2724            task: collectible_task,
2725            cell,
2726        };
2727        operation::UpdateCollectibleOperation::run(
2728            task_id,
2729            CollectibleRef {
2730                collectible_type,
2731                cell,
2732            },
2733            1,
2734            self.execute_context(turbo_tasks),
2735        );
2736    }
2737
2738    fn unemit_collectible(
2739        &self,
2740        collectible_type: TraitTypeId,
2741        collectible: RawVc,
2742        count: u32,
2743        task_id: TaskId,
2744        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2745    ) {
2746        self.assert_valid_collectible(task_id, collectible);
2747
2748        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2749            panic!("Collectibles need to be resolved");
2750        };
2751        let cell = CellRef {
2752            task: collectible_task,
2753            cell,
2754        };
2755        operation::UpdateCollectibleOperation::run(
2756            task_id,
2757            CollectibleRef {
2758                collectible_type,
2759                cell,
2760            },
2761            -(i32::try_from(count).unwrap()),
2762            self.execute_context(turbo_tasks),
2763        );
2764    }
2765
2766    fn update_task_cell(
2767        &self,
2768        task_id: TaskId,
2769        cell: CellId,
2770        is_serializable_cell_content: bool,
2771        content: CellContent,
2772        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
2773        verification_mode: VerificationMode,
2774        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2775    ) {
2776        operation::UpdateCellOperation::run(
2777            task_id,
2778            cell,
2779            content,
2780            is_serializable_cell_content,
2781            updated_key_hashes,
2782            verification_mode,
2783            self.execute_context(turbo_tasks),
2784        );
2785    }
2786
2787    fn mark_own_task_as_session_dependent(
2788        &self,
2789        task_id: TaskId,
2790        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2791    ) {
2792        if !self.should_track_dependencies() {
2793            // Without dependency tracking we don't need session dependent tasks
2794            return;
2795        }
2796        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
2797        let mut ctx = self.execute_context(turbo_tasks);
2798        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
2799        let aggregation_number = get_aggregation_number(&task);
2800        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
2801            drop(task);
2802            // We want to use a high aggregation number to avoid large aggregation chains for
2803            // session dependent tasks (which change on every run)
2804            AggregationUpdateQueue::run(
2805                AggregationUpdateJob::UpdateAggregationNumber {
2806                    task_id,
2807                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
2808                    distance: None,
2809                },
2810                &mut ctx,
2811            );
2812            task = ctx.task(task_id, TaskDataCategory::Meta);
2813        }
2814        if let Some(InProgressState::InProgress(box InProgressStateInner {
2815            session_dependent,
2816            ..
2817        })) = task.get_in_progress_mut()
2818        {
2819            *session_dependent = true;
2820        }
2821    }
2822
2823    fn mark_own_task_as_finished(
2824        &self,
2825        task: TaskId,
2826        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2827    ) {
2828        let mut ctx = self.execute_context(turbo_tasks);
2829        let mut task = ctx.task(task, TaskDataCategory::Data);
2830        if let Some(InProgressState::InProgress(box InProgressStateInner {
2831            marked_as_completed,
2832            ..
2833        })) = task.get_in_progress_mut()
2834        {
2835            *marked_as_completed = true;
2836            // TODO this should remove the dirty state (also check session_dependent)
2837            // but this would break some assumptions for strongly consistent reads.
2838            // Client tasks are not connected yet, so we wouldn't wait for them.
2839            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
2840        }
2841    }
2842
2843    fn set_own_task_aggregation_number(
2844        &self,
2845        task: TaskId,
2846        aggregation_number: u32,
2847        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2848    ) {
2849        let mut ctx = self.execute_context(turbo_tasks);
2850        AggregationUpdateQueue::run(
2851            AggregationUpdateJob::UpdateAggregationNumber {
2852                task_id: task,
2853                base_aggregation_number: aggregation_number,
2854                distance: None,
2855            },
2856            &mut ctx,
2857        );
2858    }
2859
2860    fn connect_task(
2861        &self,
2862        task: TaskId,
2863        parent_task: Option<TaskId>,
2864        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2865    ) {
2866        self.assert_not_persistent_calling_transient(parent_task, task, None);
2867        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
2868    }
2869
2870    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
2871        let task_id = self.transient_task_id_factory.get();
2872        let root_type = match task_type {
2873            TransientTaskType::Root(_) => RootType::RootTask,
2874            TransientTaskType::Once(_) => RootType::OnceTask,
2875        };
2876        self.transient_tasks.insert(
2877            task_id,
2878            Arc::new(match task_type {
2879                TransientTaskType::Root(f) => TransientTask::Root(f),
2880                TransientTaskType::Once(f) => TransientTask::Once(Mutex::new(Some(f))),
2881            }),
2882        );
2883        {
2884            let mut task = self.storage.access_mut(task_id);
2885            (*task).init_transient_task(task_id, root_type, self.should_track_activeness());
2886        }
2887        #[cfg(feature = "verify_aggregation_graph")]
2888        self.root_tasks.lock().insert(task_id);
2889        task_id
2890    }
2891
2892    fn dispose_root_task(
2893        &self,
2894        task_id: TaskId,
2895        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2896    ) {
2897        #[cfg(feature = "verify_aggregation_graph")]
2898        self.root_tasks.lock().remove(&task_id);
2899
2900        let mut ctx = self.execute_context(turbo_tasks);
2901        let mut task = ctx.task(task_id, TaskDataCategory::All);
2902        let is_dirty = task.is_dirty();
2903        let has_dirty_containers = task.has_dirty_containers();
2904        if is_dirty.is_some() || has_dirty_containers {
2905            if let Some(activeness_state) = task.get_activeness_mut() {
2906                // We will finish the task, but it would be removed after the task is done
2907                activeness_state.unset_root_type();
2908                activeness_state.set_active_until_clean();
2909            };
2910        } else if let Some(activeness_state) = task.take_activeness() {
2911            // Technically nobody should be listening to this event, but just in case
2912            // we notify it anyway
2913            activeness_state.all_clean_event.notify(usize::MAX);
2914        }
2915    }
2916
2917    #[cfg(feature = "verify_aggregation_graph")]
2918    fn verify_aggregation_graph(
2919        &self,
2920        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2921        idle: bool,
2922    ) {
2923        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
2924            return;
2925        }
2926        use std::{collections::VecDeque, env, io::stdout};
2927
2928        use crate::backend::operation::{get_uppers, is_aggregating_node};
2929
2930        let mut ctx = self.execute_context(turbo_tasks);
2931        let root_tasks = self.root_tasks.lock().clone();
2932
2933        for task_id in root_tasks.into_iter() {
2934            let mut queue = VecDeque::new();
2935            let mut visited = FxHashSet::default();
2936            let mut aggregated_nodes = FxHashSet::default();
2937            let mut collectibles = FxHashMap::default();
2938            let root_task_id = task_id;
2939            visited.insert(task_id);
2940            aggregated_nodes.insert(task_id);
2941            queue.push_back(task_id);
2942            let mut counter = 0;
2943            while let Some(task_id) = queue.pop_front() {
2944                counter += 1;
2945                if counter % 100000 == 0 {
2946                    println!(
2947                        "queue={}, visited={}, aggregated_nodes={}",
2948                        queue.len(),
2949                        visited.len(),
2950                        aggregated_nodes.len()
2951                    );
2952                }
2953                let task = ctx.task(task_id, TaskDataCategory::All);
2954                if idle && !self.is_idle.load(Ordering::Relaxed) {
2955                    return;
2956                }
2957
2958                let uppers = get_uppers(&task);
2959                if task_id != root_task_id
2960                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
2961                {
2962                    panic!(
2963                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
2964                         {:?})",
2965                        task_id,
2966                        ctx.get_task_description(task_id),
2967                        uppers
2968                    );
2969                }
2970
2971                let aggregated_collectibles: Vec<_> = task
2972                    .iter_aggregated_collectibles()
2973                    .map(|(collectible, _)| collectible)
2974                    .collect();
2975                for collectible in aggregated_collectibles {
2976                    collectibles
2977                        .entry(collectible)
2978                        .or_insert_with(|| (false, Vec::new()))
2979                        .1
2980                        .push(task_id);
2981                }
2982
2983                let own_collectibles: Vec<_> = task
2984                    .iter_collectibles_entries()
2985                    .filter_map(|(&collectible, &value)| (value > 0).then_some(collectible))
2986                    .collect::<Vec<_>>();
2987                for collectible in own_collectibles {
2988                    if let Some((flag, _)) = collectibles.get_mut(&collectible) {
2989                        *flag = true
2990                    } else {
2991                        panic!(
2992                            "Task {} has a collectible {:?} that is not in any upper task",
2993                            task_id, collectible
2994                        );
2995                    }
2996                }
2997
2998                let is_dirty = task.has_dirty();
2999                let has_dirty_container = task.has_dirty_containers();
3000                let should_be_in_upper = is_dirty || has_dirty_container;
3001
3002                let aggregation_number = get_aggregation_number(&task);
3003                if is_aggregating_node(aggregation_number) {
3004                    aggregated_nodes.insert(task_id);
3005                }
3006                // println!(
3007                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
3008                //     ctx.get_task_description(task_id),
3009                //     uppers
3010                // );
3011
3012                for child_id in task.iter_children() {
3013                    // println!("{task_id}: child -> {child_id}");
3014                    if visited.insert(child_id) {
3015                        queue.push_back(child_id);
3016                    }
3017                }
3018                drop(task);
3019
3020                if should_be_in_upper {
3021                    for upper_id in uppers {
3022                        let task = ctx.task(upper_id, TaskDataCategory::All);
3023                        let in_upper = task
3024                            .get_aggregated_dirty_containers(&task_id)
3025                            .is_some_and(|&dirty| dirty > 0);
3026                        if !in_upper {
3027                            let containers: Vec<_> = task
3028                                .iter_aggregated_dirty_containers_entries()
3029                                .map(|(&k, &v)| (k, v))
3030                                .collect();
3031                            panic!(
3032                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
3033                                 ({})\nThese dirty containers are present:\n{:#?}",
3034                                task_id,
3035                                ctx.get_task_description(task_id),
3036                                upper_id,
3037                                ctx.get_task_description(upper_id),
3038                                containers,
3039                            );
3040                        }
3041                    }
3042                }
3043            }
3044
3045            for (collectible, (flag, task_ids)) in collectibles {
3046                if !flag {
3047                    use std::io::Write;
3048                    let mut stdout = stdout().lock();
3049                    writeln!(
3050                        stdout,
3051                        "{:?} that is not emitted in any child task but in these aggregated \
3052                         tasks: {:#?}",
3053                        collectible,
3054                        task_ids
3055                            .iter()
3056                            .map(|t| format!("{t} {}", ctx.get_task_description(*t)))
3057                            .collect::<Vec<_>>()
3058                    )
3059                    .unwrap();
3060
3061                    let task_id = collectible.cell.task;
3062                    let mut queue = {
3063                        let task = ctx.task(task_id, TaskDataCategory::All);
3064                        get_uppers(&task)
3065                    };
3066                    let mut visited = FxHashSet::default();
3067                    for &upper_id in queue.iter() {
3068                        visited.insert(upper_id);
3069                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3070                    }
3071                    while let Some(task_id) = queue.pop() {
3072                        let desc = ctx.get_task_description(task_id);
3073                        let task = ctx.task(task_id, TaskDataCategory::All);
3074                        let aggregated_collectible = task
3075                            .get_aggregated_collectibles(&collectible)
3076                            .copied()
3077                            .unwrap_or_default();
3078                        let uppers = get_uppers(&task);
3079                        drop(task);
3080                        writeln!(
3081                            stdout,
3082                            "upper {task_id} {desc} collectible={aggregated_collectible}"
3083                        )
3084                        .unwrap();
3085                        if task_ids.contains(&task_id) {
3086                            writeln!(
3087                                stdout,
3088                                "Task has an upper connection to an aggregated task that doesn't \
3089                                 reference it. Upper connection is invalid!"
3090                            )
3091                            .unwrap();
3092                        }
3093                        for upper_id in uppers {
3094                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3095                            if !visited.contains(&upper_id) {
3096                                queue.push(upper_id);
3097                            }
3098                        }
3099                    }
3100                    panic!("See stdout for more details");
3101                }
3102            }
3103        }
3104    }
3105
3106    fn assert_not_persistent_calling_transient(
3107        &self,
3108        parent_id: Option<TaskId>,
3109        child_id: TaskId,
3110        cell_id: Option<CellId>,
3111    ) {
3112        if !parent_id.is_none_or(|id| id.is_transient()) && child_id.is_transient() {
3113            self.panic_persistent_calling_transient(
3114                parent_id
3115                    .and_then(|id| self.lookup_task_type(id))
3116                    .as_deref(),
3117                self.lookup_task_type(child_id).as_deref(),
3118                cell_id,
3119            );
3120        }
3121    }
3122
3123    fn panic_persistent_calling_transient(
3124        &self,
3125        parent: Option<&CachedTaskType>,
3126        child: Option<&CachedTaskType>,
3127        cell_id: Option<CellId>,
3128    ) {
3129        let transient_reason = if let Some(child) = child {
3130            Cow::Owned(format!(
3131                " The callee is transient because it depends on:\n{}",
3132                self.debug_trace_transient_task(child, cell_id),
3133            ))
3134        } else {
3135            Cow::Borrowed("")
3136        };
3137        panic!(
3138            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3139            parent.map_or("unknown", |t| t.get_name()),
3140            child.map_or("unknown", |t| t.get_name()),
3141            transient_reason,
3142        );
3143    }
3144
3145    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3146        // these checks occur in a potentially hot codepath, but they're cheap
3147        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3148            // This should never happen: The collectible APIs use ResolvedVc
3149            let task_info = if let Some(col_task_ty) = collectible
3150                .try_get_task_id()
3151                .and_then(|t| self.lookup_task_type(t))
3152            {
3153                Cow::Owned(format!(" (return type of {col_task_ty})"))
3154            } else {
3155                Cow::Borrowed("")
3156            };
3157            panic!("Collectible{task_info} must be a ResolvedVc")
3158        };
3159        if col_task_id.is_transient() && !task_id.is_transient() {
3160            let transient_reason = if let Some(col_task_ty) = self.lookup_task_type(col_task_id) {
3161                Cow::Owned(format!(
3162                    ". The collectible is transient because it depends on:\n{}",
3163                    self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3164                ))
3165            } else {
3166                Cow::Borrowed("")
3167            };
3168            // this should never happen: How would a persistent function get a transient Vc?
3169            panic!(
3170                "Collectible is transient, transient collectibles cannot be emitted from \
3171                 persistent tasks{transient_reason}",
3172            )
3173        }
3174    }
3175}
3176
3177impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
3178    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3179        self.0.startup(turbo_tasks);
3180    }
3181
3182    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3183        self.0.stopping();
3184    }
3185
3186    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3187        self.0.stop(turbo_tasks);
3188    }
3189
3190    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3191        self.0.idle_start(turbo_tasks);
3192    }
3193
3194    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3195        self.0.idle_end();
3196    }
3197
3198    fn get_or_create_persistent_task(
3199        &self,
3200        task_type: CachedTaskType,
3201        parent_task: Option<TaskId>,
3202        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3203    ) -> TaskId {
3204        self.0
3205            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
3206    }
3207
3208    fn get_or_create_transient_task(
3209        &self,
3210        task_type: CachedTaskType,
3211        parent_task: Option<TaskId>,
3212        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3213    ) -> TaskId {
3214        self.0
3215            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
3216    }
3217
3218    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3219        self.0.invalidate_task(task_id, turbo_tasks);
3220    }
3221
3222    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3223        self.0.invalidate_tasks(tasks, turbo_tasks);
3224    }
3225
3226    fn invalidate_tasks_set(
3227        &self,
3228        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3229        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3230    ) {
3231        self.0.invalidate_tasks_set(tasks, turbo_tasks);
3232    }
3233
3234    fn invalidate_serialization(
3235        &self,
3236        task_id: TaskId,
3237        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3238    ) {
3239        self.0.invalidate_serialization(task_id, turbo_tasks);
3240    }
3241
3242    fn get_task_description(&self, task: TaskId) -> String {
3243        self.0.get_task_description(task)
3244    }
3245
3246    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3247        self.0.task_execution_canceled(task, turbo_tasks)
3248    }
3249
3250    fn try_start_task_execution(
3251        &self,
3252        task_id: TaskId,
3253        priority: TaskPriority,
3254        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3255    ) -> Option<TaskExecutionSpec<'_>> {
3256        self.0
3257            .try_start_task_execution(task_id, priority, turbo_tasks)
3258    }
3259
3260    fn task_execution_completed(
3261        &self,
3262        task_id: TaskId,
3263        result: Result<RawVc, TurboTasksExecutionError>,
3264        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3265        has_invalidator: bool,
3266        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3267    ) -> bool {
3268        self.0.task_execution_completed(
3269            task_id,
3270            result,
3271            cell_counters,
3272            has_invalidator,
3273            turbo_tasks,
3274        )
3275    }
3276
3277    type BackendJob = TurboTasksBackendJob;
3278
3279    fn run_backend_job<'a>(
3280        &'a self,
3281        job: Self::BackendJob,
3282        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3283    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3284        self.0.run_backend_job(job, turbo_tasks)
3285    }
3286
3287    fn try_read_task_output(
3288        &self,
3289        task_id: TaskId,
3290        reader: Option<TaskId>,
3291        options: ReadOutputOptions,
3292        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3293    ) -> Result<Result<RawVc, EventListener>> {
3294        self.0
3295            .try_read_task_output(task_id, reader, options, turbo_tasks)
3296    }
3297
3298    fn try_read_task_cell(
3299        &self,
3300        task_id: TaskId,
3301        cell: CellId,
3302        reader: Option<TaskId>,
3303        options: ReadCellOptions,
3304        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3305    ) -> Result<Result<TypedCellContent, EventListener>> {
3306        self.0
3307            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3308    }
3309
3310    fn try_read_own_task_cell(
3311        &self,
3312        task_id: TaskId,
3313        cell: CellId,
3314        options: ReadCellOptions,
3315        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3316    ) -> Result<TypedCellContent> {
3317        self.0
3318            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
3319    }
3320
3321    fn read_task_collectibles(
3322        &self,
3323        task_id: TaskId,
3324        collectible_type: TraitTypeId,
3325        reader: Option<TaskId>,
3326        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3327    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3328        self.0
3329            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3330    }
3331
3332    fn emit_collectible(
3333        &self,
3334        collectible_type: TraitTypeId,
3335        collectible: RawVc,
3336        task_id: TaskId,
3337        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3338    ) {
3339        self.0
3340            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3341    }
3342
3343    fn unemit_collectible(
3344        &self,
3345        collectible_type: TraitTypeId,
3346        collectible: RawVc,
3347        count: u32,
3348        task_id: TaskId,
3349        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3350    ) {
3351        self.0
3352            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3353    }
3354
3355    fn update_task_cell(
3356        &self,
3357        task_id: TaskId,
3358        cell: CellId,
3359        is_serializable_cell_content: bool,
3360        content: CellContent,
3361        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3362        verification_mode: VerificationMode,
3363        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3364    ) {
3365        self.0.update_task_cell(
3366            task_id,
3367            cell,
3368            is_serializable_cell_content,
3369            content,
3370            updated_key_hashes,
3371            verification_mode,
3372            turbo_tasks,
3373        );
3374    }
3375
3376    fn mark_own_task_as_finished(
3377        &self,
3378        task_id: TaskId,
3379        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3380    ) {
3381        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3382    }
3383
3384    fn set_own_task_aggregation_number(
3385        &self,
3386        task: TaskId,
3387        aggregation_number: u32,
3388        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3389    ) {
3390        self.0
3391            .set_own_task_aggregation_number(task, aggregation_number, turbo_tasks);
3392    }
3393
3394    fn mark_own_task_as_session_dependent(
3395        &self,
3396        task: TaskId,
3397        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3398    ) {
3399        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
3400    }
3401
3402    fn connect_task(
3403        &self,
3404        task: TaskId,
3405        parent_task: Option<TaskId>,
3406        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3407    ) {
3408        self.0.connect_task(task, parent_task, turbo_tasks);
3409    }
3410
3411    fn create_transient_task(
3412        &self,
3413        task_type: TransientTaskType,
3414        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3415    ) -> TaskId {
3416        self.0.create_transient_task(task_type)
3417    }
3418
3419    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3420        self.0.dispose_root_task(task_id, turbo_tasks);
3421    }
3422
3423    fn task_statistics(&self) -> &TaskStatisticsApi {
3424        &self.0.task_statistics
3425    }
3426
3427    fn is_tracking_dependencies(&self) -> bool {
3428        self.0.options.dependency_tracking
3429    }
3430}
3431
3432enum DebugTraceTransientTask {
3433    Cached {
3434        task_name: &'static str,
3435        cell_type_id: Option<ValueTypeId>,
3436        cause_self: Option<Box<DebugTraceTransientTask>>,
3437        cause_args: Vec<DebugTraceTransientTask>,
3438    },
3439    /// This representation is used when this task is a duplicate of one previously shown
3440    Collapsed {
3441        task_name: &'static str,
3442        cell_type_id: Option<ValueTypeId>,
3443    },
3444    Uncached {
3445        cell_type_id: Option<ValueTypeId>,
3446    },
3447}
3448
3449impl DebugTraceTransientTask {
3450    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3451        let indent = "    ".repeat(level);
3452        f.write_str(&indent)?;
3453
3454        fn fmt_cell_type_id(
3455            f: &mut fmt::Formatter<'_>,
3456            cell_type_id: Option<ValueTypeId>,
3457        ) -> fmt::Result {
3458            if let Some(ty) = cell_type_id {
3459                write!(f, " (read cell of type {})", get_value_type(ty).global_name)
3460            } else {
3461                Ok(())
3462            }
3463        }
3464
3465        // write the name and type
3466        match self {
3467            Self::Cached {
3468                task_name,
3469                cell_type_id,
3470                ..
3471            }
3472            | Self::Collapsed {
3473                task_name,
3474                cell_type_id,
3475                ..
3476            } => {
3477                f.write_str(task_name)?;
3478                fmt_cell_type_id(f, *cell_type_id)?;
3479                if matches!(self, Self::Collapsed { .. }) {
3480                    f.write_str(" (collapsed)")?;
3481                }
3482            }
3483            Self::Uncached { cell_type_id } => {
3484                f.write_str("unknown transient task")?;
3485                fmt_cell_type_id(f, *cell_type_id)?;
3486            }
3487        }
3488        f.write_char('\n')?;
3489
3490        // write any extra "cause" information we might have
3491        if let Self::Cached {
3492            cause_self,
3493            cause_args,
3494            ..
3495        } = self
3496        {
3497            if let Some(c) = cause_self {
3498                writeln!(f, "{indent}  self:")?;
3499                c.fmt_indented(f, level + 1)?;
3500            }
3501            if !cause_args.is_empty() {
3502                writeln!(f, "{indent}  args:")?;
3503                for c in cause_args {
3504                    c.fmt_indented(f, level + 1)?;
3505                }
3506            }
3507        }
3508        Ok(())
3509    }
3510}
3511
3512impl fmt::Display for DebugTraceTransientTask {
3513    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3514        self.fmt_indented(f, 0)
3515    }
3516}
3517
3518// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
3519fn far_future() -> Instant {
3520    // Roughly 30 years from now.
3521    // API does not provide a way to obtain max `Instant`
3522    // or convert specific date in the future to instant.
3523    // 1000 years overflows on macOS, 100 years overflows on FreeBSD.
3524    Instant::now() + Duration::from_secs(86400 * 365 * 30)
3525}