Skip to main content

turbo_tasks_backend/backend/
mod.rs

1mod cell_data;
2mod counter_map;
3mod operation;
4mod snapshot_coordinator;
5mod storage;
6pub mod storage_schema;
7
8use std::{
9    borrow::Cow,
10    fmt::{self, Write},
11    future::Future,
12    hash::BuildHasherDefault,
13    mem::take,
14    pin::Pin,
15    sync::{
16        Arc, LazyLock,
17        atomic::{AtomicBool, Ordering},
18    },
19    time::SystemTime,
20};
21
22use anyhow::{Context, Result, bail};
23use auto_hash_map::{AutoMap, AutoSet};
24use indexmap::IndexSet;
25use parking_lot::Mutex;
26use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
27use smallvec::{SmallVec, smallvec};
28use tokio::time::{Duration, Instant};
29use tracing::{Span, trace_span};
30use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
31use turbo_tasks::{
32    CellId, DynTaskInputsStorage, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
33    ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
34    TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasks, TurboTasksCallApi,
35    TurboTasksPanic, ValueTypeId,
36    backend::{
37        Backend, CachedTaskType, CachedTaskTypeArc, CellContent, CellHash, TaskExecutionSpec,
38        TransientTaskType, TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
39        TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
40        VerificationMode,
41    },
42    event::{Event, EventDescription, EventListener},
43    macro_helpers::NativeFunction,
44    message_queue::{TimingEvent, TraceEvent},
45    registry::get_value_type,
46    scope::scope_and_block,
47    task_statistics::TaskStatisticsApi,
48    trace::TraceRawVcs,
49    util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
50};
51#[cfg(feature = "task_dirty_cause")]
52use turbo_tasks::{FunctionId, TaskDirtyCause};
53
54pub use self::{
55    operation::AnyOperation,
56    storage::{EvictionCounts, SpecificTaskDataCategory, TaskDataCategory},
57};
58use crate::{
59    backend::{
60        operation::{
61            AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
62            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
63            LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType, TaskTypeRef,
64            connect_children, get_aggregation_number, get_uppers, make_task_dirty_internal,
65            prepare_new_children,
66        },
67        snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
68        storage::Storage,
69        storage_schema::{TaskStorage, TaskStorageAccessors},
70    },
71    backing_storage::{BackingStorage, SnapshotItem, SnapshotMeta, compute_task_type_hash},
72    data::{
73        ActivenessState, CellDependency, CellRef, CollectibleRef, CollectiblesRef, Dirtyness,
74        InProgressCellState, InProgressState, InProgressStateInner, OutputValue, TransientTask,
75    },
76    error::TaskError,
77    utils::{
78        dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
79        shard_amount::compute_shard_amount,
80    },
81};
82
/// Dependent-task count above which making dependent tasks dirty is parallelized.
///
/// When the number of dependent tasks exceeds this value, the dirty-marking work is split
/// across parallel workers instead of being done sequentially.
const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10_000;
87
/// Idle timeout used for snapshot persistence.
///
/// Read once from the `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS` environment variable, which
/// is interpreted as a number of milliseconds. Falls back to 2 seconds when the variable is
/// unset or does not parse as a `u64`.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let configured_millis = std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    match configured_millis {
        Some(millis) => Duration::from_millis(millis),
        None => Duration::from_secs(2),
    }
});
97
98/// Priority used to re-schedule a task that became stale during execution.
99///
100/// Stale tasks must run again, but at a priority that reflects why they're being re-run rather
101/// than the (likely higher) priority of the original schedule. We use invalidation priority
102/// based on the task's leaf distance, parented under either the task's current dirty priority
103/// or `leaf()` if it is no longer dirty.
104fn compute_stale_priority(task: &impl TaskGuard) -> TaskPriority {
105    TaskPriority::invalidation(
106        task.get_leaf_distance()
107            .copied()
108            .unwrap_or_default()
109            .distance,
110    )
111    .in_parent(task.is_dirty().unwrap_or(TaskPriority::leaf()))
112}
113
/// How the backend uses its backing storage.
///
/// In every mode, cache entries that are missing from memory are looked up in the backing
/// storage. The modes differ only in whether and when changes are written back.
pub enum StorageMode {
    /// Reads missing cache entries from the backing storage; changes are never pushed back.
    ReadOnly,
    /// Reads missing cache entries from the backing storage and regularly pushes changes to
    /// it.
    ReadWrite,
    /// Reads missing cache entries from the backing storage and pushes all changes to it on
    /// shutdown.
    ReadWriteOnShutdown,
}
124
125pub struct BackendOptions {
126    /// Enables dependency tracking.
127    ///
128    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
129    /// forever.
130    pub dependency_tracking: bool,
131
132    /// Enables active tracking.
133    ///
134    /// Automatically disabled when `dependency_tracking` is disabled.
135    ///
136    /// When disabled: All tasks are considered as active.
137    pub active_tracking: bool,
138
139    /// Enables the backing storage.
140    pub storage_mode: Option<StorageMode>,
141
142    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
143    /// datastructures. If `None`, it will use the available parallelism.
144    pub num_workers: Option<usize>,
145
146    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
147    pub small_preallocation: bool,
148
149    /// When enabled, evict all evictable tasks from in-memory storage after every snapshot.
150    /// This reclaims memory by clearing persisted data that can be re-loaded from disk on demand.
151    /// This is an EXPERIMENTAL FEATURE under development
152    pub evict_after_snapshot: bool,
153}
154
155impl Default for BackendOptions {
156    fn default() -> Self {
157        Self {
158            dependency_tracking: true,
159            active_tracking: true,
160            storage_mode: Some(StorageMode::ReadWrite),
161            num_workers: None,
162            small_preallocation: false,
163            evict_after_snapshot: false,
164        }
165    }
166}
167
/// Kinds of backend jobs. Snapshotting is currently the only one.
pub enum TurboTasksBackendJob {
    /// Take a snapshot.
    Snapshot,
}
171
172pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
173
174struct TurboTasksBackendInner<B: BackingStorage> {
175    options: BackendOptions,
176
177    start_time: Instant,
178
179    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
180    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
181
182    storage: Storage,
183
184    /// Coordinates the operation/snapshot interleaving protocol. See
185    /// [`SnapshotCoordinator`] for details.
186    snapshot_coord: SnapshotCoordinator,
187    /// Serializes calls to `snapshot_and_persist`. The coordinator's
188    /// `begin_snapshot` asserts that snapshots don't overlap; this mutex
189    /// enforces that contract for our two callers (background loop and
190    /// `stop_and_wait`).
191    snapshot_in_progress: Mutex<()>,
192
193    stopping: AtomicBool,
194    stopping_event: Event,
195    idle_start_event: Event,
196    idle_end_event: Event,
197    #[cfg(feature = "verify_aggregation_graph")]
198    is_idle: AtomicBool,
199
200    task_statistics: TaskStatisticsApi,
201
202    backing_storage: B,
203
204    #[cfg(feature = "verify_aggregation_graph")]
205    root_tasks: Mutex<FxHashSet<TaskId>>,
206}
207
208impl<B: BackingStorage> TurboTasksBackend<B> {
209    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
210        Self(Arc::new(TurboTasksBackendInner::new(
211            options,
212            backing_storage,
213        )))
214    }
215
216    pub fn backing_storage(&self) -> &B {
217        &self.0.backing_storage
218    }
219
220    /// Perform a snapshot and then evict all evictable tasks from memory.
221    ///
222    /// This is exposed for integration tests that need to verify the
223    /// snapshot → evict → restore cycle works correctly.
224    ///
225    /// Returns `(snapshot_had_new_data, eviction_counts)`.
226    pub fn snapshot_and_evict_for_testing(
227        &self,
228        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
229    ) -> (bool, EvictionCounts) {
230        self.0.snapshot_and_evict_for_testing(turbo_tasks)
231    }
232}
233
234impl<B: BackingStorage> TurboTasksBackendInner<B> {
235    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
236        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
237        if !options.dependency_tracking {
238            options.active_tracking = false;
239        }
240        let small_preallocation = options.small_preallocation;
241        let next_task_id = backing_storage
242            .next_free_task_id()
243            .expect("Failed to get task id");
244        Self {
245            options,
246            start_time: Instant::now(),
247            persisted_task_id_factory: IdFactoryWithReuse::new(
248                next_task_id,
249                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
250            ),
251            transient_task_id_factory: IdFactoryWithReuse::new(
252                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
253                TaskId::MAX,
254            ),
255            storage: Storage::new(shard_amount, small_preallocation),
256            snapshot_coord: SnapshotCoordinator::new(),
257            snapshot_in_progress: Mutex::new(()),
258            stopping: AtomicBool::new(false),
259            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
260            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
261            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
262            #[cfg(feature = "verify_aggregation_graph")]
263            is_idle: AtomicBool::new(false),
264            task_statistics: TaskStatisticsApi::default(),
265            backing_storage,
266            #[cfg(feature = "verify_aggregation_graph")]
267            root_tasks: Default::default(),
268        }
269    }
270
271    fn execute_context<'a>(
272        &'a self,
273        turbo_tasks: &'a TurboTasks<TurboTasksBackend<B>>,
274    ) -> impl ExecuteContext<'a> {
275        ExecuteContextImpl::new(self, turbo_tasks)
276    }
277
278    fn suspending_requested(&self) -> bool {
279        self.should_persist() && self.snapshot_coord.snapshot_pending()
280    }
281
282    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
283        if self.should_persist() {
284            self.snapshot_coord.suspend_point(suspend);
285        }
286    }
287
288    pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
289        if !self.should_persist() {
290            return OperationGuard::noop();
291        }
292        self.snapshot_coord.begin_operation()
293    }
294
295    fn should_persist(&self) -> bool {
296        matches!(
297            self.options.storage_mode,
298            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
299        )
300    }
301
302    fn should_evict(&self) -> bool {
303        self.options.evict_after_snapshot && self.should_persist()
304    }
305
306    /// Perform a snapshot and then evict all evictable tasks from memory.
307    ///
308    /// This is exposed for integration tests that need to verify the
309    /// snapshot → evict → restore cycle works correctly.
310    ///
311    /// Returns `(snapshot_had_new_data, eviction_counts)`.
312    #[doc(hidden)]
313    pub fn snapshot_and_evict_for_testing(
314        &self,
315        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
316    ) -> (bool, EvictionCounts) {
317        assert!(
318            self.should_persist(),
319            "snapshot_and_evict requires persistence"
320        );
321        let snapshot_result = self.snapshot_and_persist(None, "test", turbo_tasks);
322        let had_new_data = match snapshot_result {
323            Ok((_, new_data)) => new_data,
324            Err(_) => {
325                // Snapshot/persist failed — skip eviction since the data may not
326                // be on disk yet. Evicting now could lose in-memory state that
327                // can't be restored.
328                return (false, EvictionCounts::default());
329            }
330        };
331        let counts = self.storage.evict_after_snapshot(None);
332        (had_new_data, counts)
333    }
334
335    fn should_restore(&self) -> bool {
336        self.options.storage_mode.is_some()
337    }
338
339    fn should_track_dependencies(&self) -> bool {
340        self.options.dependency_tracking
341    }
342
343    fn should_track_activeness(&self) -> bool {
344        self.options.active_tracking
345    }
346
347    fn track_cache_hit_by_fn(&self, native_fn: &'static NativeFunction) {
348        self.task_statistics
349            .map(|stats| stats.increment_cache_hit(native_fn));
350    }
351
352    fn track_cache_miss_by_fn(&self, native_fn: &'static NativeFunction) {
353        self.task_statistics
354            .map(|stats| stats.increment_cache_miss(native_fn));
355    }
356
357    /// Reconstructs a full [`TurboTasksExecutionError`] from the compact [`TaskError`] storage
358    /// representation. For [`TaskError::TaskChain`], this looks up the source error from the last
359    /// task's output and rebuilds the nested `TaskContext` wrappers with `TurboTasksCallApi`
360    /// references for lazy name resolution.
361    fn task_error_to_turbo_tasks_execution_error(
362        &self,
363        error: &TaskError,
364        ctx: &mut impl ExecuteContext<'_>,
365    ) -> TurboTasksExecutionError {
366        match error {
367            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
368            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
369                message: item.message.clone(),
370                source: item
371                    .source
372                    .as_ref()
373                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
374            })),
375            TaskError::LocalTaskContext(local_task_context) => {
376                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
377                    name: local_task_context.name.clone(),
378                    source: local_task_context
379                        .source
380                        .as_ref()
381                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
382                }))
383            }
384            TaskError::TaskChain(chain) => {
385                let task_id = chain.last().unwrap();
386                let error = {
387                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
388                    if let Some(OutputValue::Error(error)) = task.get_output() {
389                        Some(error.clone())
390                    } else {
391                        None
392                    }
393                };
394                let error = error.map_or_else(
395                    || {
396                        // Eventual consistency will cause errors to no longer be available
397                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
398                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
399                                "Error no longer available",
400                            )),
401                            location: None,
402                        }))
403                    },
404                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
405                );
406                let mut current_error = error;
407                for &task_id in chain.iter().rev() {
408                    current_error =
409                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
410                            task_id,
411                            source: Some(current_error),
412                            turbo_tasks: ctx.turbo_tasks(),
413                        }));
414                }
415                current_error
416            }
417        }
418    }
419}
420
421/// Intermediate result of step 1 of task execution completion.
422struct TaskExecutionCompletePrepareResult {
423    pub new_children: FxHashSet<TaskId>,
424    pub is_now_immutable: bool,
425    #[cfg(feature = "verify_determinism")]
426    pub no_output_set: bool,
427    #[cfg(feature = "task_dirty_cause")]
428    pub function_id: Option<FunctionId>,
429    pub new_output: Option<OutputValue>,
430    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
431    pub is_recomputation: bool,
432    pub is_session_dependent: bool,
433}
434
435// Operations
436impl<B: BackingStorage> TurboTasksBackendInner<B> {
437    fn try_read_task_output(
438        self: &Arc<Self>,
439        task_id: TaskId,
440        reader: Option<TaskId>,
441        options: ReadOutputOptions,
442        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
443    ) -> Result<Result<RawVc, EventListener>> {
444        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
445
446        let mut ctx = self.execute_context(turbo_tasks);
447        let need_reader_task = if self.should_track_dependencies()
448            && !matches!(options.tracking, ReadTracking::Untracked)
449            && reader.is_some_and(|reader_id| reader_id != task_id)
450            && let Some(reader_id) = reader
451            && reader_id != task_id
452        {
453            Some(reader_id)
454        } else {
455            None
456        };
457        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
458            // Having a task_pair here is not optimal, but otherwise this would lead to a race
459            // condition. See below.
460            // TODO(sokra): solve that in a more performant way.
461            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
462            (task, Some(reader))
463        } else {
464            (ctx.task(task_id, TaskDataCategory::All), None)
465        };
466
467        fn listen_to_done_event(
468            reader_description: Option<EventDescription>,
469            tracking: ReadTracking,
470            done_event: &Event,
471        ) -> EventListener {
472            done_event.listen_with_note(move || {
473                move || {
474                    if let Some(reader_description) = reader_description.as_ref() {
475                        format!(
476                            "try_read_task_output from {} ({})",
477                            reader_description, tracking
478                        )
479                    } else {
480                        format!("try_read_task_output ({})", tracking)
481                    }
482                }
483            })
484        }
485
486        fn check_in_progress(
487            task: &impl TaskGuard,
488            reader_description: Option<EventDescription>,
489            tracking: ReadTracking,
490        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
491        {
492            match task.get_in_progress() {
493                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
494                    listen_to_done_event(reader_description, tracking, done_event),
495                ))),
496                Some(InProgressState::InProgress(box InProgressStateInner {
497                    done_event, ..
498                })) => Some(Ok(Err(listen_to_done_event(
499                    reader_description,
500                    tracking,
501                    done_event,
502                )))),
503                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
504                    "{} was canceled",
505                    task.get_task_description()
506                ))),
507                None => None,
508            }
509        }
510
511        if matches!(options.consistency, ReadConsistency::Strong) {
512            if task
513                .get_persistent_task_type()
514                .is_some_and(|t| !t.native_fn.is_root)
515            {
516                drop(task);
517                drop(reader_task);
518                panic!(
519                    "Strongly consistent read of non-root task {} (reader: {}). The `root` \
520                     attribute is missing on the task.",
521                    self.debug_get_task_description(task_id),
522                    reader.map_or_else(
523                        || "unknown".to_string(),
524                        |r| self.debug_get_task_description(r)
525                    )
526                );
527            }
528
529            let is_dirty = task.is_dirty();
530
531            // Check the dirty count of the root node
532            let has_dirty_containers = task.has_dirty_containers();
533            if has_dirty_containers || is_dirty.is_some() {
534                let activeness = task.get_activeness_mut();
535                let mut task_ids_to_schedule: Vec<_> = Vec::new();
536                // When there are dirty task, subscribe to the all_clean_event
537                let activeness = if let Some(activeness) = activeness {
538                    // This makes sure all tasks stay active and this task won't stale.
539                    // active_until_clean is automatically removed when this
540                    // task is clean.
541                    activeness.set_active_until_clean();
542                    activeness
543                } else {
544                    // If we don't have a root state, add one. This also makes sure all tasks stay
545                    // active and this task won't stale. active_until_clean
546                    // is automatically removed when this task is clean.
547                    if ctx.should_track_activeness() {
548                        // A newly added Activeness need to make sure to schedule the tasks
549                        task_ids_to_schedule = task.dirty_containers().collect();
550                        task_ids_to_schedule.push(task_id);
551                    }
552                    let activeness =
553                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
554                    activeness.set_active_until_clean();
555                    activeness
556                };
557                let listener = activeness.all_clean_event.listen_with_note(move || {
558                    let this = self.clone();
559                    let tt = turbo_tasks.pin();
560                    move || {
561                        let mut ctx = this.execute_context(&tt);
562                        let mut visited = FxHashSet::default();
563                        fn indent(s: &str) -> String {
564                            s.split_inclusive('\n')
565                                .flat_map(|line: &str| ["  ", line].into_iter())
566                                .collect::<String>()
567                        }
568                        fn get_info(
569                            ctx: &mut impl ExecuteContext<'_>,
570                            task_id: TaskId,
571                            parent_and_count: Option<(TaskId, i32)>,
572                            visited: &mut FxHashSet<TaskId>,
573                        ) -> String {
574                            let task = ctx.task(task_id, TaskDataCategory::All);
575                            let is_dirty = task.is_dirty();
576                            let in_progress =
577                                task.get_in_progress()
578                                    .map_or("not in progress", |p| match p {
579                                        InProgressState::InProgress(_) => "in progress",
580                                        InProgressState::Scheduled { .. } => "scheduled",
581                                        InProgressState::Canceled => "canceled",
582                                    });
583                            let activeness = task.get_activeness().map_or_else(
584                                || "not active".to_string(),
585                                |activeness| format!("{activeness:?}"),
586                            );
587                            let aggregation_number = get_aggregation_number(&task);
588                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
589                            {
590                                let uppers = get_uppers(&task);
591                                !uppers.contains(&parent_task_id)
592                            } else {
593                                false
594                            };
595
596                            // Check the dirty count of the root node
597                            let has_dirty_containers = task.has_dirty_containers();
598
599                            let task_description = task.get_task_description();
600                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
601                                format!(", dirty({parent_priority})")
602                            } else {
603                                String::new()
604                            };
605                            let has_dirty_containers_label = if has_dirty_containers {
606                                ", dirty containers"
607                            } else {
608                                ""
609                            };
610                            let count = if let Some((_, count)) = parent_and_count {
611                                format!(" {count}")
612                            } else {
613                                String::new()
614                            };
615                            let mut info = format!(
616                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
617                                 {in_progress}, \
618                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
619                            );
620                            let children: Vec<_> = task.dirty_containers_with_count().collect();
621                            drop(task);
622
623                            if missing_upper {
624                                info.push_str("\n  ERROR: missing upper connection");
625                            }
626
627                            if has_dirty_containers || !children.is_empty() {
628                                writeln!(info, "\n  dirty tasks:").unwrap();
629
630                                for (child_task_id, count) in children {
631                                    let task_description = ctx
632                                        .task(child_task_id, TaskDataCategory::Data)
633                                        .get_task_description();
634                                    if visited.insert(child_task_id) {
635                                        let child_info = get_info(
636                                            ctx,
637                                            child_task_id,
638                                            Some((task_id, count)),
639                                            visited,
640                                        );
641                                        info.push_str(&indent(&child_info));
642                                        if !info.ends_with('\n') {
643                                            info.push('\n');
644                                        }
645                                    } else {
646                                        writeln!(
647                                            info,
648                                            "  {child_task_id} {task_description} {count} \
649                                             (already visited)"
650                                        )
651                                        .unwrap();
652                                    }
653                                }
654                            }
655                            info
656                        }
657                        let info = get_info(&mut ctx, task_id, None, &mut visited);
658                        format!(
659                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
660                        )
661                    }
662                });
663                drop(reader_task);
664                drop(task);
665                if !task_ids_to_schedule.is_empty() {
666                    let mut queue = AggregationUpdateQueue::new();
667                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
668                    queue.execute(&mut ctx);
669                }
670
671                return Ok(Err(listener));
672            }
673        }
674
675        let reader_description = reader_task
676            .as_ref()
677            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
678        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
679        {
680            return value;
681        }
682
683        if let Some(output) = task.get_output() {
684            let result = match output {
685                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
686                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
687                OutputValue::Error(error) => Err(error.clone()),
688            };
689            if let Some(mut reader_task) = reader_task.take()
690                && options.tracking.should_track(result.is_err())
691                && (!task.immutable() || cfg!(feature = "verify_immutable"))
692            {
693                #[cfg(feature = "trace_task_output_dependencies")]
694                let _span = tracing::trace_span!(
695                    "add output dependency",
696                    task = %task_id,
697                    dependent_task = ?reader
698                )
699                .entered();
700                let mut queue = LeafDistanceUpdateQueue::new();
701                let reader = reader.unwrap();
702                if task.add_output_dependent(reader) {
703                    // Ensure that dependent leaf distance is strictly monotonic increasing
704                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
705                    let reader_leaf_distance =
706                        reader_task.get_leaf_distance().copied().unwrap_or_default();
707                    if reader_leaf_distance.distance <= leaf_distance.distance {
708                        queue.push(
709                            reader,
710                            leaf_distance.distance,
711                            leaf_distance.max_distance_in_buffer,
712                        );
713                    }
714                }
715
716                drop(task);
717
718                // Note: We use `task_pair` earlier to lock the task and its reader at the same
719                // time. If we didn't and just locked the reader here, an invalidation could occur
720                // between grabbing the locks. If that happened, and if the task is "outdated" or
721                // doesn't have the dependency edge yet, the invalidation would be lost.
722
723                if !reader_task.remove_outdated_output_dependencies(&task_id) {
724                    let _ = reader_task.add_output_dependencies(task_id);
725                }
726                drop(reader_task);
727
728                queue.execute(&mut ctx);
729            } else {
730                drop(task);
731            }
732
733            return result.map_err(|error| {
734                self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
735                    .with_task_context(task_id, turbo_tasks.pin())
736                    .into()
737            });
738        }
739        drop(reader_task);
740
741        let note = EventDescription::new(|| {
742            move || {
743                if let Some(reader) = reader_description.as_ref() {
744                    format!("try_read_task_output (recompute) from {reader}",)
745                } else {
746                    "try_read_task_output (recompute, untracked)".to_string()
747                }
748            }
749        });
750
751        // Output doesn't exist. We need to schedule the task to compute it.
752        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
753            TaskExecutionReason::OutputNotAvailable,
754            EventDescription::new(|| task.get_task_desc_fn()),
755            note,
756        );
757
758        // It's not possible that the task is InProgress at this point. If it is InProgress {
759        // done: true } it must have Output and would early return.
760        let old = task.set_in_progress(in_progress_state);
761        debug_assert!(old.is_none(), "InProgress already exists");
762        ctx.schedule_task(task, TaskPriority::Recomputation);
763
764        Ok(Err(listener))
765    }
766
    /// Reads cell `cell` of task `task_id`, optionally on behalf of the task `reader`.
    ///
    /// Return value:
    /// - `Ok(Ok(content))` — the cell data was present and is returned.
    /// - `Ok(Err(listener))` — the cell is not available yet; the caller has to wait on
    ///   `listener` and retry. When this call is the first to request the cell and the task is
    ///   neither in progress nor scheduled, the task is scheduled for recomputation with
    ///   `TaskExecutionReason::CellNotAvailable` / `TaskPriority::Recomputation`.
    /// - `Err(_)` — no cell of this type exists, the cell index is out of bounds, or the task
    ///   was canceled.
    ///
    /// Dependency edges (reader -> cell) are only recorded when dependency tracking is enabled,
    /// `options.tracking` is not `Untracked`, `reader` is `Some`, and it differs from `task_id`.
    /// If `options.final_read_hint` is set, the cell data is removed from the task instead of
    /// being cloned.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        /// Records the edge "`reader` depends on `cell` of `task_id`" on both tasks.
        ///
        /// Does nothing when there is no `reader_task` (untracked read), or when the read task is
        /// immutable (unless the `verify_immutable` feature is enabled). Consumes both guards so
        /// the locks are released as soon as the edge is recorded.
        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
            key: Option<u64>,
        ) {
            if let Some(mut reader_task) = reader_task
                && (!task.immutable() || cfg!(feature = "verify_immutable"))
            {
                // `reader_task` is only `Some` when `reader` is `Some` (see the `task_pair` call in
                // the enclosing function), so this unwrap cannot fail.
                let reader = reader.unwrap();
                let _ = task
                    .add_cell_dependents(CellDependency::new(CellRef { task: reader, cell }, key));
                drop(task);

                // Note: We use `task_pair` earlier to lock the task and its reader at the same
                // time. If we didn't and just locked the reader here, an invalidation could occur
                // between grabbing the locks. If that happened, and if the task is "outdated" or
                // doesn't have the dependency edge yet, the invalidation would be lost.

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                let dep = CellDependency::new(target, key);
                // If the edge was present in the reader's "outdated" set, removing it from there
                // is sufficient; otherwise it is added as a new dependency.
                // NOTE(review): presumably the outdated set holds edges from the reader's previous
                // execution that are kept alive by re-reading — confirm in storage_schema.
                if !reader_task.remove_outdated_cell_dependencies(&dep) {
                    let _ = reader_task.add_cell_dependencies(dep);
                }
                drop(reader_task);
            }
        }

        let ReadCellOptions {
            tracking,
            final_read_hint,
        } = options;

        let mut ctx = self.execute_context(turbo_tasks);
        // Lock the reader together with the task only when a dependency edge may be recorded.
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(tracking, ReadCellTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            // Having a task_pair here is not optimal, but otherwise this would lead to a race
            // condition. See below.
            // TODO(sokra): solve that in a more performant way.
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        // A final read takes the data out of the task instead of cloning it.
        let content = if final_read_hint {
            task.remove_cell_data(&cell)
        } else {
            task.get_cell_data(&cell).cloned()
        };
        if let Some(content) = content {
            // Fast path: the cell data is available.
            if tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content)),
            )));
        }

        let in_progress = task.get_in_progress();
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            // The task is already running or queued; just wait for the cell. No dependency edge
            // is recorded here — the caller re-reads after the listener fires.
            return Ok(Err(self
                .listen_to_cell(&mut task, task_id, &reader_task, cell)
                .0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        // Check cell index range (cell might not exist at all)
        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
        let Some(max_id) = max_id else {
            // The description is read before `task` is moved into `add_cell_dependency`.
            let task_desc = task.get_task_description();
            // `should_track(true)`: whether this (erroring) read should still be tracked, so the
            // reader can be invalidated if the cell appears later.
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!(
                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
            );
        };
        if cell.index >= max_id {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
        }

        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
        // task to get the cell content.

        // Bail early if the task was cancelled — no point in registering a listener
        // on a task that won't execute again.
        if is_cancelled {
            bail!("{} was canceled", task.get_task_description());
        }

        // Listen to the cell and potentially schedule the task
        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
        drop(reader_task);
        // Another reader already created the in-progress entry for this cell, so scheduling
        // was (or will be) handled by that reader.
        if !new_listener {
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).ty.global_name,
            cell_index = cell.index
        )
        .entered();

        let _ = task.add_scheduled(
            TaskExecutionReason::CellNotAvailable,
            EventDescription::new(|| task.get_task_desc_fn()),
        );
        ctx.schedule_task(task, TaskPriority::Recomputation);

        Ok(Err(listener))
    }
906
907    fn listen_to_cell(
908        &self,
909        task: &mut impl TaskGuard,
910        task_id: TaskId,
911        reader_task: &Option<impl TaskGuard>,
912        cell: CellId,
913    ) -> (EventListener, bool) {
914        let note = || {
915            let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
916            move || {
917                if let Some(reader_desc) = reader_desc.as_ref() {
918                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
919                } else {
920                    "try_read_task_cell (in progress, untracked)".to_string()
921                }
922            }
923        };
924        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
925            // Someone else is already computing the cell
926            let listener = in_progress.event.listen_with_note(note);
927            return (listener, false);
928        }
929        let in_progress = InProgressCellState::new(task_id, cell);
930        let listener = in_progress.event.listen_with_note(note);
931        let old = task.insert_in_progress_cells(cell, in_progress);
932        debug_assert!(old.is_none(), "InProgressCell already exists");
933        (listener, true)
934    }
935
    /// Snapshots every task modified since the previous snapshot and writes the result to the
    /// backing storage.
    ///
    /// Only one snapshot runs at a time: `snapshot_in_progress` is locked for the whole call.
    ///
    /// Returns `(time, persisted)`:
    /// - `persisted` is `true` only when task snapshots were handed to
    ///   `backing_storage.save_snapshot` and it succeeded.
    /// - `time` is `snapshot_time` (taken right after `start_snapshot` and
    ///   `take_suspended_operations`) on most paths, but `start` (taken before `begin_snapshot`)
    ///   when nothing was modified. NOTE(review): confirm callers do not depend on the
    ///   difference.
    ///
    /// After a successful persist a `TraceEvent` ("turbopack-persistence") is always sent.
    /// A `TimingEvent` is also sent when the whole operation took longer than 10 seconds.
    ///
    /// # Errors
    ///
    /// Propagates the error from `save_snapshot`. The tasks have already been consumed by
    /// `take_snapshot` at that point, so a later snapshot will not re-persist them.
    ///
    /// # Panics
    ///
    /// Panics if encoding a task category fails, or if a transient task reaches the encoder.
    fn snapshot_and_persist(
        &self,
        parent_span: Option<tracing::Id>,
        reason: &str,
        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
    ) -> Result<(Instant, bool), anyhow::Error> {
        let snapshot_span =
            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
                .entered();
        // Serialize snapshots. The internal protocol (snapshot_mode, snapshot
        // request bit, suspended_operations) assumes only one snapshot runs at
        // a time. Held for the entire snapshot lifecycle.
        let _snapshot_in_progress = self.snapshot_in_progress.lock();
        let start = Instant::now();
        // SystemTime for wall-clock timestamps in trace events (milliseconds
        // since epoch). Instant is monotonic but has no defined epoch, so it
        // can't be used for cross-process trace correlation.
        let wall_start = SystemTime::now();
        debug_assert!(self.should_persist());

        let mut snapshot_phase = {
            let _span = tracing::info_span!("blocking").entered();
            self.snapshot_coord.begin_snapshot()
        };
        // Enter snapshot mode, which atomically reads and resets the modified count.
        // Checking after start_snapshot ensures no concurrent increments can race.
        let (snapshot_guard, has_modifications) = self.storage.start_snapshot();

        let suspended_operations = snapshot_phase.take_suspended_operations();

        let snapshot_time = Instant::now();
        // End the coordinator phase. NOTE(review): presumably this lets operations that were
        // suspended for the snapshot continue — confirm in `snapshot_coordinator`.
        drop(snapshot_phase);

        if !has_modifications {
            // No tasks modified since the last snapshot — drop the guard (which
            // calls end_snapshot) and skip the expensive O(N) scan.
            drop(snapshot_guard);
            return Ok((start, false));
        }

        // Per-task-name size statistics, collected only with the `print_cache_item_size` feature.
        #[cfg(feature = "print_cache_item_size")]
        #[derive(Default)]
        struct TaskCacheStats {
            data: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            data_compressed: usize,
            data_count: usize,
            meta: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            meta_compressed: usize,
            meta_count: usize,
            upper_count: usize,
            collectibles_count: usize,
            aggregated_collectibles_count: usize,
            children_count: usize,
            followers_count: usize,
            collectibles_dependents_count: usize,
            aggregated_dirty_containers_count: usize,
            output_size: usize,
        }
        /// Formats a byte size, optionally including the compressed size when the
        /// `print_cache_item_size_with_compressed` feature is enabled.
        #[cfg(feature = "print_cache_item_size")]
        struct FormatSizes {
            size: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: usize,
        }
        #[cfg(feature = "print_cache_item_size")]
        impl std::fmt::Display for FormatSizes {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                use turbo_tasks::util::FormatBytes;
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    write!(
                        f,
                        "{} ({} compressed)",
                        FormatBytes(self.size),
                        FormatBytes(self.compressed_size)
                    )
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    write!(f, "{}", FormatBytes(self.size))
                }
            }
        }
        #[cfg(feature = "print_cache_item_size")]
        impl TaskCacheStats {
            /// LZ4-compressed size of `data` (default acceleration level).
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            fn compressed_size(data: &[u8]) -> Result<usize> {
                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
                    data,
                    &mut Vec::new(),
                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
                )?)
            }

            /// Accounts one encoded "data" category item; compression failures count as 0.
            fn add_data(&mut self, data: &[u8]) {
                self.data += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.data_count += 1;
            }

            /// Accounts one encoded "meta" category item; compression failures count as 0.
            fn add_meta(&mut self, data: &[u8]) {
                self.meta += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.meta_count += 1;
            }

            /// Adds the task's meta collection counts and the encoded output size.
            fn add_counts(&mut self, storage: &TaskStorage) {
                let counts = storage.meta_counts();
                self.upper_count += counts.upper;
                self.collectibles_count += counts.collectibles;
                self.aggregated_collectibles_count += counts.aggregated_collectibles;
                self.children_count += counts.children;
                self.followers_count += counts.followers;
                self.collectibles_dependents_count += counts.collectibles_dependents;
                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
                if let Some(output) = storage.get_output() {
                    use turbo_bincode::turbo_bincode_encode;

                    self.output_size += turbo_bincode_encode(&output)
                        .map(|data| data.len())
                        .unwrap_or(0);
                }
            }

            /// Returns the task name used as the stats grouping key.
            fn task_name(storage: &TaskStorage) -> String {
                storage
                    .get_persistent_task_type()
                    .map(|t| t.to_string())
                    .unwrap_or_else(|| "<unknown>".to_string())
            }

            /// Returns the primary sort key: compressed total when
            /// `print_cache_item_size_with_compressed` is enabled, raw total otherwise.
            fn sort_key(&self) -> usize {
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed + self.meta_compressed
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    self.data + self.meta
                }
            }

            fn format_total(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data + self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed + self.meta_compressed,
                }
            }

            fn format_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed,
                }
            }

            /// Average data item size; 0 when no data items were recorded.
            fn format_avg_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data.checked_div(self.data_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .data_compressed
                        .checked_div(self.data_count)
                        .unwrap_or(0),
                }
            }

            fn format_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.meta_compressed,
                }
            }

            /// Average meta item size; 0 when no meta items were recorded.
            fn format_avg_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta.checked_div(self.meta_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .meta_compressed
                        .checked_div(self.meta_count)
                        .unwrap_or(0),
                }
            }
        }
        // Shared across `process` invocations, hence the mutex.
        #[cfg(feature = "print_cache_item_size")]
        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
            Mutex::new(FxHashMap::default());

        // Encode each task's modified categories. We only encode categories with `modified` set,
        // meaning the category was actually dirtied. Categories restored from disk but never
        // modified don't need re-persisting since the on-disk version is still valid.
        // For tasks accessed during snapshot mode, a frozen copy was made and its `modified`
        // flags were copied from the live task at snapshot creation time, reflecting which
        // categories were dirtied before the snapshot was taken.
        let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
            // Encodes one category; an encoding failure is treated as a bug and panics.
            let encode_category = |task_id: TaskId,
                                   data: &TaskStorage,
                                   category: SpecificTaskDataCategory,
                                   buffer: &mut TurboBincodeBuffer|
             -> Option<TurboBincodeBuffer> {
                match encode_task_data(task_id, data, category, buffer) {
                    Ok(encoded) => {
                        #[cfg(feature = "print_cache_item_size")]
                        {
                            let mut stats = task_cache_stats.lock();
                            let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
                            match category {
                                SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
                                SpecificTaskDataCategory::Data => entry.add_data(&encoded),
                            }
                        }
                        Some(encoded)
                    }
                    Err(err) => {
                        panic!(
                            "Serializing task {} failed ({:?}): {:?}",
                            self.debug_get_task_description(task_id),
                            category,
                            err
                        );
                    }
                }
            };
            if task_id.is_transient() {
                unreachable!("transient task_ids should never be enqueued to be persisted");
            }

            let encode_meta = inner.flags.meta_modified();
            let encode_data = inner.flags.data_modified();

            #[cfg(feature = "print_cache_item_size")]
            if encode_data || encode_meta {
                task_cache_stats
                    .lock()
                    .entry(TaskCacheStats::task_name(inner))
                    .or_default()
                    .add_counts(inner);
            }

            let meta = if encode_meta {
                encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
            } else {
                None
            };

            let data = if encode_data {
                encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
            } else {
                None
            };
            // Only newly created tasks carry their task type hash in the snapshot item.
            let task_type_hash = if inner.flags.new_task() {
                let task_type = inner.get_persistent_task_type().expect(
                    "It is not possible for a new_task to not have a persistent_task_type.  Task \
                     creation for persistent tasks uses a single ExecutionContextImpl for \
                     creating the task (which sets new_task) and connect_child (which sets \
                     persistent_task_type) and take_snapshot waits for all operations to complete \
                     or suspend before we start snapshotting.  So task creation will always set \
                     the task_type.",
                );
                Some(compute_task_type_hash(task_type))
            } else {
                None
            };

            SnapshotItem {
                task_id,
                meta,
                data,
                task_type_hash,
            }
        };

        // Consumes `snapshot_guard`; `process` runs once per task handed out by the storage.
        let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);

        drop(snapshot_span);
        let snapshot_duration = start.elapsed();
        let task_count = task_snapshots.len();

        if task_snapshots.is_empty() {
            // Expected to be unreachable: `has_modifications` was true, so at least one task was
            // modified since the last snapshot. Encoding failures panic in `process` rather than
            // dropping items. NOTE(review): confirm under which conditions `take_snapshot` can
            // still return no items.
            std::hint::cold_path();
            return Ok((snapshot_time, false));
        }

        let persist_start = Instant::now();
        let span = tracing::info_span!(
            parent: parent_span,
            "persist",
            reason = reason,
            data_items= tracing::field::Empty,
            meta_items= tracing::field::Empty,
            task_cache_items= tracing::field::Empty,
            next_task_id= tracing::field::Empty,)
        .entered();
        {
            // Tasks were already consumed by take_snapshot, so a future snapshot
            // would not re-persist them — returning an error signals to the caller
            // that further persist attempts would corrupt the task graph in storage.
            let SnapshotMeta {
                task_cache_items,
                data_items,
                meta_items,
                max_next_task_id,
            } = self
                .backing_storage
                .save_snapshot(suspended_operations, task_snapshots)?;
            span.record("data_items", data_items);
            span.record("meta_items", meta_items);
            span.record("task_cache_items", task_cache_items);
            span.record("next_task_id", max_next_task_id);

            #[cfg(feature = "print_cache_item_size")]
            {
                let mut task_cache_stats = task_cache_stats
                    .into_inner()
                    .into_iter()
                    .collect::<Vec<_>>();
                if !task_cache_stats.is_empty() {
                    use turbo_tasks::util::FormatBytes;

                    use crate::utils::markdown_table::print_markdown_table;

                    // Largest first; ties broken by task name (also descending).
                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
                        (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
                    });

                    println!(
                        "Task cache stats: {}",
                        FormatSizes {
                            size: task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data + s.meta)
                                .sum::<usize>(),
                            #[cfg(feature = "print_cache_item_size_with_compressed")]
                            compressed_size: task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data_compressed + s.meta_compressed)
                                .sum::<usize>()
                        },
                    );

                    // Each "Count x Avg" header appears twice on purpose: the count ("N x") and
                    // the average size are rendered as two adjacent columns.
                    print_markdown_table(
                        [
                            "Task",
                            " Total Size",
                            " Data Size",
                            " Data Count x Avg",
                            " Data Count x Avg",
                            " Meta Size",
                            " Meta Count x Avg",
                            " Meta Count x Avg",
                            " Uppers",
                            " Coll",
                            " Agg Coll",
                            " Children",
                            " Followers",
                            " Coll Deps",
                            " Agg Dirty",
                            " Output Size",
                        ],
                        task_cache_stats.iter(),
                        |(task_desc, stats)| {
                            [
                                task_desc.to_string(),
                                format!(" {}", stats.format_total()),
                                format!(" {}", stats.format_data()),
                                format!(" {} x", stats.data_count),
                                format!("{}", stats.format_avg_data()),
                                format!(" {}", stats.format_meta()),
                                format!(" {} x", stats.meta_count),
                                format!("{}", stats.format_avg_meta()),
                                format!(" {}", stats.upper_count),
                                format!(" {}", stats.collectibles_count),
                                format!(" {}", stats.aggregated_collectibles_count),
                                format!(" {}", stats.children_count),
                                format!(" {}", stats.followers_count),
                                format!(" {}", stats.collectibles_dependents_count),
                                format!(" {}", stats.aggregated_dirty_containers_count),
                                format!(" {}", FormatBytes(stats.output_size)),
                            ]
                        },
                    );
                }
            }
        }

        let elapsed = start.elapsed();
        let persist_duration = persist_start.elapsed();
        // avoid spamming the event queue with information about fast operations
        if elapsed > Duration::from_secs(10) {
            turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
                "Finished writing to filesystem cache".to_string(),
                elapsed,
            )));
        }

        // Wall-clock end is derived from the monotonic elapsed time rather than a second
        // SystemTime reading.
        let wall_start_ms = wall_start
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            // as_millis_f64 is not stable yet
            .as_secs_f64()
            * 1000.0;
        let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
        turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
            "turbopack-persistence",
            wall_start_ms,
            wall_end_ms,
            vec![
                ("reason", serde_json::Value::from(reason)),
                (
                    "snapshot_duration_ms",
                    serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
                ),
                (
                    "persist_duration_ms",
                    serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
                ),
                ("task_count", serde_json::Value::from(task_count)),
            ],
        )));

        Ok((snapshot_time, true))
    }
1377
1378    fn startup(&self, turbo_tasks: &TurboTasks<TurboTasksBackend<B>>) {
1379        if self.should_restore() {
1380            // Continue all uncompleted operations
1381            // They can't be interrupted by a snapshot since the snapshotting job has not been
1382            // scheduled yet.
1383            let uncompleted_operations = self
1384                .backing_storage
1385                .uncompleted_operations()
1386                .expect("Failed to get uncompleted operations");
1387            if !uncompleted_operations.is_empty() {
1388                let mut ctx = self.execute_context(turbo_tasks);
1389                for op in uncompleted_operations {
1390                    op.execute(&mut ctx);
1391                }
1392            }
1393        }
1394
1395        // Only when it should write regularly to the storage, we schedule the initial snapshot
1396        // job.
1397        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1398            // Schedule the snapshot job
1399            let _span = trace_span!("persisting background job").entered();
1400            let _span = tracing::info_span!("thread").entered();
1401            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::Snapshot);
1402        }
1403    }
1404
1405    fn stopping(&self) {
1406        self.stopping.store(true, Ordering::Release);
1407        self.stopping_event.notify(usize::MAX);
1408    }
1409
1410    #[allow(unused_variables)]
1411    fn stop(&self, turbo_tasks: &TurboTasks<TurboTasksBackend<B>>) {
1412        #[cfg(feature = "verify_aggregation_graph")]
1413        {
1414            self.is_idle.store(false, Ordering::Release);
1415            self.verify_aggregation_graph(turbo_tasks, false);
1416        }
1417        if self.should_persist()
1418            && let Err(err) = self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks)
1419        {
1420            eprintln!("Persisting failed during shutdown: {err:?}");
1421        }
1422        self.storage.drop_contents();
1423        if let Err(err) = self.backing_storage.shutdown() {
1424            println!("Shutting down failed: {err}");
1425        }
1426    }
1427
1428    #[allow(unused_variables)]
1429    fn idle_start(self: &Arc<Self>, turbo_tasks: &TurboTasks<TurboTasksBackend<B>>) {
1430        self.idle_start_event.notify(usize::MAX);
1431
1432        #[cfg(feature = "verify_aggregation_graph")]
1433        {
1434            use tokio::select;
1435
1436            self.is_idle.store(true, Ordering::Release);
1437            let this = self.clone();
1438            let turbo_tasks = turbo_tasks.pin();
1439            tokio::task::spawn(async move {
1440                select! {
1441                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1442                        // do nothing
1443                    }
1444                    _ = this.idle_end_event.listen() => {
1445                        return;
1446                    }
1447                }
1448                if !this.is_idle.load(Ordering::Relaxed) {
1449                    return;
1450                }
1451                this.verify_aggregation_graph(&*turbo_tasks, true);
1452            });
1453        }
1454    }
1455
1456    fn idle_end(&self) {
1457        #[cfg(feature = "verify_aggregation_graph")]
1458        self.is_idle.store(false, Ordering::Release);
1459        self.idle_end_event.notify(usize::MAX);
1460    }
1461
    /// Returns the [`TaskId`] for the call `native_fn(this, arg)` and connects it as a child of
    /// `parent_task` (via `ConnectChildOperation`).
    ///
    /// Lookup order:
    /// 1. The in-memory `task_cache`, under a read lock and without allocating.
    /// 2. For non-transient calls only, the backing storage (`ctx.task_by_type`). On a hit, the
    ///    stored type is inserted into the in-memory cache (step 3a).
    /// 3. Otherwise (step 3b), a new id is taken from the transient or persisted id factory
    ///    (depending on `persistence`), its storage is initialized, and it is inserted into the
    ///    cache. If another thread inserted the same type first, that thread's id is used.
    ///
    /// `arg` is only boxed (via `take_box`) when a new task is actually created, or when the
    /// transient-from-persistent check below triggers.
    ///
    /// When a transient task is requested from a non-transient parent, this calls
    /// `panic_persistent_calling_transient`. NOTE(review): presumably that panics — confirm.
    fn get_or_create_task(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn DynTaskInputsStorage,
        parent_task: Option<TaskId>,
        persistence: TaskPersistence,
        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
    ) -> TaskId {
        let transient = matches!(persistence, TaskPersistence::Transient);

        // A persistent parent must not call a transient task; report it with a description of
        // both sides.
        if transient
            && let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            let task_type = CachedTaskType {
                native_fn,
                this,
                arg: arg.take_box(),
            };
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                /* cell_id */ None,
            );
        }

        let is_root = native_fn.is_root;

        // Compute hash and shard index once from borrowed components (no heap allocation).
        let arg_ref = arg.as_ref();
        let hash = CachedTaskType::hash_from_components(
            self.storage.task_cache.hasher(),
            native_fn,
            this,
            arg_ref,
        );
        // Locate the shard once so that the read-only lookup and any
        // write-lock retry below share the same reference (saves a modulo +
        // memory lookup on the miss path).
        let shard = get_shard(&self.storage.task_cache, hash);

        let mut ctx = self.execute_context(turbo_tasks);
        // Step 1: Fast read-only cache lookup (read lock, no allocation).
        // Use a read lock rather than a write lock to avoid contention. connect_child
        // may re-enter task_cache with a write lock, so we must not hold a write lock here.
        if let Some(task_id) =
            raw_get_in_shard(shard, hash, |k| k.eq_components(native_fn, this, arg_ref))
        {
            self.track_cache_hit_by_fn(native_fn);
            operation::ConnectChildOperation::run(parent_task, task_id, ctx);
            return task_id;
        }

        // Step 2: For non-transient calls, check backing storage using borrowed components (no
        // box needed yet). If the task exists there, we only need to insert it into the
        // in-memory cache (Step 3a). Otherwise fall through to Step 3b.
        let task_id = if !transient
            && let Some((task_id, stored_type)) = ctx.task_by_type(native_fn, this, arg_ref)
        {
            self.track_cache_hit_by_fn(native_fn);
            // Step 3a: Insert into in-memory cache using the pre-located shard.
            // Use the existing Arc from storage to avoid a duplicate allocation.
            match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
                k.eq_components(native_fn, this, arg_ref)
            }) {
                // Another thread already inserted this type; the ids come from the same
                // storage lookup, so there is nothing to do.
                RawEntry::Occupied(_) => {}
                RawEntry::Vacant(e) => {
                    e.insert(stored_type, task_id);
                }
            };
            task_id
        } else {
            // Step 3b: Not cached anywhere (or transient) — take the shard write lock and
            // create the task unless another thread raced us.
            match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
                k.eq_components(native_fn, this, arg_ref)
            }) {
                RawEntry::Occupied(e) => {
                    // Another thread beat us to creating this task — use their task_id.
                    // They will handle logging the new task as modified.
                    let task_id = *e.get();
                    // Release the shard lock before doing anything else.
                    drop(e);
                    self.track_cache_hit_by_fn(native_fn);
                    task_id
                }
                RawEntry::Vacant(e) => {
                    // Only now do we force the allocation.
                    // NOTE: if our caller had to perform resolution, then this will have already
                    // been boxed and take_box just takes it.
                    let task_type = CachedTaskTypeArc::new(CachedTaskType {
                        native_fn,
                        this,
                        arg: arg.take_box(),
                    });
                    let task_id = if transient {
                        self.transient_task_id_factory.get()
                    } else {
                        self.persisted_task_id_factory.get()
                    };
                    // Initialize storage BEFORE making task_id visible in the cache.
                    // This ensures any thread that reads task_id from the cache sees
                    // the storage entry already initialized (restored flags set).
                    self.storage
                        .initialize_new_task(task_id, Some(task_type.clone()));
                    // insert() consumes e, releasing the shard write lock.
                    e.insert(task_type, task_id);
                    self.track_cache_miss_by_fn(native_fn);
                    // Update the aggregation number before connecting the child
                    // We don't need this on any of the task recovery paths above because the
                    // aggregation number will already be set.
                    if is_root {
                        AggregationUpdateQueue::run(
                            AggregationUpdateJob::UpdateAggregationNumber {
                                task_id,
                                base_aggregation_number: u32::MAX,
                                distance: None,
                            },
                            &mut ctx,
                        );
                    } else if native_fn.is_session_dependent && self.should_track_dependencies() {
                        // Session-dependent tasks start at a fixed, high (but non-root)
                        // aggregation number.
                        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
                        AggregationUpdateQueue::run(
                            AggregationUpdateJob::UpdateAggregationNumber {
                                task_id,
                                base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                                distance: None,
                            },
                            &mut ctx,
                        );
                    };

                    task_id
                }
            }
        };

        // All shard locks are released at this point; `ctx` is consumed by the connect step.
        operation::ConnectChildOperation::run(parent_task, task_id, ctx);

        task_id
    }
1602
1603    /// Generate an object that implements [`fmt::Display`] explaining why the given
1604    /// [`CachedTaskType`] is transient.
1605    fn debug_trace_transient_task(
1606        &self,
1607        task_type: &CachedTaskType,
1608        cell_id: Option<CellId>,
1609    ) -> DebugTraceTransientTask {
1610        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1611        // from tracing the same task many times, so use a visited_set
1612        fn inner_id(
1613            backend: &TurboTasksBackendInner<impl BackingStorage>,
1614            task_id: TaskId,
1615            cell_type_id: Option<ValueTypeId>,
1616            visited_set: &mut FxHashSet<TaskId>,
1617        ) -> DebugTraceTransientTask {
1618            if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1619                if visited_set.contains(&task_id) {
1620                    let task_name = task_type.get_name();
1621                    DebugTraceTransientTask::Collapsed {
1622                        task_name,
1623                        cell_type_id,
1624                    }
1625                } else {
1626                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1627                }
1628            } else {
1629                DebugTraceTransientTask::Uncached { cell_type_id }
1630            }
1631        }
1632        fn inner_cached(
1633            backend: &TurboTasksBackendInner<impl BackingStorage>,
1634            task_type: &CachedTaskType,
1635            cell_type_id: Option<ValueTypeId>,
1636            visited_set: &mut FxHashSet<TaskId>,
1637        ) -> DebugTraceTransientTask {
1638            let task_name = task_type.get_name();
1639
1640            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1641                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1642                    // `task_id` should never be `None` at this point, as that would imply a
1643                    // non-local task is returning a local `Vc`...
1644                    // Just ignore if it happens, as we're likely already panicking.
1645                    return None;
1646                };
1647                if task_id.is_transient() {
1648                    Some(Box::new(inner_id(
1649                        backend,
1650                        task_id,
1651                        cause_self_raw_vc.try_get_type_id(),
1652                        visited_set,
1653                    )))
1654                } else {
1655                    None
1656                }
1657            });
1658            let cause_args = task_type
1659                .arg
1660                .get_raw_vcs()
1661                .into_iter()
1662                .filter_map(|raw_vc| {
1663                    let Some(task_id) = raw_vc.try_get_task_id() else {
1664                        // `task_id` should never be `None` (see comment above)
1665                        return None;
1666                    };
1667                    if !task_id.is_transient() {
1668                        return None;
1669                    }
1670                    Some((task_id, raw_vc.try_get_type_id()))
1671                })
1672                .collect::<IndexSet<_>>() // dedupe
1673                .into_iter()
1674                .map(|(task_id, cell_type_id)| {
1675                    inner_id(backend, task_id, cell_type_id, visited_set)
1676                })
1677                .collect();
1678
1679            DebugTraceTransientTask::Cached {
1680                task_name,
1681                cell_type_id,
1682                cause_self,
1683                cause_args,
1684            }
1685        }
1686        inner_cached(
1687            self,
1688            task_type,
1689            cell_id.map(|c| c.type_id),
1690            &mut FxHashSet::default(),
1691        )
1692    }
1693
1694    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<TurboTasksBackend<B>>) {
1695        if !self.should_track_dependencies() {
1696            panic!("Dependency tracking is disabled so invalidation is not allowed");
1697        }
1698        operation::InvalidateOperation::run(
1699            smallvec![task_id],
1700            #[cfg(feature = "task_dirty_cause")]
1701            TaskDirtyCause::Invalidator,
1702            self.execute_context(turbo_tasks),
1703        );
1704    }
1705
1706    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &TurboTasks<TurboTasksBackend<B>>) {
1707        if !self.should_track_dependencies() {
1708            panic!("Dependency tracking is disabled so invalidation is not allowed");
1709        }
1710        operation::InvalidateOperation::run(
1711            tasks.iter().copied().collect(),
1712            #[cfg(feature = "task_dirty_cause")]
1713            TaskDirtyCause::Unknown,
1714            self.execute_context(turbo_tasks),
1715        );
1716    }
1717
1718    fn invalidate_tasks_set(
1719        &self,
1720        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1721        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
1722    ) {
1723        if !self.should_track_dependencies() {
1724            panic!("Dependency tracking is disabled so invalidation is not allowed");
1725        }
1726        operation::InvalidateOperation::run(
1727            tasks.iter().copied().collect(),
1728            #[cfg(feature = "task_dirty_cause")]
1729            TaskDirtyCause::Unknown,
1730            self.execute_context(turbo_tasks),
1731        );
1732    }
1733
1734    fn invalidate_serialization(
1735        &self,
1736        task_id: TaskId,
1737        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
1738    ) {
1739        if task_id.is_transient() {
1740            return;
1741        }
1742        let mut ctx = self.execute_context(turbo_tasks);
1743        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1744        task.invalidate_serialization();
1745    }
1746
1747    fn debug_get_task_description(&self, task_id: TaskId) -> String {
1748        let task = self.storage.access_mut(task_id);
1749        if let Some(value) = task.get_persistent_task_type() {
1750            format!("{task_id:?} {}", value)
1751        } else if let Some(value) = task.get_transient_task_type() {
1752            format!("{task_id:?} {}", value)
1753        } else {
1754            format!("{task_id:?} unknown")
1755        }
1756    }
1757
1758    fn get_task_name(
1759        &self,
1760        task_id: TaskId,
1761        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
1762    ) -> String {
1763        let mut ctx = self.execute_context(turbo_tasks);
1764        let task = ctx.task(task_id, TaskDataCategory::Data);
1765        if let Some(value) = task.get_persistent_task_type() {
1766            value.to_string()
1767        } else if let Some(value) = task.get_transient_task_type() {
1768            value.to_string()
1769        } else {
1770            "unknown".to_string()
1771        }
1772    }
1773
1774    fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<CachedTaskTypeArc> {
1775        let task = self.storage.access_mut(task_id);
1776        task.get_persistent_task_type().cloned()
1777    }
1778
    /// Handles cancellation of a task's execution.
    ///
    /// Takes the task's in-progress state and notifies its `done_event` (if it was `Scheduled`
    /// or `InProgress`). It then wakes readers waiting on in-progress cells and, when dependency
    /// tracking is enabled and the task is persistent, marks the task session-dependent dirty.
    /// Finally it sets the in-progress state to `InProgressState::Canceled`.
    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        // Wake everyone waiting on the previous in-progress state. An already `Canceled` task
        // has no done event to notify.
        if let Some(in_progress) = task.take_in_progress() {
            match in_progress {
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        // Notify any readers waiting on in-progress cells so their listeners
        // resolve and foreground jobs can finish (prevents stop_and_wait hang).
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Mark the cancelled task as session-dependent dirty so it will be re-executed
        // in the next session. Without this, any reader that encounters the cancelled task
        // records an error in its output. That error is persisted and would poison
        // subsequent builds. By marking the task session-dependent dirty, the next build
        // re-executes it, which invalidates dependents and corrects the stale errors.
        let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
            task.update_dirty_state(Some(Dirtyness::SessionDependent))
        } else {
            None
        };

        // The in-progress state was taken above, so there must be no previous value here.
        let old = task.set_in_progress(InProgressState::Canceled);
        debug_assert!(old.is_none(), "InProgress already exists");
        // Release the task guard (which borrows `ctx`) before the aggregation update, which
        // needs `ctx` mutably.
        drop(task);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, &mut ctx);
        }

        // Dropped last, after the task guard has been released. NOTE(review): presumably this
        // keeps the drop cost out of the task's critical section, as `task_execution_completed`
        // does.
        drop(in_progress_cells);
    }
1828
    /// Tries to start executing `task_id`.
    ///
    /// If the task's in-progress state is `Scheduled`, it is replaced by `InProgress`. The
    /// task's current collectibles are marked outdated, and left-over outdated ones are removed.
    /// When dependency tracking is enabled, its cell and output dependencies are marked outdated
    /// too. The function then returns a [`TaskExecutionSpec`] containing the future to run and
    /// its tracing span.
    ///
    /// Returns `None` when:
    /// - the task has no in-progress state;
    /// - its state is not `Scheduled` (the state is put back unchanged);
    /// - or it is a `TransientTask::Once` whose future has already been taken.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        #[cfg(feature = "task_dirty_cause")]
        let cause;
        // Scope the execute context and task guard so both are released before the future
        // and span are constructed below.
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            // If `prefetch()` yields tasks, release the guard, prepare those tasks, and
            // re-acquire the guard.
            if let Some(tasks) = task.prefetch() {
                drop(task);
                ctx.prepare_tasks(tasks, "prefetch");
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            let in_progress = task.take_in_progress()?;
            // Only a `Scheduled` task can be started; any other state is restored untouched.
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            #[cfg(feature = "task_dirty_cause")]
            {
                cause = match task.get_dirty() {
                    Some(Dirtyness::Dirty { cause, .. }) => Some(cause.clone()),
                    _ => None,
                };
            }
            // Transition Scheduled -> InProgress, carrying over the done event.
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Make all current collectibles outdated (remove left-over outdated collectibles)
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            // Collected into a Vec first because the loop below mutates `task`, which these
            // iterators borrow.
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    // An outdated entry is removed only if the collectible is not current
                    // anymore. Entries that are still current were just rewritten by the
                    // `Current` arm.
                    Collectible::Outdated(collectible) => {
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Make all dependencies outdated
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(
                        task_id.persistence(),
                        execution_reason,
                        priority,
                        #[cfg(feature = "task_dirty_cause")]
                        cause.as_ref(),
                    ),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // NOTE(review): if the `Once` future was already taken, `?` returns `None`
                    // here after the state was already switched to `InProgress` above.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1942
    /// Applies the result of a finished execution of `task_id`.
    ///
    /// Returns `Some(priority)` if the task became stale during execution and needs to be
    /// re-scheduled at the given priority. The caller (turbo-tasks manager) hands it to the
    /// priority runner so re-execution doesn't inherit the original schedule priority.
    /// Returns `None` once the completion has been fully applied.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
    ) -> Option<TaskPriority> {
        // Task completion is a 4 step process:
        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
        //    aggregation number of the task and the new children.
        // 2. Connect the new children to the task (and do the relevant aggregation updates).
        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
        // 4. Shrink the task memory to reduce footprint of the task.
        //
        // NOTE(review): the helper calls below roughly map to these steps, inferred from their
        // names: `_prepare` (plus output-dependent invalidation) -> 1, `_connect` -> 2,
        // `_finish` -> 3, `_cleanup` -> 4. Confirm against the helpers.

        // Due to persistence it is possible that the process is cancelled after any step. This is
        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
        // in-memory representation.

        // The task might be invalidated during this process, so we need to check the stale flag
        // at the start of every step.

        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            aggregation_number = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        // Captured here because `result` is moved into the prepare step below.
        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        let TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            #[cfg(feature = "task_dirty_cause")]
            function_id,
            output_dependent_tasks,
            is_recomputation,
            is_session_dependent,
        } = match self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        ) {
            Ok(r) => r,
            Err(stale_priority) => {
                // Task was stale and has been rescheduled
                #[cfg(feature = "trace_task_details")]
                span.record("stale", "prepare");
                return Some(stale_priority);
            }
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // When restoring from filesystem cache the following might not be executed (since we can
        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
        // would be executed again.

        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                #[cfg(feature = "task_dirty_cause")]
                function_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        // `new_children` is declared as a span field in both cfg variants above.
        span.record("new_children", new_children.len());

        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        if has_new_children
            && let Some(stale_priority) =
                self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return Some(stale_priority);
        }

        let (stale_priority, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
            is_session_dependent,
        );
        if let Some(stale_priority) = stale_priority {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return Some(stale_priority);
        }

        let removed_data = self.task_execution_completed_cleanup(
            &mut ctx,
            task_id,
            cell_counters,
            is_error,
            is_recomputation,
        );

        // Drop data outside of critical sections
        drop(removed_data);
        drop(in_progress_cells);

        None
    }
2093
2094    fn task_execution_completed_prepare(
2095        &self,
2096        ctx: &mut impl ExecuteContext<'_>,
2097        #[cfg(feature = "trace_task_details")] span: &Span,
2098        task_id: TaskId,
2099        result: Result<RawVc, TurboTasksExecutionError>,
2100        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2101        #[cfg(feature = "verify_determinism")] stateful: bool,
2102        has_invalidator: bool,
2103    ) -> Result<TaskExecutionCompletePrepareResult, TaskPriority> {
2104        let mut task = ctx.task(task_id, TaskDataCategory::All);
2105        let is_recomputation = task.is_dirty().is_none();
2106        // Without dependency tracking, the SessionDependent dirty state is never read (no session
2107        // restore), so skip the work
2108        let is_session_dependent = self.should_track_dependencies()
2109            && matches!(task.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent);
2110        let Some(in_progress) = task.get_in_progress_mut() else {
2111            panic!("Task execution completed, but task is not in progress: {task:#?}");
2112        };
2113        if matches!(in_progress, InProgressState::Canceled) {
2114            return Ok(TaskExecutionCompletePrepareResult {
2115                new_children: Default::default(),
2116                is_now_immutable: false,
2117                #[cfg(feature = "verify_determinism")]
2118                no_output_set: false,
2119                #[cfg(feature = "task_dirty_cause")]
2120                function_id: None,
2121                new_output: None,
2122                output_dependent_tasks: Default::default(),
2123                is_recomputation,
2124                is_session_dependent,
2125            });
2126        }
2127        let &mut InProgressState::InProgress(box InProgressStateInner {
2128            stale,
2129            ref mut new_children,
2130            once_task: is_once_task,
2131            ..
2132        }) = in_progress
2133        else {
2134            panic!("Task execution completed, but task is not in progress: {task:#?}");
2135        };
2136
2137        // If the task is stale, reschedule it
2138        #[cfg(not(feature = "no_fast_stale"))]
2139        if stale && !is_once_task {
2140            let stale_priority = compute_stale_priority(&task);
2141            let Some(InProgressState::InProgress(box InProgressStateInner {
2142                done_event,
2143                mut new_children,
2144                ..
2145            })) = task.take_in_progress()
2146            else {
2147                unreachable!();
2148            };
2149            let old = task.set_in_progress(InProgressState::Scheduled {
2150                done_event,
2151                reason: TaskExecutionReason::Stale,
2152            });
2153            debug_assert!(old.is_none(), "InProgress already exists");
2154            // Remove old children from new_children to leave only the children that had their
2155            // active count increased
2156            for task in task.iter_children() {
2157                new_children.remove(&task);
2158            }
2159            drop(task);
2160
2161            // We need to undo the active count increase for the children since we throw away the
2162            // new_children list now.
2163            AggregationUpdateQueue::run(
2164                AggregationUpdateJob::DecreaseActiveCounts {
2165                    task_ids: new_children.into_iter().collect(),
2166                },
2167                ctx,
2168            );
2169            return Err(stale_priority);
2170        }
2171
2172        // take the children from the task to process them
2173        let mut new_children = take(new_children);
2174
2175        // get the function id for the dirty cause
2176        #[cfg(feature = "task_dirty_cause")]
2177        let function_id = match task.get_task_type() {
2178            TaskTypeRef::Cached(task_type) => {
2179                Some(turbo_tasks::registry::get_function_id(task_type.native_fn))
2180            }
2181            TaskTypeRef::Transient(_) => None,
2182        };
2183
2184        // handle stateful (only tracked when verify_determinism is enabled)
2185        #[cfg(feature = "verify_determinism")]
2186        if stateful {
2187            task.set_stateful(true);
2188        }
2189
2190        // handle has_invalidator
2191        if has_invalidator {
2192            task.set_invalidator(true);
2193        }
2194
2195        // handle cell counters: update max index and remove cells that are no longer used
2196        // On error, skip this update: the task may have failed before creating all cells it
2197        // normally creates, so cell_counters is incomplete. Clearing cell_type_max_index entries
2198        // based on partial counters would cause "cell no longer exists" errors for tasks that
2199        // still hold dependencies on those cells. The old cell data is preserved on error
2200        // (see task_execution_completed_cleanup), so keeping cell_type_max_index consistent with
2201        // that data is correct.
2202        // NOTE: This must stay in sync with task_execution_completed_cleanup, which similarly
2203        // skips cell data removal on error.
2204        if result.is_ok() || is_recomputation {
2205            let old_counters: FxHashMap<_, _> = task
2206                .iter_cell_type_max_index()
2207                .map(|(&k, &v)| (k, v))
2208                .collect();
2209            let mut counters_to_remove = old_counters.clone();
2210
2211            for (&cell_type, &max_index) in cell_counters.iter() {
2212                if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2213                    if old_max_index != max_index {
2214                        task.insert_cell_type_max_index(cell_type, max_index);
2215                    }
2216                } else {
2217                    task.insert_cell_type_max_index(cell_type, max_index);
2218                }
2219            }
2220            for (cell_type, _) in counters_to_remove {
2221                task.remove_cell_type_max_index(&cell_type);
2222            }
2223        }
2224
2225        let mut queue = AggregationUpdateQueue::new();
2226
2227        let mut old_edges = Vec::new();
2228
2229        let has_children = !new_children.is_empty();
2230        let is_immutable = task.immutable();
2231        let task_dependencies_for_immutable =
2232            // Task was previously marked as immutable
2233            if !is_immutable
2234            // Task is not session dependent (session dependent tasks can change between sessions)
2235            && !is_session_dependent
2236            // Task has no invalidator
2237            && !task.invalidator()
2238            // Task has no dependencies on collectibles
2239            && task.is_collectibles_dependencies_empty()
2240        {
2241            Some(
2242                // Collect all dependencies on tasks to check if all dependencies are immutable
2243                task.iter_output_dependencies()
2244                    .chain(task.iter_cell_dependencies().map(|dep| dep.cell_ref().task))
2245                    .collect::<FxHashSet<_>>(),
2246            )
2247        } else {
2248            None
2249        };
2250
2251        if has_children {
2252            // Prepare all new children
2253            let _aggregation_number =
2254                prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2255
2256            #[cfg(feature = "trace_task_details")]
2257            span.record("aggregation_number", _aggregation_number);
2258
2259            // Filter actual new children
2260            old_edges.extend(
2261                task.iter_children()
2262                    .filter(|task| !new_children.remove(task))
2263                    .map(OutdatedEdge::Child),
2264            );
2265        } else {
2266            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2267        }
2268
2269        old_edges.extend(
2270            task.iter_outdated_collectibles()
2271                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2272        );
2273
2274        if self.should_track_dependencies() {
2275            // IMPORTANT: Use iter_outdated_* here, NOT iter_* (active dependencies).
2276            // At execution start, active deps are copied to outdated as a "before" snapshot.
2277            // During execution, new deps are added to active.
2278            // Here at completion, we clean up only the OUTDATED deps (the "before" snapshot).
2279            // Using iter_* (active) instead would incorrectly clean up deps that are still valid,
2280            // breaking dependency tracking.
2281            old_edges.extend(
2282                task.iter_outdated_cell_dependencies()
2283                    .map(OutdatedEdge::CellDependency),
2284            );
2285            old_edges.extend(
2286                task.iter_outdated_output_dependencies()
2287                    .map(OutdatedEdge::OutputDependency),
2288            );
2289        }
2290
2291        // Check if output need to be updated
2292        let current_output = task.get_output();
2293        #[cfg(feature = "verify_determinism")]
2294        let no_output_set = current_output.is_none();
2295        let new_output = match result {
2296            Ok(RawVc::TaskOutput(output_task_id)) => {
2297                if let Some(OutputValue::Output(current_task_id)) = current_output
2298                    && *current_task_id == output_task_id
2299                {
2300                    None
2301                } else {
2302                    Some(OutputValue::Output(output_task_id))
2303                }
2304            }
2305            Ok(RawVc::TaskCell(output_task_id, cell)) => {
2306                if let Some(OutputValue::Cell(CellRef {
2307                    task: current_task_id,
2308                    cell: current_cell,
2309                })) = current_output
2310                    && *current_task_id == output_task_id
2311                    && *current_cell == cell
2312                {
2313                    None
2314                } else {
2315                    Some(OutputValue::Cell(CellRef {
2316                        task: output_task_id,
2317                        cell,
2318                    }))
2319                }
2320            }
2321            Ok(RawVc::LocalOutput(..)) => {
2322                panic!("Non-local tasks must not return a local Vc");
2323            }
2324            Err(err) => {
2325                if let Some(OutputValue::Error(old_error)) = current_output
2326                    && **old_error == err
2327                {
2328                    None
2329                } else {
2330                    Some(OutputValue::Error(Arc::new((&err).into())))
2331                }
2332            }
2333        };
2334        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2335        // When output has changed, grab the dependent tasks
2336        if new_output.is_some() && ctx.should_track_dependencies() {
2337            output_dependent_tasks = task.iter_output_dependent().collect();
2338        }
2339
2340        drop(task);
2341
2342        // Check if the task can be marked as immutable
2343        let mut is_now_immutable = false;
2344        if let Some(dependencies) = task_dependencies_for_immutable
2345            && dependencies
2346                .iter()
2347                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2348        {
2349            is_now_immutable = true;
2350        }
2351        #[cfg(feature = "trace_task_details")]
2352        span.record("immutable", is_immutable || is_now_immutable);
2353
2354        if !queue.is_empty() || !old_edges.is_empty() {
2355            #[cfg(any(
2356                feature = "trace_task_completion",
2357                feature = "trace_aggregation_update_stats"
2358            ))]
2359            let _span =
2360                tracing::trace_span!("remove old edges and prepare new children", stats = Empty)
2361                    .entered();
2362            // Remove outdated edges first, before removing in_progress+dirty flag.
2363            // We need to make sure all outdated edges are removed before the task can potentially
2364            // be scheduled and executed again
2365            #[cfg(feature = "trace_aggregation_update_stats")]
2366            {
2367                let stats = CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2368                _span.record("stats", tracing::field::debug(stats));
2369            }
2370            #[cfg(not(feature = "trace_aggregation_update_stats"))]
2371            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2372        }
2373
2374        Ok(TaskExecutionCompletePrepareResult {
2375            new_children,
2376            is_now_immutable,
2377            #[cfg(feature = "verify_determinism")]
2378            no_output_set,
2379            #[cfg(feature = "task_dirty_cause")]
2380            function_id,
2381            new_output,
2382            output_dependent_tasks,
2383            is_recomputation,
2384            is_session_dependent,
2385        })
2386    }
2387
2388    fn task_execution_completed_invalidate_output_dependent(
2389        &self,
2390        ctx: &mut impl ExecuteContext<'_>,
2391        task_id: TaskId,
2392        #[cfg(feature = "task_dirty_cause")] function_id: Option<FunctionId>,
2393        output_dependent_tasks: SmallVec<[TaskId; 4]>,
2394    ) {
2395        debug_assert!(!output_dependent_tasks.is_empty());
2396
2397        #[cfg(feature = "task_dirty_cause")]
2398        let cause = match function_id {
2399            Some(function) => TaskDirtyCause::OutputChange { function },
2400            None => TaskDirtyCause::RootOutputChange,
2401        };
2402
2403        if output_dependent_tasks.len() > 1 {
2404            ctx.prepare_tasks(
2405                output_dependent_tasks
2406                    .iter()
2407                    .map(|&id| (id, TaskDataCategory::All)),
2408                "invalidate output dependents",
2409            );
2410        }
2411
2412        fn process_output_dependents(
2413            ctx: &mut impl ExecuteContext<'_>,
2414            task_id: TaskId,
2415            #[cfg(feature = "task_dirty_cause")] cause: &TaskDirtyCause,
2416            dependent_task_id: TaskId,
2417            queue: &mut AggregationUpdateQueue,
2418        ) {
2419            #[cfg(feature = "trace_task_output_dependencies")]
2420            let span = tracing::trace_span!(
2421                "invalidate output dependency",
2422                task = %task_id,
2423                dependent_task = %dependent_task_id,
2424                result = tracing::field::Empty,
2425            )
2426            .entered();
2427            let mut make_stale = true;
2428            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2429            let transient_task_type = dependent.get_transient_task_type();
2430            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
2431                // once tasks are never invalidated
2432                #[cfg(feature = "trace_task_output_dependencies")]
2433                span.record("result", "once task");
2434                return;
2435            }
2436            if dependent.outdated_output_dependencies_contains(&task_id) {
2437                #[cfg(feature = "trace_task_output_dependencies")]
2438                span.record("result", "outdated dependency");
2439                // output dependency is outdated, so it hasn't read the output yet
2440                // and doesn't need to be invalidated
2441                // But importantly we still need to make the task dirty as it should no longer
2442                // be considered as "recomputation".
2443                make_stale = false;
2444            } else if !dependent.output_dependencies_contains(&task_id) {
2445                // output dependency has been removed, so the task doesn't depend on the
2446                // output anymore and doesn't need to be invalidated
2447                #[cfg(feature = "trace_task_output_dependencies")]
2448                span.record("result", "no backward dependency");
2449                return;
2450            }
2451            make_task_dirty_internal(
2452                dependent,
2453                dependent_task_id,
2454                make_stale,
2455                #[cfg(feature = "task_dirty_cause")]
2456                cause.clone(),
2457                queue,
2458                ctx,
2459            );
2460            #[cfg(feature = "trace_task_output_dependencies")]
2461            span.record("result", "marked dirty");
2462        }
2463
2464        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
2465            let chunk_size = good_chunk_size(output_dependent_tasks.len());
2466            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2467            let _ = scope_and_block(chunks.len(), |scope| {
2468                for chunk in chunks {
2469                    let child_ctx = ctx.child_context();
2470                    #[cfg(feature = "task_dirty_cause")]
2471                    let cause = &cause;
2472                    scope.spawn(move || {
2473                        let mut ctx = child_ctx.create();
2474                        let mut queue = AggregationUpdateQueue::new();
2475                        for dependent_task_id in chunk {
2476                            process_output_dependents(
2477                                &mut ctx,
2478                                task_id,
2479                                #[cfg(feature = "task_dirty_cause")]
2480                                cause,
2481                                dependent_task_id,
2482                                &mut queue,
2483                            )
2484                        }
2485                        queue.execute(&mut ctx);
2486                    });
2487                }
2488            });
2489        } else {
2490            let mut queue = AggregationUpdateQueue::new();
2491            for dependent_task_id in output_dependent_tasks {
2492                process_output_dependents(
2493                    ctx,
2494                    task_id,
2495                    #[cfg(feature = "task_dirty_cause")]
2496                    &cause,
2497                    dependent_task_id,
2498                    &mut queue,
2499                );
2500            }
2501            queue.execute(ctx);
2502        }
2503    }
2504
2505    fn task_execution_completed_unfinished_children_dirty(
2506        &self,
2507        ctx: &mut impl ExecuteContext<'_>,
2508        new_children: &FxHashSet<TaskId>,
2509    ) {
2510        debug_assert!(!new_children.is_empty());
2511
2512        let mut queue = AggregationUpdateQueue::new();
2513        ctx.for_each_task_all(
2514            new_children.iter().copied(),
2515            "unfinished children dirty",
2516            |child_task, ctx| {
2517                if !child_task.has_output() {
2518                    let child_id = child_task.id();
2519                    make_task_dirty_internal(
2520                        child_task,
2521                        child_id,
2522                        false,
2523                        #[cfg(feature = "task_dirty_cause")]
2524                        TaskDirtyCause::InitialDirty,
2525                        &mut queue,
2526                        ctx,
2527                    );
2528                }
2529            },
2530        );
2531
2532        queue.execute(ctx);
2533    }
2534
2535    fn task_execution_completed_connect(
2536        &self,
2537        ctx: &mut impl ExecuteContext<'_>,
2538        task_id: TaskId,
2539        new_children: FxHashSet<TaskId>,
2540    ) -> Option<TaskPriority> {
2541        debug_assert!(!new_children.is_empty());
2542
2543        let mut task = ctx.task(task_id, TaskDataCategory::All);
2544        let Some(in_progress) = task.get_in_progress() else {
2545            panic!("Task execution completed, but task is not in progress: {task:#?}");
2546        };
2547        if matches!(in_progress, InProgressState::Canceled) {
2548            // Task was canceled in the meantime, so we don't connect the children
2549            return None;
2550        }
2551        let InProgressState::InProgress(box InProgressStateInner {
2552            #[cfg(not(feature = "no_fast_stale"))]
2553            stale,
2554            once_task: is_once_task,
2555            ..
2556        }) = in_progress
2557        else {
2558            panic!("Task execution completed, but task is not in progress: {task:#?}");
2559        };
2560
2561        // If the task is stale, reschedule it
2562        #[cfg(not(feature = "no_fast_stale"))]
2563        if *stale && !is_once_task {
2564            let stale_priority = compute_stale_priority(&task);
2565            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2566                task.take_in_progress()
2567            else {
2568                unreachable!();
2569            };
2570            let old = task.set_in_progress(InProgressState::Scheduled {
2571                done_event,
2572                reason: TaskExecutionReason::Stale,
2573            });
2574            debug_assert!(old.is_none(), "InProgress already exists");
2575            drop(task);
2576
2577            // All `new_children` are currently hold active with an active count and we need to undo
2578            // that. (We already filtered out the old children from that list)
2579            AggregationUpdateQueue::run(
2580                AggregationUpdateJob::DecreaseActiveCounts {
2581                    task_ids: new_children.into_iter().collect(),
2582                },
2583                ctx,
2584            );
2585            return Some(stale_priority);
2586        }
2587
2588        let has_active_count = ctx.should_track_activeness()
2589            && task
2590                .get_activeness()
2591                .is_some_and(|activeness| activeness.active_counter > 0);
2592        connect_children(
2593            ctx,
2594            task_id,
2595            task,
2596            new_children,
2597            has_active_count,
2598            ctx.should_track_activeness(),
2599        );
2600
2601        None
2602    }
2603
    /// Final step of task completion: publishes the new output and clears the in-progress state.
    ///
    /// In order:
    /// - If the task was canceled in the meantime, nothing is finished and `(None, None)` is
    ///   returned.
    /// - If the task became stale during execution and is not a once task, its state is reset to
    ///   `Scheduled` with reason `Stale`. `new_output` is discarded and
    ///   `(Some(stale_priority), None)` is returned.
    /// - Otherwise the output is replaced (when `new_output` is `Some`), and the task is marked
    ///   immutable when `is_now_immutable`. The events of all in-progress cells are notified, and
    ///   the dirty state is set to `Dirtyness::SessionDependent` when `is_session_dependent`,
    ///   otherwise cleared. Any resulting aggregation update runs after the task guard is
    ///   dropped.
    ///
    /// With `verify_determinism`, a non-transient, non-once task is rescheduled as `Stale` with
    /// `TaskPriority::leaf()` instead of notifying its `done_event` when either condition holds:
    /// - its dirty state changed, or
    /// - no output was set before this execution (`no_output_set`).
    ///
    /// Returns the priority to reschedule with (if any) and the removed in-progress cells. The
    /// cells are handed back so the caller can drop them outside of critical sections.
    // The return type spells out the full `AutoMap` type, hence the allow.
    #[allow(clippy::type_complexity)]
    fn task_execution_completed_finish(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "verify_determinism")] no_output_set: bool,
        new_output: Option<OutputValue>,
        is_now_immutable: bool,
        is_session_dependent: bool,
    ) -> (
        Option<TaskPriority>,
        Option<
            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
        >,
    ) {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        // The in-progress state is *taken* here. The rescheduling paths below put a
        // `Scheduled` state back.
        let Some(in_progress) = task.take_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // Task was canceled in the meantime, so we don't finish it
            // NOTE(review): the `Canceled` state taken above is not put back here. This is
            // presumably intended; confirm.
            return (None, None);
        }
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: is_once_task,
            stale,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // The prepare step already took `new_children` out of the in-progress state.
        debug_assert!(new_children.is_empty());

        // If the task is stale, reschedule it
        // (this check is not gated on `no_fast_stale`, unlike the earlier completion steps)
        if stale && !is_once_task {
            let stale_priority = compute_stale_priority(&task);
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            return (Some(stale_priority), None);
        }

        // Set the output if it has changed
        // The previous output is kept in `old_content` and only dropped after the task guard is
        // released (see `drop(old_content)` below).
        let mut old_content = None;
        if let Some(value) = new_output {
            old_content = task.set_output(value);
        }

        // If the task has no invalidator and has no mutable dependencies, it does not have a way
        // to be invalidated and we can mark it as immutable.
        if is_now_immutable {
            task.set_immutable(true);
        }

        // Notify in progress cells and remove all of them
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Compute and apply the new dirty state, propagating to aggregating ancestors
        let new_dirtyness = if is_session_dependent {
            Some(Dirtyness::SessionDependent)
        } else {
            None
        };
        #[cfg(feature = "verify_determinism")]
        let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
        let data_update = task.update_dirty_state(new_dirtyness);

        // Under verify_determinism we re-run a task whose dirty state or output unexpectedly
        // changed during execution to confirm determinism. The leaf priority keeps these
        // verification reruns at the highest priority.
        #[cfg(feature = "verify_determinism")]
        let stale_priority: Option<TaskPriority> =
            ((dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task)
                .then(TaskPriority::leaf);
        #[cfg(not(feature = "verify_determinism"))]
        let stale_priority: Option<TaskPriority> = None;
        if stale_priority.is_some() {
            // The rescheduled run keeps `done_event`, so waiters are not notified yet.
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);
        } else {
            // Release the task guard before waking waiters.
            drop(task);

            // Notify dependent tasks that are waiting for this task to finish
            done_event.notify(usize::MAX);
        }

        drop(old_content);

        // Dirty-state propagation runs only after the task guard has been dropped.
        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, ctx);
        }

        // We return so the data can be dropped outside of critical sections
        (stale_priority, in_progress_cells)
    }
2712
2713    fn task_execution_completed_cleanup(
2714        &self,
2715        ctx: &mut impl ExecuteContext<'_>,
2716        task_id: TaskId,
2717        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2718        is_error: bool,
2719        is_recomputation: bool,
2720    ) -> Vec<SharedReference> {
2721        let mut task = ctx.task(task_id, TaskDataCategory::All);
2722        let mut removed_cell_data = Vec::new();
2723        // An error is potentially caused by a eventual consistency, so we avoid updating cells
2724        // after an error as it is likely transient and we want to keep the dependent tasks
2725        // clean to avoid re-executions.
2726        // NOTE: This must stay in sync with task_execution_completed_prepare, which similarly
2727        // skips cell_type_max_index updates on error.
2728        if !is_error || is_recomputation {
2729            // Remove no longer existing cells and
2730            // find all outdated data items (removed cells, outdated edges)
2731            // Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
2732            // anyway and we want to avoid needless re-executions. When the cells become
2733            // used again, they are invalidated from the update cell operation.
2734            let to_remove: Vec<_> = task
2735                .iter_cell_data()
2736                .filter_map(|(cell, _)| {
2737                    cell_counters
2738                        .get(&cell.type_id)
2739                        .is_none_or(|start_index| cell.index >= *start_index)
2740                        .then_some(*cell)
2741                })
2742                .collect();
2743            removed_cell_data.reserve_exact(to_remove.len());
2744            for cell in to_remove {
2745                if let Some(data) = task.remove_cell_data(&cell) {
2746                    removed_cell_data.push(data);
2747                }
2748            }
2749            // Remove cell_data_hash entries for cells that no longer exist
2750            let to_remove_hash: Vec<_> = task
2751                .iter_cell_data_hash()
2752                .filter_map(|(cell, _)| {
2753                    cell_counters
2754                        .get(&cell.type_id)
2755                        .is_none_or(|start_index| cell.index >= *start_index)
2756                        .then_some(*cell)
2757                })
2758                .collect();
2759            for cell in to_remove_hash {
2760                task.remove_cell_data_hash(&cell);
2761            }
2762        }
2763
2764        // Clean up task storage after execution:
2765        // - Shrink collections marked with shrink_on_completion
2766        // - Drop dependency fields for immutable tasks (they'll never re-execute)
2767        task.cleanup_after_execution();
2768
2769        drop(task);
2770
2771        // Return so we can drop outside of critical sections
2772        removed_cell_data
2773    }
2774
2775    /// Prints the standard message emitted when the background persisting process stops due to an
2776    /// unrecoverable write error. The caller is responsible for returning from the background job.
2777    fn log_unrecoverable_persist_error() {
2778        eprintln!(
2779            "Persisting is disabled for this session due to an unrecoverable error. Stopping the \
2780             background persisting process."
2781        );
2782    }
2783
2784    fn run_backend_job<'a>(
2785        self: &'a Arc<Self>,
2786        job: TurboTasksBackendJob,
2787        turbo_tasks: &'a TurboTasks<TurboTasksBackend<B>>,
2788    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2789        Box::pin(async move {
2790            match job {
2791                TurboTasksBackendJob::Snapshot => {
2792                    debug_assert!(self.should_persist());
2793
2794                    let mut last_snapshot = self.start_time;
2795                    let mut idle_start_listener = self.idle_start_event.listen();
2796                    let mut idle_end_listener = self.idle_end_event.listen();
2797                    // Whether to immediately set an idle timeout if possible.
2798                    // Set to false if we don't persist anything in a cycle.
2799                    let mut fresh_idle = true;
2800                    let mut evicted = false;
2801                    let mut is_first = true;
2802                    'outer: loop {
2803                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2804                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2805                        let idle_timeout = *IDLE_TIMEOUT;
2806                        let (time, mut reason) = if is_first {
2807                            (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2808                        } else {
2809                            (SNAPSHOT_INTERVAL, "regular snapshot interval")
2810                        };
2811
2812                        let until = last_snapshot + time;
2813                        if until > Instant::now() {
2814                            let mut stop_listener = self.stopping_event.listen();
2815                            if self.stopping.load(Ordering::Acquire) {
2816                                return;
2817                            }
2818                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2819                                Instant::now() + idle_timeout
2820                            } else {
2821                                far_future()
2822                            };
2823                            loop {
2824                                tokio::select! {
2825                                    _ = &mut stop_listener => {
2826                                        return;
2827                                    },
2828                                    _ = &mut idle_start_listener => {
2829                                        idle_time = Instant::now() + idle_timeout;
2830                                        idle_start_listener = self.idle_start_event.listen()
2831                                    },
2832                                    _ = &mut idle_end_listener => {
2833                                        idle_time = far_future();
2834                                        idle_end_listener = self.idle_end_event.listen()
2835                                    },
2836                                    _ = tokio::time::sleep_until(until) => {
2837                                        break;
2838                                    },
2839                                    _ = tokio::time::sleep_until(idle_time) => {
2840                                        if turbo_tasks.is_idle() {
2841                                            reason = "idle timeout";
2842                                            break;
2843                                        }
2844                                    },
2845                                }
2846                            }
2847                        }
2848
2849                        let this = self.clone();
2850                        // Create a root span shared by both the snapshot/persist
2851                        // work and the subsequent compaction so they appear
2852                        // grouped together in trace viewers.
2853                        let background_span =
2854                            tracing::info_span!(parent: None, "background snapshot");
2855                        match this.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
2856                            Err(err) => {
2857                                // save_snapshot consumed persisted_task_cache_log entries;
2858                                // further snapshots would corrupt the task graph.
2859                                eprintln!("Persisting failed: {err:?}");
2860                                Self::log_unrecoverable_persist_error();
2861                                return;
2862                            }
2863                            Ok((snapshot_start, new_data)) => {
2864                                // if we see 'new_data' then the next idle transition is 'fresh'
2865                                fresh_idle = new_data;
2866                                is_first = false;
2867                                last_snapshot = snapshot_start;
2868
2869                                // Polls the idle-end event without blocking. Returns
2870                                // `true` and refreshes the listener if idle has ended,
2871                                // `false` if we are still idle.
2872                                macro_rules! check_idle_ended {
2873                                    () => {{
2874                                        tokio::select! {
2875                                            biased;
2876                                            _ = &mut idle_end_listener => {
2877                                                idle_end_listener = self.idle_end_event.listen();
2878                                                true
2879                                            },
2880                                            _ = std::future::ready(()) => false,
2881                                        }
2882                                    }};
2883                                }
2884                                // Evict persisted tasks from memory to reclaim space.
2885                                // Like compaction, this runs after snapshot_and_persist
2886                                // as a separate concern.
2887                                //
2888                                // TODO: improve eviction policy — current approach is a full sweep
2889                                // after every snapshot. Better strategies to consider:
2890                                //   - Memory pressure signals: only evict when RSS exceeds a
2891                                //     threshold rather than unconditionally.
2892                                //   - Recency data: track last-access time per task and evict
2893                                //     least-recently-used entries first rather than all at once.
2894                                //   - Eviction intensity: partial sweeps (evict a fraction of
2895                                //     eligible tasks per cycle) to reduce latency spikes.
2896                                // Evict when there is new data to persist (the common
2897                                // case) or on the very first snapshot after startup
2898                                // (data was already on disk from a prior run, so
2899                                // new_data may be false but in-memory state can still
2900                                // be evicted).
2901                                let mut ran_eviction = false;
2902                                if this.should_evict() && (new_data || !evicted) {
2903                                    if check_idle_ended!() {
2904                                        // need to start all the way over so we catch the next
2905                                        // signal
2906                                        continue 'outer;
2907                                    }
2908                                    evicted = true;
2909                                    ran_eviction = true;
2910                                    this.storage.evict_after_snapshot(background_span.id());
2911                                }
2912
2913                                // Compact while idle (up to limit), regardless of
2914                                // whether the snapshot had new data.
2915                                // `background_span` is not entered here because
2916                                // `EnteredSpan` is `!Send` and would prevent the
2917                                // future from being sent across threads when it
2918                                // suspends at the `select!` await below.
2919                                let mut ran_compaction = false;
2920                                const MAX_IDLE_COMPACTION_PASSES: usize = 10;
2921                                for _ in 0..MAX_IDLE_COMPACTION_PASSES {
2922                                    if check_idle_ended!() {
2923                                        continue 'outer;
2924                                    }
2925                                    // Enter the span only around the synchronous
2926                                    // compact() call so we never hold an
2927                                    // `EnteredSpan` across an await point.
2928                                    let _compact_span = tracing::info_span!(
2929                                        parent: background_span.id(),
2930                                        "compact database"
2931                                    )
2932                                    .entered();
2933                                    match self.backing_storage.compact() {
2934                                        Ok(true) => {
2935                                            ran_compaction = true;
2936                                        }
2937                                        Ok(false) => break,
2938                                        Err(err) => {
2939                                            eprintln!("Compaction failed: {err:?}");
2940                                            if self.backing_storage.has_unrecoverable_write_error()
2941                                            {
2942                                                Self::log_unrecoverable_persist_error();
2943                                                return;
2944                                            }
2945                                            break;
2946                                        }
2947                                    }
2948                                }
2949                                if check_idle_ended!() {
2950                                    continue 'outer;
2951                                }
2952                                // After running snapshotting/eviction/compaction we have churned a
2953                                // _lot_ of memory if we are still
2954                                // idle tell `mimalloc` that now would be a good time to release
2955                                // memory back to the OS
2956                                if new_data || ran_compaction || ran_eviction {
2957                                    turbo_tasks_malloc::TurboMalloc::collect(true);
2958                                }
2959                            }
2960                        }
2961                    }
2962                }
2963            }
2964        })
2965    }
2966
2967    fn try_read_own_task_cell(
2968        &self,
2969        task_id: TaskId,
2970        cell: CellId,
2971        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
2972    ) -> Result<TypedCellContent> {
2973        let mut ctx = self.execute_context(turbo_tasks);
2974        let task = ctx.task(task_id, TaskDataCategory::Data);
2975        if let Some(content) = task.get_cell_data(&cell).cloned() {
2976            Ok(CellContent(Some(content)).into_typed(cell.type_id))
2977        } else {
2978            Ok(CellContent(None).into_typed(cell.type_id))
2979        }
2980    }
2981
    /// Collects the collectibles of type `collectible_type` visible from
    /// `task_id`, keyed by the emitted cell.
    ///
    /// Two sources are summed into the result:
    /// - the task's aggregated collectibles: every matching entry with a
    ///   positive count contributes `1`,
    /// - the task's own collectibles: every matching entry contributes its
    ///   stored (signed `i32`) count.
    ///
    /// When `reader_id` is given, the reader is added as a collectibles
    /// dependent of `task_id`. On the reader, an outdated collectibles
    /// dependency for the same target is removed if present; otherwise a
    /// collectibles dependency is added.
    ///
    /// # Panics
    ///
    /// Panics if `task_id` has a persistent task type whose native function
    /// is not marked `root`.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        // Scope `task` so its guard is released before `reader` is accessed
        // below (presumably to avoid holding two task guards at once).
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            if task
                .get_persistent_task_type()
                .is_some_and(|t| !t.native_fn.is_root)
            {
                // Release the guard before building the panic message, which
                // looks up task descriptions (likely touching this task again).
                drop(task);
                panic!(
                    "Reading collectibles of non-root task {} (reader: {}). The `root` attribute \
                     is missing on the task.",
                    self.debug_get_task_description(task_id),
                    reader_id.map_or_else(
                        || "unknown".to_string(),
                        |r| self.debug_get_task_description(r)
                    )
                );
            }
            // Aggregated entries count as present/absent only: each positive
            // entry adds exactly 1 regardless of its count.
            for (collectible, count) in task.iter_aggregated_collectibles() {
                if *count > 0 && collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += 1;
                }
            }
            // The task's own collectibles add their full count.
            for (&collectible, &count) in task.iter_collectibles() {
                if collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += count;
                }
            }
            if let Some(reader_id) = reader_id {
                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
            }
        }
        if let Some(reader_id) = reader_id {
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            // NOTE(review): removing the outdated entry appears to mark the
            // existing dependency as still in use, so no new entry is added.
            if !reader.remove_outdated_collectibles_dependencies(&target) {
                let _ = reader.add_collectibles_dependencies(target);
            }
        }
        collectibles
    }
3044
3045    fn emit_collectible(
3046        &self,
3047        collectible_type: TraitTypeId,
3048        collectible: RawVc,
3049        task_id: TaskId,
3050        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
3051    ) {
3052        self.assert_valid_collectible(task_id, collectible);
3053
3054        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3055            panic!("Collectibles need to be resolved");
3056        };
3057        let cell = CellRef {
3058            task: collectible_task,
3059            cell,
3060        };
3061        operation::UpdateCollectibleOperation::run(
3062            task_id,
3063            CollectibleRef {
3064                collectible_type,
3065                cell,
3066            },
3067            1,
3068            self.execute_context(turbo_tasks),
3069        );
3070    }
3071
3072    fn unemit_collectible(
3073        &self,
3074        collectible_type: TraitTypeId,
3075        collectible: RawVc,
3076        count: u32,
3077        task_id: TaskId,
3078        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
3079    ) {
3080        self.assert_valid_collectible(task_id, collectible);
3081
3082        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3083            panic!("Collectibles need to be resolved");
3084        };
3085        let cell = CellRef {
3086            task: collectible_task,
3087            cell,
3088        };
3089        operation::UpdateCollectibleOperation::run(
3090            task_id,
3091            CollectibleRef {
3092                collectible_type,
3093                cell,
3094            },
3095            -(i32::try_from(count).unwrap()),
3096            self.execute_context(turbo_tasks),
3097        );
3098    }
3099
3100    fn update_task_cell(
3101        &self,
3102        task_id: TaskId,
3103        cell: CellId,
3104        content: CellContent,
3105        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3106        content_hash: Option<CellHash>,
3107        verification_mode: VerificationMode,
3108        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
3109    ) {
3110        operation::UpdateCellOperation::run(
3111            task_id,
3112            cell,
3113            content,
3114            updated_key_hashes,
3115            content_hash,
3116            verification_mode,
3117            self.execute_context(turbo_tasks),
3118        );
3119    }
3120
3121    fn mark_own_task_as_finished(
3122        &self,
3123        task: TaskId,
3124        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
3125    ) {
3126        let mut ctx = self.execute_context(turbo_tasks);
3127        let mut task = ctx.task(task, TaskDataCategory::Data);
3128        if let Some(InProgressState::InProgress(box InProgressStateInner {
3129            marked_as_completed,
3130            ..
3131        })) = task.get_in_progress_mut()
3132        {
3133            *marked_as_completed = true;
3134            // TODO this should remove the dirty state (also check session_dependent)
3135            // but this would break some assumptions for strongly consistent reads.
3136            // Client tasks are not connected yet, so we wouldn't wait for them.
3137            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
3138        }
3139    }
3140
3141    fn connect_task(
3142        &self,
3143        task: TaskId,
3144        parent_task: Option<TaskId>,
3145        turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
3146    ) {
3147        self.assert_not_persistent_calling_transient(parent_task, task, None);
3148        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
3149    }
3150
3151    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3152        let task_id = self.transient_task_id_factory.get();
3153        {
3154            let mut task = self.storage.access_mut(task_id);
3155            task.init_transient_task(task_id, task_type, self.should_track_activeness());
3156        }
3157        #[cfg(feature = "verify_aggregation_graph")]
3158        self.root_tasks.lock().insert(task_id);
3159        task_id
3160    }
3161
3162    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<TurboTasksBackend<B>>) {
3163        #[cfg(feature = "verify_aggregation_graph")]
3164        self.root_tasks.lock().remove(&task_id);
3165
3166        let mut ctx = self.execute_context(turbo_tasks);
3167        let mut task = ctx.task(task_id, TaskDataCategory::All);
3168        let is_dirty = task.is_dirty();
3169        let has_dirty_containers = task.has_dirty_containers();
3170        if is_dirty.is_some() || has_dirty_containers {
3171            if let Some(activeness_state) = task.get_activeness_mut() {
3172                // We will finish the task, but it would be removed after the task is done
3173                activeness_state.unset_root_type();
3174                activeness_state.set_active_until_clean();
3175            };
3176        } else if let Some(activeness_state) = task.take_activeness() {
3177            // Technically nobody should be listening to this event, but just in case
3178            // we notify it anyway
3179            activeness_state.all_clean_event.notify(usize::MAX);
3180        }
3181    }
3182
3183    #[cfg(feature = "verify_aggregation_graph")]
3184    fn verify_aggregation_graph(&self, turbo_tasks: &TurboTasks<TurboTasksBackend<B>>, idle: bool) {
3185        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3186            return;
3187        }
3188        use std::{collections::VecDeque, env, io::stdout};
3189
3190        use crate::backend::operation::{get_uppers, is_aggregating_node};
3191
3192        let mut ctx = self.execute_context(turbo_tasks);
3193        let root_tasks = self.root_tasks.lock().clone();
3194
3195        for task_id in root_tasks.into_iter() {
3196            let mut queue = VecDeque::new();
3197            let mut visited = FxHashSet::default();
3198            let mut aggregated_nodes = FxHashSet::default();
3199            let mut collectibles = FxHashMap::default();
3200            let root_task_id = task_id;
3201            visited.insert(task_id);
3202            aggregated_nodes.insert(task_id);
3203            queue.push_back(task_id);
3204            let mut counter = 0;
3205            while let Some(task_id) = queue.pop_front() {
3206                counter += 1;
3207                if counter % 100000 == 0 {
3208                    println!(
3209                        "queue={}, visited={}, aggregated_nodes={}",
3210                        queue.len(),
3211                        visited.len(),
3212                        aggregated_nodes.len()
3213                    );
3214                }
3215                let task = ctx.task(task_id, TaskDataCategory::All);
3216                if idle && !self.is_idle.load(Ordering::Relaxed) {
3217                    return;
3218                }
3219
3220                let uppers = get_uppers(&task);
3221                if task_id != root_task_id
3222                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3223                {
3224                    panic!(
3225                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3226                         {:?})",
3227                        task_id,
3228                        task.get_task_description(),
3229                        uppers
3230                    );
3231                }
3232
3233                for (collectible, _) in task.iter_aggregated_collectibles() {
3234                    collectibles
3235                        .entry(*collectible)
3236                        .or_insert_with(|| (false, Vec::new()))
3237                        .1
3238                        .push(task_id);
3239                }
3240
3241                for (&collectible, &value) in task.iter_collectibles() {
3242                    if value > 0 {
3243                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3244                            *flag = true
3245                        } else {
3246                            panic!(
3247                                "Task {} has a collectible {:?} that is not in any upper task",
3248                                task_id, collectible
3249                            );
3250                        }
3251                    }
3252                }
3253
3254                let is_dirty = task.has_dirty();
3255                let has_dirty_container = task.has_dirty_containers();
3256                let should_be_in_upper = is_dirty || has_dirty_container;
3257
3258                let aggregation_number = get_aggregation_number(&task);
3259                if is_aggregating_node(aggregation_number) {
3260                    aggregated_nodes.insert(task_id);
3261                }
3262                // println!(
3263                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
3264                //     ctx.get_task_description(task_id),
3265                //     uppers
3266                // );
3267
3268                for child_id in task.iter_children() {
3269                    // println!("{task_id}: child -> {child_id}");
3270                    if visited.insert(child_id) {
3271                        queue.push_back(child_id);
3272                    }
3273                }
3274                drop(task);
3275
3276                if should_be_in_upper {
3277                    for upper_id in uppers {
3278                        let upper = ctx.task(upper_id, TaskDataCategory::All);
3279                        let in_upper = upper
3280                            .get_aggregated_dirty_containers(&task_id)
3281                            .is_some_and(|&dirty| dirty > 0);
3282                        if !in_upper {
3283                            let containers: Vec<_> = upper
3284                                .iter_aggregated_dirty_containers()
3285                                .map(|(&k, &v)| (k, v))
3286                                .collect();
3287                            let upper_task_desc = upper.get_task_description();
3288                            drop(upper);
3289                            panic!(
3290                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
3291                                 ({})\nThese dirty containers are present:\n{:#?}",
3292                                task_id,
3293                                ctx.task(task_id, TaskDataCategory::Data)
3294                                    .get_task_description(),
3295                                upper_id,
3296                                upper_task_desc,
3297                                containers,
3298                            );
3299                        }
3300                    }
3301                }
3302            }
3303
3304            for (collectible, (flag, task_ids)) in collectibles {
3305                if !flag {
3306                    use std::io::Write;
3307                    let mut stdout = stdout().lock();
3308                    writeln!(
3309                        stdout,
3310                        "{:?} that is not emitted in any child task but in these aggregated \
3311                         tasks: {:#?}",
3312                        collectible,
3313                        task_ids
3314                            .iter()
3315                            .map(|t| format!(
3316                                "{t} {}",
3317                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
3318                            ))
3319                            .collect::<Vec<_>>()
3320                    )
3321                    .unwrap();
3322
3323                    let task_id = collectible.cell.task;
3324                    let mut queue = {
3325                        let task = ctx.task(task_id, TaskDataCategory::All);
3326                        get_uppers(&task)
3327                    };
3328                    let mut visited = FxHashSet::default();
3329                    for &upper_id in queue.iter() {
3330                        visited.insert(upper_id);
3331                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3332                    }
3333                    while let Some(task_id) = queue.pop() {
3334                        let task = ctx.task(task_id, TaskDataCategory::All);
3335                        let desc = task.get_task_description();
3336                        let aggregated_collectible = task
3337                            .get_aggregated_collectibles(&collectible)
3338                            .copied()
3339                            .unwrap_or_default();
3340                        let uppers = get_uppers(&task);
3341                        drop(task);
3342                        writeln!(
3343                            stdout,
3344                            "upper {task_id} {desc} collectible={aggregated_collectible}"
3345                        )
3346                        .unwrap();
3347                        if task_ids.contains(&task_id) {
3348                            writeln!(
3349                                stdout,
3350                                "Task has an upper connection to an aggregated task that doesn't \
3351                                 reference it. Upper connection is invalid!"
3352                            )
3353                            .unwrap();
3354                        }
3355                        for upper_id in uppers {
3356                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3357                            if !visited.contains(&upper_id) {
3358                                queue.push(upper_id);
3359                            }
3360                        }
3361                    }
3362                    panic!("See stdout for more details");
3363                }
3364            }
3365        }
3366    }
3367
3368    fn assert_not_persistent_calling_transient(
3369        &self,
3370        parent_id: Option<TaskId>,
3371        child_id: TaskId,
3372        cell_id: Option<CellId>,
3373    ) {
3374        if let Some(parent_id) = parent_id
3375            && !parent_id.is_transient()
3376            && child_id.is_transient()
3377        {
3378            self.panic_persistent_calling_transient(
3379                self.debug_get_task_description(parent_id),
3380                self.debug_get_cached_task_type(child_id).as_deref(),
3381                cell_id,
3382            );
3383        }
3384    }
3385
3386    fn panic_persistent_calling_transient(
3387        &self,
3388        parent: String,
3389        child: Option<&CachedTaskType>,
3390        cell_id: Option<CellId>,
3391    ) -> ! {
3392        let transient_reason = if let Some(child) = child {
3393            Cow::Owned(format!(
3394                " The callee is transient because it depends on:\n{}",
3395                self.debug_trace_transient_task(child, cell_id),
3396            ))
3397        } else {
3398            Cow::Borrowed("")
3399        };
3400        panic!(
3401            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3402            parent,
3403            child.map_or("unknown", |t| t.get_name()),
3404            transient_reason,
3405        );
3406    }
3407
3408    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3409        // these checks occur in a potentially hot codepath, but they're cheap
3410        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3411            // This should never happen: The collectible APIs use ResolvedVc
3412            let task_info = if let Some(col_task_ty) = collectible
3413                .try_get_task_id()
3414                .map(|t| self.debug_get_task_description(t))
3415            {
3416                Cow::Owned(format!(" (return type of {col_task_ty})"))
3417            } else {
3418                Cow::Borrowed("")
3419            };
3420            panic!("Collectible{task_info} must be a ResolvedVc")
3421        };
3422        if col_task_id.is_transient() && !task_id.is_transient() {
3423            let transient_reason =
3424                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3425                    Cow::Owned(format!(
3426                        ". The collectible is transient because it depends on:\n{}",
3427                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3428                    ))
3429                } else {
3430                    Cow::Borrowed("")
3431                };
3432            // this should never happen: How would a persistent function get a transient Vc?
3433            panic!(
3434                "Collectible is transient, transient collectibles cannot be emitted from \
3435                 persistent tasks{transient_reason}",
3436            )
3437        }
3438    }
3439}
3440
3441impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
3442    fn startup(&self, turbo_tasks: &TurboTasks<Self>) {
3443        self.0.startup(turbo_tasks);
3444    }
3445
3446    fn stopping(&self, _turbo_tasks: &TurboTasks<Self>) {
3447        self.0.stopping();
3448    }
3449
3450    fn stop(&self, turbo_tasks: &TurboTasks<Self>) {
3451        self.0.stop(turbo_tasks);
3452    }
3453
3454    fn idle_start(&self, turbo_tasks: &TurboTasks<Self>) {
3455        self.0.idle_start(turbo_tasks);
3456    }
3457
3458    fn idle_end(&self, _turbo_tasks: &TurboTasks<Self>) {
3459        self.0.idle_end();
3460    }
3461
3462    fn get_or_create_task(
3463        &self,
3464        native_fn: &'static NativeFunction,
3465        this: Option<RawVc>,
3466        arg: &mut dyn DynTaskInputsStorage,
3467        parent_task: Option<TaskId>,
3468        persistence: TaskPersistence,
3469        turbo_tasks: &TurboTasks<Self>,
3470    ) -> TaskId {
3471        self.0
3472            .get_or_create_task(native_fn, this, arg, parent_task, persistence, turbo_tasks)
3473    }
3474
3475    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3476        self.0.invalidate_task(task_id, turbo_tasks);
3477    }
3478
3479    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &TurboTasks<Self>) {
3480        self.0.invalidate_tasks(tasks, turbo_tasks);
3481    }
3482
3483    fn invalidate_tasks_set(
3484        &self,
3485        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3486        turbo_tasks: &TurboTasks<Self>,
3487    ) {
3488        self.0.invalidate_tasks_set(tasks, turbo_tasks);
3489    }
3490
3491    fn invalidate_serialization(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3492        self.0.invalidate_serialization(task_id, turbo_tasks);
3493    }
3494
3495    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &TurboTasks<Self>) {
3496        self.0.task_execution_canceled(task, turbo_tasks)
3497    }
3498
3499    fn try_start_task_execution(
3500        &self,
3501        task_id: TaskId,
3502        priority: TaskPriority,
3503        turbo_tasks: &TurboTasks<Self>,
3504    ) -> Option<TaskExecutionSpec<'_>> {
3505        self.0
3506            .try_start_task_execution(task_id, priority, turbo_tasks)
3507    }
3508
3509    fn task_execution_completed(
3510        &self,
3511        task_id: TaskId,
3512        result: Result<RawVc, TurboTasksExecutionError>,
3513        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3514        #[cfg(feature = "verify_determinism")] stateful: bool,
3515        has_invalidator: bool,
3516        turbo_tasks: &TurboTasks<Self>,
3517    ) -> Option<TaskPriority> {
3518        self.0.task_execution_completed(
3519            task_id,
3520            result,
3521            cell_counters,
3522            #[cfg(feature = "verify_determinism")]
3523            stateful,
3524            has_invalidator,
3525            turbo_tasks,
3526        )
3527    }
3528
    /// Job type accepted by `run_backend_job` for this backend.
    type BackendJob = TurboTasksBackendJob;
3530
3531    fn run_backend_job<'a>(
3532        &'a self,
3533        job: Self::BackendJob,
3534        turbo_tasks: &'a TurboTasks<Self>,
3535    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3536        self.0.run_backend_job(job, turbo_tasks)
3537    }
3538
3539    fn try_read_task_output(
3540        &self,
3541        task_id: TaskId,
3542        reader: Option<TaskId>,
3543        options: ReadOutputOptions,
3544        turbo_tasks: &TurboTasks<Self>,
3545    ) -> Result<Result<RawVc, EventListener>> {
3546        self.0
3547            .try_read_task_output(task_id, reader, options, turbo_tasks)
3548    }
3549
3550    fn try_read_task_cell(
3551        &self,
3552        task_id: TaskId,
3553        cell: CellId,
3554        reader: Option<TaskId>,
3555        options: ReadCellOptions,
3556        turbo_tasks: &TurboTasks<Self>,
3557    ) -> Result<Result<TypedCellContent, EventListener>> {
3558        self.0
3559            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3560    }
3561
3562    fn try_read_own_task_cell(
3563        &self,
3564        task_id: TaskId,
3565        cell: CellId,
3566        turbo_tasks: &TurboTasks<Self>,
3567    ) -> Result<TypedCellContent> {
3568        self.0.try_read_own_task_cell(task_id, cell, turbo_tasks)
3569    }
3570
3571    fn read_task_collectibles(
3572        &self,
3573        task_id: TaskId,
3574        collectible_type: TraitTypeId,
3575        reader: Option<TaskId>,
3576        turbo_tasks: &TurboTasks<Self>,
3577    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3578        self.0
3579            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3580    }
3581
3582    fn emit_collectible(
3583        &self,
3584        collectible_type: TraitTypeId,
3585        collectible: RawVc,
3586        task_id: TaskId,
3587        turbo_tasks: &TurboTasks<Self>,
3588    ) {
3589        self.0
3590            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3591    }
3592
3593    fn unemit_collectible(
3594        &self,
3595        collectible_type: TraitTypeId,
3596        collectible: RawVc,
3597        count: u32,
3598        task_id: TaskId,
3599        turbo_tasks: &TurboTasks<Self>,
3600    ) {
3601        self.0
3602            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3603    }
3604
3605    fn update_task_cell(
3606        &self,
3607        task_id: TaskId,
3608        cell: CellId,
3609        content: CellContent,
3610        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3611        content_hash: Option<CellHash>,
3612        verification_mode: VerificationMode,
3613        turbo_tasks: &TurboTasks<Self>,
3614    ) {
3615        self.0.update_task_cell(
3616            task_id,
3617            cell,
3618            content,
3619            updated_key_hashes,
3620            content_hash,
3621            verification_mode,
3622            turbo_tasks,
3623        );
3624    }
3625
3626    fn mark_own_task_as_finished(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3627        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3628    }
3629
3630    fn connect_task(
3631        &self,
3632        task: TaskId,
3633        parent_task: Option<TaskId>,
3634        turbo_tasks: &TurboTasks<Self>,
3635    ) {
3636        self.0.connect_task(task, parent_task, turbo_tasks);
3637    }
3638
3639    fn create_transient_task(
3640        &self,
3641        task_type: TransientTaskType,
3642        _turbo_tasks: &TurboTasks<Self>,
3643    ) -> TaskId {
3644        self.0.create_transient_task(task_type)
3645    }
3646
3647    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3648        self.0.dispose_root_task(task_id, turbo_tasks);
3649    }
3650
3651    fn task_statistics(&self) -> &TaskStatisticsApi {
3652        &self.0.task_statistics
3653    }
3654
3655    fn is_tracking_dependencies(&self) -> bool {
3656        self.0.options.dependency_tracking
3657    }
3658
3659    fn get_task_name(&self, task: TaskId, turbo_tasks: &TurboTasks<Self>) -> String {
3660        self.0.get_task_name(task, turbo_tasks)
3661    }
3662}
3663
3664enum DebugTraceTransientTask {
3665    Cached {
3666        task_name: &'static str,
3667        cell_type_id: Option<ValueTypeId>,
3668        cause_self: Option<Box<DebugTraceTransientTask>>,
3669        cause_args: Vec<DebugTraceTransientTask>,
3670    },
3671    /// This representation is used when this task is a duplicate of one previously shown
3672    Collapsed {
3673        task_name: &'static str,
3674        cell_type_id: Option<ValueTypeId>,
3675    },
3676    Uncached {
3677        cell_type_id: Option<ValueTypeId>,
3678    },
3679}
3680
3681impl DebugTraceTransientTask {
3682    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3683        let indent = "    ".repeat(level);
3684        f.write_str(&indent)?;
3685
3686        fn fmt_cell_type_id(
3687            f: &mut fmt::Formatter<'_>,
3688            cell_type_id: Option<ValueTypeId>,
3689        ) -> fmt::Result {
3690            if let Some(ty) = cell_type_id {
3691                write!(
3692                    f,
3693                    " (read cell of type {})",
3694                    get_value_type(ty).ty.global_name
3695                )
3696            } else {
3697                Ok(())
3698            }
3699        }
3700
3701        // write the name and type
3702        match self {
3703            Self::Cached {
3704                task_name,
3705                cell_type_id,
3706                ..
3707            }
3708            | Self::Collapsed {
3709                task_name,
3710                cell_type_id,
3711                ..
3712            } => {
3713                f.write_str(task_name)?;
3714                fmt_cell_type_id(f, *cell_type_id)?;
3715                if matches!(self, Self::Collapsed { .. }) {
3716                    f.write_str(" (collapsed)")?;
3717                }
3718            }
3719            Self::Uncached { cell_type_id } => {
3720                f.write_str("unknown transient task")?;
3721                fmt_cell_type_id(f, *cell_type_id)?;
3722            }
3723        }
3724        f.write_char('\n')?;
3725
3726        // write any extra "cause" information we might have
3727        if let Self::Cached {
3728            cause_self,
3729            cause_args,
3730            ..
3731        } = self
3732        {
3733            if let Some(c) = cause_self {
3734                writeln!(f, "{indent}  self:")?;
3735                c.fmt_indented(f, level + 1)?;
3736            }
3737            if !cause_args.is_empty() {
3738                writeln!(f, "{indent}  args:")?;
3739                for c in cause_args {
3740                    c.fmt_indented(f, level + 1)?;
3741                }
3742            }
3743        }
3744        Ok(())
3745    }
3746}
3747
3748impl fmt::Display for DebugTraceTransientTask {
3749    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3750        self.fmt_indented(f, 0)
3751    }
3752}
3753
/// Returns an [`Instant`] roughly 30 years (of 365-day years) from now.
///
/// Adapted from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
fn far_future() -> Instant {
    // The API offers neither a maximal `Instant` nor a way to turn a calendar date into one,
    // so add a large-but-safe offset instead: 1000 years overflows on macOS and 100 years
    // overflows on FreeBSD.
    const SECONDS_PER_DAY: u64 = 86400;
    const THIRTY_YEARS: Duration = Duration::from_secs(SECONDS_PER_DAY * 365 * 30);
    Instant::now() + THIRTY_YEARS
}
3762
3763/// Encodes task data, using the provided buffer as a scratch space.  Returns a new exactly sized
3764/// buffer.
3765/// This allows reusing the buffer across multiple encode calls to optimize allocations and
3766/// resulting buffer sizes.
3767///
3768/// TODO: The `Result` return type is an artifact of the bincode `Encode` trait requiring
3769/// fallible encoding. In practice, encoding to a `SmallVec` is infallible (no I/O), and the only
3770/// real failure mode — a `TypedSharedReference` whose value type has no bincode impl — is a
3771/// programmer error caught by the panic in the caller. Consider making the bincode encoding trait
3772/// infallible (i.e. returning `()` instead of `Result<(), EncodeError>`) to eliminate the
3773/// spurious `Result` threading throughout the encode path.
3774fn encode_task_data(
3775    task: TaskId,
3776    data: &TaskStorage,
3777    category: SpecificTaskDataCategory,
3778    scratch_buffer: &mut TurboBincodeBuffer,
3779) -> Result<TurboBincodeBuffer> {
3780    scratch_buffer.clear();
3781    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3782    data.encode(category, &mut encoder)?;
3783
3784    if cfg!(feature = "verify_serialization") {
3785        TaskStorage::new()
3786            .decode(
3787                category,
3788                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3789            )
3790            .with_context(|| {
3791                format!(
3792                    "expected to be able to decode serialized data for '{category:?}' information \
3793                     for {task}"
3794                )
3795            })?;
3796    }
3797    Ok(SmallVec::from_slice(scratch_buffer))
3798}