Skip to main content

turbo_tasks_backend/backend/
mod.rs

1mod cell_data;
2mod counter_map;
3mod operation;
4mod snapshot_coordinator;
5mod storage;
6pub mod storage_schema;
7
8use std::{
9    borrow::Cow,
10    fmt::{self, Write},
11    future::Future,
12    hash::BuildHasherDefault,
13    mem::take,
14    pin::Pin,
15    sync::{
16        Arc, LazyLock,
17        atomic::{AtomicBool, Ordering},
18    },
19    time::SystemTime,
20};
21
22use anyhow::{Context, Result, bail};
23use auto_hash_map::{AutoMap, AutoSet};
24use indexmap::IndexSet;
25use parking_lot::Mutex;
26use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
27use smallvec::{SmallVec, smallvec};
28use tokio::time::{Duration, Instant};
29use tracing::{Span, trace_span};
30use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
31use turbo_tasks::{
32    CellId, DynTaskInputsStorage, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
33    ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
34    TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasks, TurboTasksCallApi,
35    TurboTasksPanic, ValueTypeId,
36    backend::{
37        Backend, CachedTaskType, CachedTaskTypeArc, CellContent, CellHash, TaskExecutionSpec,
38        TransientTaskType, TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
39        TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
40        VerificationMode,
41    },
42    event::{Event, EventDescription, EventListener},
43    macro_helpers::NativeFunction,
44    message_queue::{TimingEvent, TraceEvent},
45    registry::get_value_type,
46    scope::scope_and_block,
47    task_statistics::TaskStatisticsApi,
48    trace::TraceRawVcs,
49    util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
50};
51#[cfg(feature = "task_dirty_cause")]
52use turbo_tasks::{FunctionId, TaskDirtyCause};
53
54pub use self::{
55    operation::AnyOperation,
56    storage::{EvictionCounts, SpecificTaskDataCategory, TaskDataCategory},
57};
58use crate::{
59    backend::{
60        operation::{
61            AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
62            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
63            LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType, TaskTypeRef,
64            connect_children, get_aggregation_number, get_uppers, make_task_dirty_internal,
65            prepare_new_children,
66        },
67        snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
68        storage::Storage,
69        storage_schema::{TaskStorage, TaskStorageAccessors},
70    },
71    backing_storage::{SnapshotItem, SnapshotMeta, compute_task_type_hash},
72    data::{
73        ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
74        InProgressState, InProgressStateInner, OutputValue, TransientTask,
75    },
76    error::TaskError,
77    kv_backing_storage::TurboBackingStorage,
78    utils::{
79        dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
80        shard_amount::compute_shard_amount,
81    },
82};
83
/// Number of dependent tasks above which marking them dirty is split into parallel
/// work instead of being processed sequentially.
const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10_000;
88
/// Idle timeout used for snapshot persistence.
///
/// Read once from `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS`, interpreted as milliseconds.
/// Falls back to two seconds when the variable is unset, not valid Unicode, or does not
/// parse as a `u64`.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let configured_millis = std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    match configured_millis {
        Some(millis) => Duration::from_millis(millis),
        None => Duration::from_secs(2),
    }
});
98
99/// Priority used to re-schedule a task that became stale during execution.
100///
101/// Stale tasks must run again, but at a priority that reflects why they're being re-run rather
102/// than the (likely higher) priority of the original schedule. We use invalidation priority
103/// based on the task's leaf distance, parented under either the task's current dirty priority
104/// or `leaf()` if it is no longer dirty.
105fn compute_stale_priority(task: &impl TaskGuard) -> TaskPriority {
106    TaskPriority::invalidation(
107        task.get_leaf_distance()
108            .copied()
109            .unwrap_or_default()
110            .distance,
111    )
112    .in_parent(task.is_dirty().unwrap_or(TaskPriority::leaf()))
113}
114
/// How the backend interacts with its backing storage.
///
/// In every mode, cache entries that are missing locally are looked up in the backing
/// storage. The modes differ in whether, and when, local changes are pushed back.
///
/// This is a plain fieldless configuration value, so it is cheap to copy, compare and
/// debug-print.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StorageMode {
    /// Queries the storage for cache entries that don't exist locally.
    /// Changes are never pushed back to the backing storage.
    ReadOnly,
    /// Queries the storage for cache entries that don't exist locally.
    /// Regularly pushes changes to the backing storage.
    ReadWrite,
    /// Queries the storage for cache entries that don't exist locally.
    /// On shutdown, pushes all changes to the backing storage.
    ReadWriteOnShutdown,
}
125
126pub struct BackendOptions {
127    /// Enables dependency tracking.
128    ///
129    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
130    /// forever.
131    pub dependency_tracking: bool,
132
133    /// Enables active tracking.
134    ///
135    /// Automatically disabled when `dependency_tracking` is disabled.
136    ///
137    /// When disabled: All tasks are considered as active.
138    pub active_tracking: bool,
139
140    /// Enables the backing storage.
141    pub storage_mode: Option<StorageMode>,
142
143    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
144    /// datastructures. If `None`, it will use the available parallelism.
145    pub num_workers: Option<usize>,
146
147    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
148    pub small_preallocation: bool,
149
150    /// When enabled, evict all evictable tasks from in-memory storage after every snapshot.
151    /// This reclaims memory by clearing persisted data that can be re-loaded from disk on demand.
152    /// This is an EXPERIMENTAL FEATURE under development
153    pub evict_after_snapshot: bool,
154}
155
156impl Default for BackendOptions {
157    fn default() -> Self {
158        Self {
159            dependency_tracking: true,
160            active_tracking: true,
161            storage_mode: Some(StorageMode::ReadWrite),
162            num_workers: None,
163            small_preallocation: false,
164            evict_after_snapshot: false,
165        }
166    }
167}
168
/// Kinds of background job used by the backend. Currently only snapshots.
pub enum TurboTasksBackendJob {
    /// Presumably triggers a snapshot/persist run; confirm against the job handler.
    Snapshot,
}
172
/// The trigger behind a snapshot/persist run.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum SnapshotReason {
    Test,
    Stop,
    InitialSnapshotTimeout,
    RegularSnapshotInterval,
    IdleTimeout,
}

impl SnapshotReason {
    /// Returns a short, human-readable label for this reason.
    fn as_str(self) -> &'static str {
        match self {
            Self::Test => "test",
            Self::Stop => "stop",
            Self::InitialSnapshotTimeout => "initial snapshot timeout",
            Self::RegularSnapshotInterval => "regular snapshot interval",
            Self::IdleTimeout => "idle timeout",
        }
    }

    /// Whether task entries should be drained from the map while they are serialized.
    ///
    /// Only `Stop` returns true. At shutdown the whole map is dropped right afterwards, so
    /// each entry can be removed and freed as soon as it is serialized, instead of after the
    /// whole batch is written. This lowers peak memory during `next build` shutdown.
    fn drain_entries(self) -> bool {
        self == Self::Stop
    }
}
201
202pub struct TurboTasksBackend {
203    options: BackendOptions,
204
205    start_time: Instant,
206
207    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
208    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
209
210    storage: Storage,
211
212    /// Coordinates the operation/snapshot interleaving protocol. See
213    /// [`SnapshotCoordinator`] for details.
214    snapshot_coord: SnapshotCoordinator,
215    /// Serializes calls to `snapshot_and_persist`. The coordinator's
216    /// `begin_snapshot` asserts that snapshots don't overlap; this mutex
217    /// enforces that contract for our two callers (background loop and
218    /// `stop_and_wait`).
219    snapshot_in_progress: Mutex<()>,
220
221    stopping: AtomicBool,
222    stopping_event: Event,
223    idle_start_event: Event,
224    idle_end_event: Event,
225    #[cfg(feature = "verify_aggregation_graph")]
226    is_idle: AtomicBool,
227
228    task_statistics: TaskStatisticsApi,
229
230    backing_storage: TurboBackingStorage,
231
232    #[cfg(feature = "verify_aggregation_graph")]
233    root_tasks: Mutex<FxHashSet<TaskId>>,
234}
235
236impl TurboTasksBackend {
237    /// Invalidates the persistent storage so that it will be deleted the next time a turbopack
238    /// instance is created with the filesystem cache enabled.
239    ///
240    /// `reason_code` should be one of the codes in
241    /// [`crate::db_invalidation::invalidation_reasons`].
242    pub fn invalidate_storage(&self, reason_code: &str) -> Result<()> {
243        self.backing_storage.invalidate(reason_code)
244    }
245
246    pub fn new(mut options: BackendOptions, backing_storage: TurboBackingStorage) -> Self {
247        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
248        if !options.dependency_tracking {
249            options.active_tracking = false;
250        }
251        let small_preallocation = options.small_preallocation;
252        let next_task_id = backing_storage
253            .next_free_task_id()
254            .expect("Failed to get task id");
255        Self {
256            options,
257            start_time: Instant::now(),
258            persisted_task_id_factory: IdFactoryWithReuse::new(
259                next_task_id,
260                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
261            ),
262            transient_task_id_factory: IdFactoryWithReuse::new(
263                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
264                TaskId::MAX,
265            ),
266            storage: Storage::new(shard_amount, small_preallocation),
267            snapshot_coord: SnapshotCoordinator::new(),
268            snapshot_in_progress: Mutex::new(()),
269            stopping: AtomicBool::new(false),
270            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
271            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
272            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
273            #[cfg(feature = "verify_aggregation_graph")]
274            is_idle: AtomicBool::new(false),
275            task_statistics: TaskStatisticsApi::default(),
276            backing_storage,
277            #[cfg(feature = "verify_aggregation_graph")]
278            root_tasks: Default::default(),
279        }
280    }
281
282    fn execute_context<'a>(
283        &'a self,
284        turbo_tasks: &'a TurboTasks<TurboTasksBackend>,
285    ) -> impl ExecuteContext<'a> {
286        ExecuteContextImpl::new(self, turbo_tasks)
287    }
288
289    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
290        if self.should_persist() {
291            self.snapshot_coord.suspend_point(suspend);
292        }
293    }
294
295    pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
296        if !self.should_persist() {
297            return OperationGuard::noop();
298        }
299        self.snapshot_coord.begin_operation()
300    }
301
302    fn should_persist(&self) -> bool {
303        matches!(
304            self.options.storage_mode,
305            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
306        )
307    }
308
309    fn should_evict(&self) -> bool {
310        self.options.evict_after_snapshot && self.should_persist()
311    }
312
313    /// Perform a snapshot and then evict all evictable tasks from memory.
314    ///
315    /// This is exposed for integration tests that need to verify the
316    /// snapshot → evict → restore cycle works correctly.
317    ///
318    /// Returns `(snapshot_had_new_data, eviction_counts)`.
319    #[doc(hidden)]
320    pub fn snapshot_and_evict_for_testing(
321        &self,
322        turbo_tasks: &TurboTasks<TurboTasksBackend>,
323    ) -> (bool, EvictionCounts) {
324        assert!(
325            self.should_persist(),
326            "snapshot_and_evict requires persistence"
327        );
328        let snapshot_result = self.snapshot_and_persist(None, SnapshotReason::Test, turbo_tasks);
329        let had_new_data = match snapshot_result {
330            Ok((_, new_data)) => new_data,
331            Err(_) => {
332                // Snapshot/persist failed — skip eviction since the data may not
333                // be on disk yet. Evicting now could lose in-memory state that
334                // can't be restored.
335                return (false, EvictionCounts::default());
336            }
337        };
338        let counts = self.storage.evict_after_snapshot(None);
339        (had_new_data, counts)
340    }
341
342    fn should_restore(&self) -> bool {
343        self.options.storage_mode.is_some()
344    }
345
346    fn should_track_dependencies(&self) -> bool {
347        self.options.dependency_tracking
348    }
349
350    fn should_track_activeness(&self) -> bool {
351        self.options.active_tracking
352    }
353
354    fn track_cache_hit_by_fn(&self, native_fn: &'static NativeFunction) {
355        self.task_statistics
356            .map(|stats| stats.increment_cache_hit(native_fn));
357    }
358
359    fn track_cache_miss_by_fn(&self, native_fn: &'static NativeFunction) {
360        self.task_statistics
361            .map(|stats| stats.increment_cache_miss(native_fn));
362    }
363
364    /// Reconstructs a full [`TurboTasksExecutionError`] from the compact [`TaskError`] storage
365    /// representation. For [`TaskError::TaskChain`], this looks up the source error from the last
366    /// task's output and rebuilds the nested `TaskContext` wrappers with `TurboTasksCallApi`
367    /// references for lazy name resolution.
368    fn task_error_to_turbo_tasks_execution_error(
369        &self,
370        error: &TaskError,
371        ctx: &mut impl ExecuteContext<'_>,
372    ) -> TurboTasksExecutionError {
373        match error {
374            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
375            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
376                message: item.message.clone(),
377                source: item
378                    .source
379                    .as_ref()
380                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
381            })),
382            TaskError::LocalTaskContext(local_task_context) => {
383                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
384                    name: local_task_context.name.clone(),
385                    source: local_task_context
386                        .source
387                        .as_ref()
388                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
389                }))
390            }
391            TaskError::TaskChain(chain) => {
392                let task_id = chain.last().unwrap();
393                let error = {
394                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
395                    if let Some(OutputValue::Error(error)) = task.get_output() {
396                        Some(error.clone())
397                    } else {
398                        None
399                    }
400                };
401                let error = error.map_or_else(
402                    || {
403                        // Eventual consistency will cause errors to no longer be available
404                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
405                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
406                                "Error no longer available",
407                            )),
408                            location: None,
409                        }))
410                    },
411                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
412                );
413                let mut current_error = error;
414                for &task_id in chain.iter().rev() {
415                    current_error =
416                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
417                            task_id,
418                            source: Some(current_error),
419                            turbo_tasks: ctx.turbo_tasks(),
420                        }));
421                }
422                current_error
423            }
424        }
425    }
426}
427
428/// Intermediate result of step 1 of task execution completion.
429struct TaskExecutionCompletePrepareResult {
430    pub new_children: FxHashSet<TaskId>,
431    pub is_now_immutable: bool,
432    #[cfg(feature = "verify_determinism")]
433    pub no_output_set: bool,
434    #[cfg(feature = "task_dirty_cause")]
435    pub function_id: Option<FunctionId>,
436    pub new_output: Option<OutputValue>,
437    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
438    pub is_recomputation: bool,
439    pub is_session_dependent: bool,
440}
441
442// Operations
443impl TurboTasksBackend {
444    fn try_read_task_output(
445        &self,
446        task_id: TaskId,
447        reader: Option<TaskId>,
448        options: ReadOutputOptions,
449        turbo_tasks: &TurboTasks<TurboTasksBackend>,
450    ) -> Result<Result<RawVc, EventListener>> {
451        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
452
453        let mut ctx = self.execute_context(turbo_tasks);
454        let need_reader_task = if self.should_track_dependencies()
455            && !matches!(options.tracking, ReadTracking::Untracked)
456            && let Some(reader_id) = reader
457            && reader_id != task_id
458        {
459            Some(reader_id)
460        } else {
461            None
462        };
463        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
464            // Having a task_pair here is not optimal, but otherwise this would lead to a race
465            // condition. See below.
466            // TODO(sokra): solve that in a more performant way.
467            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
468            (task, Some(reader))
469        } else {
470            (ctx.task(task_id, TaskDataCategory::All), None)
471        };
472
473        fn listen_to_done_event(
474            reader_description: Option<EventDescription>,
475            tracking: ReadTracking,
476            done_event: &Event,
477        ) -> EventListener {
478            done_event.listen_with_note(move || {
479                move || {
480                    if let Some(reader_description) = reader_description.as_ref() {
481                        format!(
482                            "try_read_task_output from {} ({})",
483                            reader_description, tracking
484                        )
485                    } else {
486                        format!("try_read_task_output ({})", tracking)
487                    }
488                }
489            })
490        }
491
492        fn check_in_progress(
493            task: &impl TaskGuard,
494            reader_description: Option<EventDescription>,
495            tracking: ReadTracking,
496        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
497        {
498            match task.get_in_progress() {
499                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
500                    listen_to_done_event(reader_description, tracking, done_event),
501                ))),
502                Some(InProgressState::InProgress(box InProgressStateInner {
503                    done_event, ..
504                })) => Some(Ok(Err(listen_to_done_event(
505                    reader_description,
506                    tracking,
507                    done_event,
508                )))),
509                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
510                    "{} was canceled",
511                    task.get_task_description()
512                ))),
513                None => None,
514            }
515        }
516
517        if matches!(options.consistency, ReadConsistency::Strong) {
518            if task
519                .get_persistent_task_type()
520                .is_some_and(|t| !t.native_fn.is_root)
521            {
522                drop(task);
523                drop(reader_task);
524                panic!(
525                    "Strongly consistent read of non-root task {} (reader: {}). The `root` \
526                     attribute is missing on the task.",
527                    self.debug_get_task_description(task_id),
528                    reader.map_or_else(
529                        || "unknown".to_string(),
530                        |r| self.debug_get_task_description(r)
531                    )
532                );
533            }
534
535            let is_dirty = task.is_dirty();
536
537            // Check the dirty count of the root node
538            let has_dirty_containers = task.has_dirty_containers();
539            if has_dirty_containers || is_dirty.is_some() {
540                let activeness = task.get_activeness_mut();
541                let mut task_ids_to_schedule: Vec<_> = Vec::new();
542                // When there are dirty task, subscribe to the all_clean_event
543                let activeness = if let Some(activeness) = activeness {
544                    // This makes sure all tasks stay active and this task won't stale.
545                    // active_until_clean is automatically removed when this
546                    // task is clean.
547                    activeness.set_active_until_clean();
548                    activeness
549                } else {
550                    // If we don't have a root state, add one. This also makes sure all tasks stay
551                    // active and this task won't stale. active_until_clean
552                    // is automatically removed when this task is clean.
553                    if ctx.should_track_activeness() {
554                        // A newly added Activeness need to make sure to schedule the tasks
555                        task_ids_to_schedule = task.dirty_containers().collect();
556                        task_ids_to_schedule.push(task_id);
557                    }
558                    let activeness =
559                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
560                    activeness.set_active_until_clean();
561                    activeness
562                };
563                let listener = activeness.all_clean_event.listen_with_note(move || {
564                    // Reach the backend through the pinned `turbo_tasks` handle rather than
565                    // cloning `self`: pinning keeps the backend alive for the closure's lifetime.
566                    let tt = turbo_tasks.pin();
567                    move || {
568                        let mut ctx = tt.backend().execute_context(&tt);
569                        let mut visited = FxHashSet::default();
570                        fn indent(s: &str) -> String {
571                            s.split_inclusive('\n')
572                                .flat_map(|line: &str| ["  ", line].into_iter())
573                                .collect::<String>()
574                        }
575                        fn get_info(
576                            ctx: &mut impl ExecuteContext<'_>,
577                            task_id: TaskId,
578                            parent_and_count: Option<(TaskId, i32)>,
579                            visited: &mut FxHashSet<TaskId>,
580                        ) -> String {
581                            let task = ctx.task(task_id, TaskDataCategory::All);
582                            let is_dirty = task.is_dirty();
583                            let in_progress =
584                                task.get_in_progress()
585                                    .map_or("not in progress", |p| match p {
586                                        InProgressState::InProgress(_) => "in progress",
587                                        InProgressState::Scheduled { .. } => "scheduled",
588                                        InProgressState::Canceled => "canceled",
589                                    });
590                            let activeness = task.get_activeness().map_or_else(
591                                || "not active".to_string(),
592                                |activeness| format!("{activeness:?}"),
593                            );
594                            let aggregation_number = get_aggregation_number(&task);
595                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
596                            {
597                                let uppers = get_uppers(&task);
598                                !uppers.contains(&parent_task_id)
599                            } else {
600                                false
601                            };
602
603                            // Check the dirty count of the root node
604                            let has_dirty_containers = task.has_dirty_containers();
605
606                            let task_description = task.get_task_description();
607                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
608                                format!(", dirty({parent_priority})")
609                            } else {
610                                String::new()
611                            };
612                            let has_dirty_containers_label = if has_dirty_containers {
613                                ", dirty containers"
614                            } else {
615                                ""
616                            };
617                            let count = if let Some((_, count)) = parent_and_count {
618                                format!(" {count}")
619                            } else {
620                                String::new()
621                            };
622                            let mut info = format!(
623                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
624                                 {in_progress}, \
625                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
626                            );
627                            let children: Vec<_> = task.dirty_containers_with_count().collect();
628                            drop(task);
629
630                            if missing_upper {
631                                info.push_str("\n  ERROR: missing upper connection");
632                            }
633
634                            if has_dirty_containers || !children.is_empty() {
635                                writeln!(info, "\n  dirty tasks:").unwrap();
636
637                                for (child_task_id, count) in children {
638                                    let task_description = ctx
639                                        .task(child_task_id, TaskDataCategory::Data)
640                                        .get_task_description();
641                                    if visited.insert(child_task_id) {
642                                        let child_info = get_info(
643                                            ctx,
644                                            child_task_id,
645                                            Some((task_id, count)),
646                                            visited,
647                                        );
648                                        info.push_str(&indent(&child_info));
649                                        if !info.ends_with('\n') {
650                                            info.push('\n');
651                                        }
652                                    } else {
653                                        writeln!(
654                                            info,
655                                            "  {child_task_id} {task_description} {count} \
656                                             (already visited)"
657                                        )
658                                        .unwrap();
659                                    }
660                                }
661                            }
662                            info
663                        }
664                        let info = get_info(&mut ctx, task_id, None, &mut visited);
665                        format!(
666                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
667                        )
668                    }
669                });
670                drop(reader_task);
671                drop(task);
672                if !task_ids_to_schedule.is_empty() {
673                    let mut queue = AggregationUpdateQueue::new();
674                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
675                    queue.execute(&mut ctx);
676                }
677
678                return Ok(Err(listener));
679            }
680        }
681
682        let reader_description = reader_task
683            .as_ref()
684            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
685        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
686        {
687            return value;
688        }
689
690        if let Some(output) = task.get_output() {
691            let result = match output {
692                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
693                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
694                OutputValue::Error(error) => Err(error.clone()),
695            };
696            if let Some(mut reader_task) = reader_task.take()
697                && options.tracking.should_track(result.is_err())
698                && (!task.immutable() || cfg!(feature = "verify_immutable"))
699            {
700                #[cfg(feature = "trace_task_output_dependencies")]
701                let _span = tracing::trace_span!(
702                    "add output dependency",
703                    task = %task_id,
704                    dependent_task = ?reader
705                )
706                .entered();
707                let mut queue = LeafDistanceUpdateQueue::new();
708                let reader = reader.unwrap();
709                if task.add_output_dependent(reader) {
710                    // Ensure that dependent leaf distance is strictly monotonic increasing
711                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
712                    let reader_leaf_distance =
713                        reader_task.get_leaf_distance().copied().unwrap_or_default();
714                    if reader_leaf_distance.distance <= leaf_distance.distance {
715                        queue.push(
716                            reader,
717                            leaf_distance.distance,
718                            leaf_distance.max_distance_in_buffer,
719                        );
720                    }
721                }
722
723                drop(task);
724
725                // Note: We use `task_pair` earlier to lock the task and its reader at the same
726                // time. If we didn't and just locked the reader here, an invalidation could occur
727                // between grabbing the locks. If that happened, and if the task is "outdated" or
728                // doesn't have the dependency edge yet, the invalidation would be lost.
729
730                if !reader_task.remove_outdated_output_dependencies(&task_id) {
731                    let _ = reader_task.add_output_dependencies(task_id);
732                }
733                drop(reader_task);
734
735                queue.execute(&mut ctx);
736            } else {
737                drop(task);
738            }
739
740            return result.map_err(|error| {
741                self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
742                    .with_task_context(task_id, turbo_tasks.pin())
743                    .into()
744            });
745        }
746        drop(reader_task);
747
748        let note = EventDescription::new(|| {
749            move || {
750                if let Some(reader) = reader_description.as_ref() {
751                    format!("try_read_task_output (recompute) from {reader}",)
752                } else {
753                    "try_read_task_output (recompute, untracked)".to_string()
754                }
755            }
756        });
757
758        // Output doesn't exist. We need to schedule the task to compute it.
759        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
760            TaskExecutionReason::OutputNotAvailable,
761            EventDescription::new(|| task.get_task_desc_fn()),
762            note,
763        );
764
765        // It's not possible that the task is InProgress at this point. If it is InProgress {
766        // done: true } it must have Output and would early return.
767        let old = task.set_in_progress(in_progress_state);
768        debug_assert!(old.is_none(), "InProgress already exists");
769        ctx.schedule_task(task, TaskPriority::Recomputation);
770
771        Ok(Err(listener))
772    }
773
774    fn try_read_task_cell(
775        &self,
776        task_id: TaskId,
777        reader: Option<TaskId>,
778        cell: CellId,
779        options: ReadCellOptions,
780        turbo_tasks: &TurboTasks<TurboTasksBackend>,
781    ) -> Result<Result<TypedCellContent, EventListener>> {
782        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
783
784        fn add_cell_dependency(
785            task_id: TaskId,
786            mut task: impl TaskGuard,
787            reader: Option<TaskId>,
788            reader_task: Option<impl TaskGuard>,
789            cell: CellId,
790            key: Option<u64>,
791        ) {
792            if let Some(mut reader_task) = reader_task
793                && (!task.immutable() || cfg!(feature = "verify_immutable"))
794            {
795                let reader = reader.unwrap();
796                let reverse = CellRef { task: reader, cell };
797                if let Some(k) = key {
798                    let _ = task.add_cell_dependents_hashed((reverse, k));
799                } else {
800                    let _ = task.add_cell_dependents(reverse);
801                }
802                drop(task);
803
804                // Note: We use `task_pair` earlier to lock the task and its reader at the same
805                // time. If we didn't and just locked the reader here, an invalidation could occur
806                // between grabbing the locks. If that happened, and if the task is "outdated" or
807                // doesn't have the dependency edge yet, the invalidation would be lost.
808
809                let target = CellRef {
810                    task: task_id,
811                    cell,
812                };
813                if let Some(k) = key {
814                    if !reader_task.remove_outdated_cell_dependencies_hashed(&(target, k)) {
815                        let _ = reader_task.add_cell_dependencies_hashed((target, k));
816                    }
817                } else if !reader_task.remove_outdated_cell_dependencies(&target) {
818                    let _ = reader_task.add_cell_dependencies(target);
819                }
820                drop(reader_task);
821            }
822        }
823
824        let ReadCellOptions {
825            tracking,
826            final_read_hint,
827        } = options;
828
829        let mut ctx = self.execute_context(turbo_tasks);
830        let (mut task, reader_task) = if self.should_track_dependencies()
831            && !matches!(tracking, ReadCellTracking::Untracked)
832            && let Some(reader_id) = reader
833            && reader_id != task_id
834        {
835            // Having a task_pair here is not optimal, but otherwise this would lead to a race
836            // condition. See below.
837            // TODO(sokra): solve that in a more performant way.
838            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
839            (task, Some(reader))
840        } else {
841            (ctx.task(task_id, TaskDataCategory::All), None)
842        };
843
844        let content = if final_read_hint {
845            task.remove_cell_data(&cell)
846        } else {
847            task.get_cell_data(&cell).cloned()
848        };
849        if let Some(content) = content {
850            if tracking.should_track(false) {
851                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
852            }
853            return Ok(Ok(TypedCellContent(
854                cell.type_id,
855                CellContent(Some(content)),
856            )));
857        }
858
859        let in_progress = task.get_in_progress();
860        if matches!(
861            in_progress,
862            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
863        ) {
864            return Ok(Err(self
865                .listen_to_cell(&mut task, task_id, &reader_task, cell)
866                .0));
867        }
868        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
869
870        // Check cell index range (cell might not exist at all)
871        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
872        let Some(max_id) = max_id else {
873            let task_desc = task.get_task_description();
874            if tracking.should_track(true) {
875                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
876            }
877            bail!(
878                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
879            );
880        };
881        if cell.index >= max_id {
882            let task_desc = task.get_task_description();
883            if tracking.should_track(true) {
884                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
885            }
886            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
887        }
888
889        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
890        // task to get the cell content.
891
892        // Bail early if the task was cancelled — no point in registering a listener
893        // on a task that won't execute again.
894        if is_cancelled {
895            bail!("{} was canceled", task.get_task_description());
896        }
897
898        // Listen to the cell and potentially schedule the task
899        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
900        drop(reader_task);
901        if !new_listener {
902            return Ok(Err(listener));
903        }
904
905        let _span = tracing::trace_span!(
906            "recomputation",
907            cell_type = get_value_type(cell.type_id).ty.global_name,
908            cell_index = cell.index
909        )
910        .entered();
911
912        let _ = task.add_scheduled(
913            TaskExecutionReason::CellNotAvailable,
914            EventDescription::new(|| task.get_task_desc_fn()),
915        );
916        ctx.schedule_task(task, TaskPriority::Recomputation);
917
918        Ok(Err(listener))
919    }
920
921    fn listen_to_cell(
922        &self,
923        task: &mut impl TaskGuard,
924        task_id: TaskId,
925        reader_task: &Option<impl TaskGuard>,
926        cell: CellId,
927    ) -> (EventListener, bool) {
928        let note = || {
929            let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
930            move || {
931                if let Some(reader_desc) = reader_desc.as_ref() {
932                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
933                } else {
934                    "try_read_task_cell (in progress, untracked)".to_string()
935                }
936            }
937        };
938        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
939            // Someone else is already computing the cell
940            let listener = in_progress.event.listen_with_note(note);
941            return (listener, false);
942        }
943        let in_progress = InProgressCellState::new(task_id, cell);
944        let listener = in_progress.event.listen_with_note(note);
945        let old = task.insert_in_progress_cells(cell, in_progress);
946        debug_assert!(old.is_none(), "InProgressCell already exists");
947        (listener, true)
948    }
949
    /// Takes a snapshot of every task modified since the previous snapshot and writes it to
    /// the backing storage.
    ///
    /// Snapshots are serialized through `snapshot_in_progress`, so only one runs at a time.
    /// For each task, only the categories flagged as modified (`meta_modified` /
    /// `data_modified`) are encoded. Tasks flagged `new_task` additionally carry a hash of
    /// their persistent task type.
    ///
    /// Returns `(instant, persisted)`:
    /// - `(start, false)` when storage reported no modifications, so no scan was done;
    /// - `(snapshot_time, false)` when the scan produced no task items;
    /// - `(snapshot_time, true)` after the snapshot was saved.
    ///
    /// `start` is taken before waiting in `begin_snapshot`. `snapshot_time` is taken after
    /// snapshot mode was entered and the suspended operations were collected.
    /// NOTE(review): only the no-modification path returns `start`; confirm callers expect
    /// this difference.
    ///
    /// # Errors
    ///
    /// Returns the error from `backing_storage.save_snapshot`. At that point the tasks have
    /// already been consumed by `take_snapshot`.
    ///
    /// # Panics
    ///
    /// Panics if encoding a task category fails, if a transient task reaches the encoder, or
    /// if a task flagged `new_task` has no persistent task type.
    fn snapshot_and_persist(
        &self,
        parent_span: Option<tracing::Id>,
        reason: SnapshotReason,
        turbo_tasks: &TurboTasks<TurboTasksBackend>,
    ) -> Result<(Instant, bool), anyhow::Error> {
        let snapshot_span =
            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason.as_str())
                .entered();
        // Serialize snapshots. The internal protocol (snapshot_mode, snapshot
        // request bit, suspended_operations) assumes only one snapshot runs at
        // a time. Held for the entire snapshot lifecycle.
        let _snapshot_in_progress = self.snapshot_in_progress.lock();
        let start = Instant::now();
        // SystemTime for wall-clock timestamps in trace events (milliseconds
        // since epoch). Instant is monotonic but has no defined epoch, so it
        // can't be used for cross-process trace correlation.
        let wall_start = SystemTime::now();
        debug_assert!(self.should_persist());

        // Traced as "blocking": this is where the snapshot waits in `begin_snapshot`.
        let mut snapshot_phase = {
            let _span = tracing::info_span!("blocking").entered();
            self.snapshot_coord.begin_snapshot()
        };
        // Enter snapshot mode, which atomically reads and resets the modified count.
        // Checking after start_snapshot ensures no concurrent increments can race.
        let (snapshot_guard, has_modifications) = self.storage.start_snapshot();

        let suspended_operations = snapshot_phase.take_suspended_operations();

        let snapshot_time = Instant::now();
        drop(snapshot_phase);

        if !has_modifications {
            // No tasks modified since the last snapshot — drop the guard (which
            // calls end_snapshot) and skip the expensive O(N) scan.
            drop(snapshot_guard);
            return Ok((start, false));
        }

        // Per-task-name size/count statistics, only collected and printed when the
        // `print_cache_item_size` feature is enabled.
        #[cfg(feature = "print_cache_item_size")]
        #[derive(Default)]
        struct TaskCacheStats {
            data: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            data_compressed: usize,
            data_count: usize,
            meta: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            meta_compressed: usize,
            meta_count: usize,
            upper_count: usize,
            collectibles_count: usize,
            aggregated_collectibles_count: usize,
            children_count: usize,
            followers_count: usize,
            collectibles_dependents_count: usize,
            aggregated_dirty_containers_count: usize,
            output_size: usize,
        }
        /// Formats a byte size, optionally including the compressed size when the
        /// `print_cache_item_size_with_compressed` feature is enabled.
        #[cfg(feature = "print_cache_item_size")]
        struct FormatSizes {
            size: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: usize,
        }
        #[cfg(feature = "print_cache_item_size")]
        impl std::fmt::Display for FormatSizes {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                use turbo_tasks::util::FormatBytes;
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    write!(
                        f,
                        "{} ({} compressed)",
                        FormatBytes(self.size),
                        FormatBytes(self.compressed_size)
                    )
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    write!(f, "{}", FormatBytes(self.size))
                }
            }
        }
        #[cfg(feature = "print_cache_item_size")]
        impl TaskCacheStats {
            /// LZ4-compressed size of `data`, used only for the statistics.
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            fn compressed_size(data: &[u8]) -> Result<usize> {
                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
                    data,
                    &mut Vec::new(),
                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
                )?)
            }

            /// Accounts one encoded data-category item (compression failures count as 0).
            fn add_data(&mut self, data: &[u8]) {
                self.data += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.data_count += 1;
            }

            /// Accounts one encoded meta-category item (compression failures count as 0).
            fn add_meta(&mut self, data: &[u8]) {
                self.meta += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.meta_count += 1;
            }

            /// Adds the task's meta collection counts and its encoded output size.
            fn add_counts(&mut self, storage: &TaskStorage) {
                let counts = storage.meta_counts();
                self.upper_count += counts.upper;
                self.collectibles_count += counts.collectibles;
                self.aggregated_collectibles_count += counts.aggregated_collectibles;
                self.children_count += counts.children;
                self.followers_count += counts.followers;
                self.collectibles_dependents_count += counts.collectibles_dependents;
                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
                if let Some(output) = storage.get_output() {
                    use turbo_bincode::turbo_bincode_encode;

                    self.output_size += turbo_bincode_encode(&output)
                        .map(|data| data.len())
                        .unwrap_or(0);
                }
            }

            /// Returns the task name used as the stats grouping key.
            fn task_name(storage: &TaskStorage) -> String {
                storage
                    .get_persistent_task_type()
                    .map(|t| t.to_string())
                    .unwrap_or_else(|| "<unknown>".to_string())
            }

            /// Returns the primary sort key: compressed total when
            /// `print_cache_item_size_with_compressed` is enabled, raw total otherwise.
            fn sort_key(&self) -> usize {
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed + self.meta_compressed
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    self.data + self.meta
                }
            }

            fn format_total(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data + self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed + self.meta_compressed,
                }
            }

            fn format_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed,
                }
            }

            // `checked_div` yields 0 instead of dividing by zero when no items were counted.
            fn format_avg_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data.checked_div(self.data_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .data_compressed
                        .checked_div(self.data_count)
                        .unwrap_or(0),
                }
            }

            fn format_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.meta_compressed,
                }
            }

            fn format_avg_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta.checked_div(self.meta_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .meta_compressed
                        .checked_div(self.meta_count)
                        .unwrap_or(0),
                }
            }
        }
        // Behind a Mutex because `process` only gets shared access to it.
        #[cfg(feature = "print_cache_item_size")]
        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
            Mutex::new(FxHashMap::default());

        // Encode each task's modified categories. We only encode categories with `modified` set,
        // meaning the category was actually dirtied. Categories restored from disk but never
        // modified don't need re-persisting since the on-disk version is still valid.
        // For tasks accessed during snapshot mode, a frozen copy was made and its `modified`
        // flags were copied from the live task at snapshot creation time, reflecting which
        // categories were dirtied before the snapshot was taken.
        let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
            // Encodes one category of the task; an encoding failure panics with the task
            // description rather than silently dropping the item.
            let encode_category = |task_id: TaskId,
                                   data: &TaskStorage,
                                   category: SpecificTaskDataCategory,
                                   buffer: &mut TurboBincodeBuffer|
             -> Option<TurboBincodeBuffer> {
                match encode_task_data(task_id, data, category, buffer) {
                    Ok(encoded) => {
                        #[cfg(feature = "print_cache_item_size")]
                        {
                            let mut stats = task_cache_stats.lock();
                            let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
                            match category {
                                SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
                                SpecificTaskDataCategory::Data => entry.add_data(&encoded),
                            }
                        }
                        Some(encoded)
                    }
                    Err(err) => {
                        panic!(
                            "Serializing task {} failed ({:?}): {:?}",
                            self.debug_get_task_description(task_id),
                            category,
                            err
                        );
                    }
                }
            };
            if task_id.is_transient() {
                unreachable!("transient task_ids should never be enqueued to be persisted");
            }

            let encode_meta = inner.flags.meta_modified();
            let encode_data = inner.flags.data_modified();

            #[cfg(feature = "print_cache_item_size")]
            if encode_data || encode_meta {
                task_cache_stats
                    .lock()
                    .entry(TaskCacheStats::task_name(inner))
                    .or_default()
                    .add_counts(inner);
            }

            let meta = if encode_meta {
                encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
            } else {
                None
            };

            let data = if encode_data {
                encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
            } else {
                None
            };
            // Only newly created tasks carry the task-type hash.
            let task_type_hash = if inner.flags.new_task() {
                let task_type = inner.get_persistent_task_type().expect(
                    "It is not possible for a new_task to not have a persistent_task_type.  Task \
                     creation for persistent tasks uses a single ExecutionContextImpl for \
                     creating the task (which sets new_task) and connect_child (which sets \
                     persistent_task_type) and take_snapshot waits for all operations to complete \
                     or suspend before we start snapshotting.  So task creation will always set \
                     the task_type.",
                );
                Some(compute_task_type_hash(task_type))
            } else {
                None
            };

            SnapshotItem {
                task_id,
                meta,
                data,
                task_type_hash,
            }
        };

        let task_snapshots =
            self.storage
                .take_snapshot(snapshot_guard, &process, reason.drain_entries());

        drop(snapshot_span);
        let snapshot_duration = start.elapsed();
        let task_count = task_snapshots.len();

        if task_snapshots.is_empty() {
            // Not expected: `has_modifications` was true, and encoding failures panic in
            // `encode_category` instead of dropping items. NOTE(review): this assumes
            // `take_snapshot` yields an item for every modified task. Treated as a
            // snapshot that persisted nothing.
            std::hint::cold_path();
            return Ok((snapshot_time, false));
        }

        let persist_start = Instant::now();
        let span = tracing::info_span!(
            parent: parent_span,
            "persist",
            reason = reason.as_str(),
            data_items= tracing::field::Empty,
            meta_items= tracing::field::Empty,
            task_cache_items= tracing::field::Empty,
            next_task_id= tracing::field::Empty,)
        .entered();
        {
            // A failed save is propagated with `?`. The tasks were already consumed by
            // take_snapshot, so a later snapshot would not re-persist them. Returning the
            // error tells the caller that further persist attempts would corrupt the task
            // graph in storage.
            let SnapshotMeta {
                task_cache_items,
                data_items,
                meta_items,
                max_next_task_id,
            } = self
                .backing_storage
                .save_snapshot(suspended_operations, task_snapshots)?;
            span.record("data_items", data_items);
            span.record("meta_items", meta_items);
            span.record("task_cache_items", task_cache_items);
            span.record("next_task_id", max_next_task_id);

            #[cfg(feature = "print_cache_item_size")]
            {
                let mut task_cache_stats = task_cache_stats
                    .into_inner()
                    .into_iter()
                    .collect::<Vec<_>>();
                if !task_cache_stats.is_empty() {
                    use turbo_tasks::util::FormatBytes;

                    use crate::utils::markdown_table::print_markdown_table;

                    // Largest first; ties broken by task name (descending).
                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
                        (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
                    });

                    println!(
                        "Task cache stats: {}",
                        FormatSizes {
                            size: task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data + s.meta)
                                .sum::<usize>(),
                            #[cfg(feature = "print_cache_item_size_with_compressed")]
                            compressed_size: task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data_compressed + s.meta_compressed)
                                .sum::<usize>()
                        },
                    );

                    // Each "Count x Avg" heading appears twice because it spans two columns
                    // in the rows below: the count (" N x") and the average size.
                    print_markdown_table(
                        [
                            "Task",
                            " Total Size",
                            " Data Size",
                            " Data Count x Avg",
                            " Data Count x Avg",
                            " Meta Size",
                            " Meta Count x Avg",
                            " Meta Count x Avg",
                            " Uppers",
                            " Coll",
                            " Agg Coll",
                            " Children",
                            " Followers",
                            " Coll Deps",
                            " Agg Dirty",
                            " Output Size",
                        ],
                        task_cache_stats.iter(),
                        |(task_desc, stats)| {
                            [
                                task_desc.to_string(),
                                format!(" {}", stats.format_total()),
                                format!(" {}", stats.format_data()),
                                format!(" {} x", stats.data_count),
                                format!("{}", stats.format_avg_data()),
                                format!(" {}", stats.format_meta()),
                                format!(" {} x", stats.meta_count),
                                format!("{}", stats.format_avg_meta()),
                                format!(" {}", stats.upper_count),
                                format!(" {}", stats.collectibles_count),
                                format!(" {}", stats.aggregated_collectibles_count),
                                format!(" {}", stats.children_count),
                                format!(" {}", stats.followers_count),
                                format!(" {}", stats.collectibles_dependents_count),
                                format!(" {}", stats.aggregated_dirty_containers_count),
                                format!(" {}", FormatBytes(stats.output_size)),
                            ]
                        },
                    );
                }
            }
        }

        let elapsed = start.elapsed();
        let persist_duration = persist_start.elapsed();
        // avoid spamming the event queue with information about fast operations
        if elapsed > Duration::from_secs(10) {
            turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
                "Finished writing to filesystem cache".to_string(),
                elapsed,
            )));
        }

        // Always emit a trace event with wall-clock start/end in milliseconds since the epoch
        // (a clock before the epoch falls back to 0).
        let wall_start_ms = wall_start
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            // as_millis_f64 is not stable yet
            .as_secs_f64()
            * 1000.0;
        let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
        turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
            "turbopack-persistence",
            wall_start_ms,
            wall_end_ms,
            vec![
                ("reason", serde_json::Value::from(reason.as_str())),
                (
                    "snapshot_duration_ms",
                    serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
                ),
                (
                    "persist_duration_ms",
                    serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
                ),
                ("task_count", serde_json::Value::from(task_count)),
            ],
        )));

        Ok((snapshot_time, true))
    }
1393
1394    fn startup(&self, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1395        if self.should_restore() {
1396            // Continue all uncompleted operations
1397            // They can't be interrupted by a snapshot since the snapshotting job has not been
1398            // scheduled yet.
1399            let uncompleted_operations = self
1400                .backing_storage
1401                .uncompleted_operations()
1402                .expect("Failed to get uncompleted operations");
1403            if !uncompleted_operations.is_empty() {
1404                let mut ctx = self.execute_context(turbo_tasks);
1405                for op in uncompleted_operations {
1406                    op.execute(&mut ctx);
1407                }
1408            }
1409        }
1410
1411        // Only when it should write regularly to the storage, we schedule the initial snapshot
1412        // job.
1413        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1414            // Schedule the snapshot job
1415            let _span = trace_span!("persisting background job").entered();
1416            let _span = tracing::info_span!("thread").entered();
1417            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::Snapshot);
1418        }
1419    }
1420
    /// Signals that the backend is about to stop.
    ///
    /// The `stopping` flag is stored (with `Release` ordering) before every listener on
    /// `stopping_event` is woken, so a woken listener can observe the flag as set
    /// (presumably by loading it with `Acquire` ordering — confirm at the listener sites).
    fn stopping(&self) {
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }
1425
1426    #[allow(unused_variables)]
1427    fn stop(&self, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1428        #[cfg(feature = "verify_aggregation_graph")]
1429        {
1430            self.is_idle.store(false, Ordering::Release);
1431            self.verify_aggregation_graph(turbo_tasks, false);
1432        }
1433        // eagerly drop the task cache before persisting
1434        self.storage.drop_task_cache();
1435        if self.should_persist() {
1436            // The task_cache is a pure perf cache backed by the DB and isn't read during the
1437            // stop snapshot (no task creation runs concurrently with stop). Drop it before
1438            // persisting to lower peak memory during the serialization/write.
1439            if let Err(err) =
1440                self.snapshot_and_persist(Span::current().into(), SnapshotReason::Stop, turbo_tasks)
1441            {
1442                eprintln!("Persisting failed during shutdown: {err:?}");
1443            }
1444        }
1445        self.storage.drop_contents();
1446        if let Err(err) = self.backing_storage.shutdown() {
1447            println!("Shutting down failed: {err}");
1448        }
1449    }
1450
    /// Called when the turbo-tasks runtime becomes idle.
    ///
    /// Wakes every listener on `idle_start_event`. With the `verify_aggregation_graph`
    /// feature enabled, it also marks the backend as idle and spawns a task. That task
    /// verifies the aggregation graph after 5 seconds unless `idle_end_event` fires first.
    // `turbo_tasks` is only used when `verify_aggregation_graph` is enabled.
    #[allow(unused_variables)]
    fn idle_start(&self, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            // The spawned task reaches the backend through the pinned `turbo_tasks` handle
            // rather than cloning `self`: pinning keeps the `TurboTasks` (and therefore the
            // backend it owns) alive for the task's lifetime.
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                let backend = &turbo_tasks.backend();
                // Wait 5 seconds, but give up as soon as idleness ends.
                select! {
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                        // Timer elapsed first: fall through to the idle re-check below.
                    }
                    _ = backend.idle_end_event.listen() => {
                        return;
                    }
                }
                // Re-check the flag in case idleness ended without this listener seeing
                // the `idle_end_event` notification.
                if !backend.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                backend.verify_aggregation_graph(&turbo_tasks, true);
            });
        }
    }
1481
    /// Called when the turbo-tasks runtime stops being idle.
    ///
    /// With `verify_aggregation_graph` enabled, the idle flag is cleared first. Every
    /// listener on `idle_end_event` is then woken; this includes a pending verification task
    /// spawned by `idle_start`, which returns early when it observes the event.
    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }
1487
    /// Returns the id of the task that runs `native_fn` with `this` and `arg`, creating the
    /// task if it does not exist yet, and connects it as a child of `parent_task`.
    ///
    /// Lookup order:
    /// 1. the in-memory task cache, under a shard read lock;
    /// 2. the backing storage (non-transient tasks only); a hit is inserted into the cache;
    /// 3. otherwise a new task id is allocated and inserted under the shard write lock.
    ///
    /// `arg` is only moved into a box (`take_box`) when a new task is created, or to report a
    /// transient task being requested from a persistent parent via
    /// `panic_persistent_calling_transient`.
    fn get_or_create_task(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn DynTaskInputsStorage,
        parent_task: Option<TaskId>,
        persistence: TaskPersistence,
        turbo_tasks: &TurboTasks<TurboTasksBackend>,
    ) -> TaskId {
        let transient = matches!(persistence, TaskPersistence::Transient);

        // A persistent parent must not depend on a transient child task.
        if transient
            && let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            let task_type = CachedTaskType {
                native_fn,
                this,
                arg: arg.take_box(),
            };
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                /* cell_id */ None,
            );
        }

        let is_root = native_fn.is_root;

        // Compute hash and shard index once from borrowed components (no heap allocation).
        let arg_ref = arg.as_ref();
        let hash = CachedTaskType::hash_from_components(
            self.storage.task_cache.hasher(),
            native_fn,
            this,
            arg_ref,
        );
        // Locate the shard once so that the read-only lookup and any
        // write-lock retry below share the same reference (saves a modulo +
        // memory lookup on the miss path).
        let shard = get_shard(&self.storage.task_cache, hash);

        let mut ctx = self.execute_context(turbo_tasks);
        // Step 1: Fast read-only cache lookup (read lock, no allocation).
        // Use a read lock rather than a write lock to avoid contention. connect_child
        // may re-enter task_cache with a write lock, so we must not hold a write lock here.
        if let Some(task_id) =
            raw_get_in_shard(shard, hash, |k| k.eq_components(native_fn, this, arg_ref))
        {
            self.track_cache_hit_by_fn(native_fn);
            operation::ConnectChildOperation::run(parent_task, task_id, ctx);
            return task_id;
        }

        // Step 2: Check backing storage using borrowed components (no box needed yet).
        // Transient tasks are never looked up there.

        // If the task exists in backing storage, we only need to insert it into the
        // in-memory cache.
        let task_id = if !transient
            && let Some((task_id, stored_type)) = ctx.task_by_type(native_fn, this, arg_ref)
        {
            self.track_cache_hit_by_fn(native_fn);
            // Step 3a: Insert into in-memory cache using the pre-located shard.
            // Use the existing Arc from storage to avoid a duplicate allocation.
            match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
                k.eq_components(native_fn, this, arg_ref)
            }) {
                // Another thread already cached it; keep the existing entry.
                RawEntry::Occupied(_) => {}
                RawEntry::Vacant(e) => {
                    e.insert(stored_type, task_id);
                }
            };
            task_id
        } else {
            // Step 3b: Not found anywhere — re-check under the shard write lock and create
            // the task if it is still missing.
            match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
                k.eq_components(native_fn, this, arg_ref)
            }) {
                RawEntry::Occupied(e) => {
                    // Another thread beat us to creating this task — use their task_id.
                    // They will handle logging the new task as modified.
                    let task_id = *e.get();
                    drop(e);
                    self.track_cache_hit_by_fn(native_fn);
                    task_id
                }
                RawEntry::Vacant(e) => {
                    // Only now do we force the allocation.
                    // NOTE: if our caller had to perform resolution, then this will have already
                    // been boxed and take_box just takes it.
                    let task_type = CachedTaskTypeArc::new(CachedTaskType {
                        native_fn,
                        this,
                        arg: arg.take_box(),
                    });
                    // Transient and persistent tasks draw ids from separate factories.
                    let task_id = if transient {
                        self.transient_task_id_factory.get()
                    } else {
                        self.persisted_task_id_factory.get()
                    };
                    // Initialize storage BEFORE making task_id visible in the cache.
                    // This ensures any thread that reads task_id from the cache sees
                    // the storage entry already initialized (restored flags set).
                    self.storage
                        .initialize_new_task(task_id, Some(task_type.clone()));
                    // insert() consumes e, releasing the shard write lock.
                    e.insert(task_type, task_id);
                    self.track_cache_miss_by_fn(native_fn);
                    // Update the aggregation number before connecting the child
                    // We don't need this on any of the task recovery paths above because the
                    // aggregation number will already be set.
                    // Root tasks get the maximum aggregation number; session-dependent tasks
                    // (only when dependencies are tracked) get a fixed high value.
                    if is_root {
                        AggregationUpdateQueue::run(
                            AggregationUpdateJob::UpdateAggregationNumber {
                                task_id,
                                base_aggregation_number: u32::MAX,
                                distance: None,
                            },
                            &mut ctx,
                        );
                    } else if native_fn.is_session_dependent && self.should_track_dependencies() {
                        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
                        AggregationUpdateQueue::run(
                            AggregationUpdateJob::UpdateAggregationNumber {
                                task_id,
                                base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                                distance: None,
                            },
                            &mut ctx,
                        );
                    };

                    task_id
                }
            }
        };

        operation::ConnectChildOperation::run(parent_task, task_id, ctx);

        task_id
    }
1628
1629    /// Generate an object that implements [`fmt::Display`] explaining why the given
1630    /// [`CachedTaskType`] is transient.
1631    fn debug_trace_transient_task(
1632        &self,
1633        task_type: &CachedTaskType,
1634        cell_id: Option<CellId>,
1635    ) -> DebugTraceTransientTask {
1636        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1637        // from tracing the same task many times, so use a visited_set
1638        fn inner_id(
1639            backend: &TurboTasksBackend,
1640            task_id: TaskId,
1641            cell_type_id: Option<ValueTypeId>,
1642            visited_set: &mut FxHashSet<TaskId>,
1643        ) -> DebugTraceTransientTask {
1644            if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1645                if visited_set.contains(&task_id) {
1646                    let task_name = task_type.get_name();
1647                    DebugTraceTransientTask::Collapsed {
1648                        task_name,
1649                        cell_type_id,
1650                    }
1651                } else {
1652                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1653                }
1654            } else {
1655                DebugTraceTransientTask::Uncached { cell_type_id }
1656            }
1657        }
1658        fn inner_cached(
1659            backend: &TurboTasksBackend,
1660            task_type: &CachedTaskType,
1661            cell_type_id: Option<ValueTypeId>,
1662            visited_set: &mut FxHashSet<TaskId>,
1663        ) -> DebugTraceTransientTask {
1664            let task_name = task_type.get_name();
1665
1666            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1667                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1668                    // `task_id` should never be `None` at this point, as that would imply a
1669                    // non-local task is returning a local `Vc`...
1670                    // Just ignore if it happens, as we're likely already panicking.
1671                    return None;
1672                };
1673                if task_id.is_transient() {
1674                    Some(Box::new(inner_id(
1675                        backend,
1676                        task_id,
1677                        cause_self_raw_vc.try_get_type_id(),
1678                        visited_set,
1679                    )))
1680                } else {
1681                    None
1682                }
1683            });
1684            let cause_args = task_type
1685                .arg
1686                .get_raw_vcs()
1687                .into_iter()
1688                .filter_map(|raw_vc| {
1689                    let Some(task_id) = raw_vc.try_get_task_id() else {
1690                        // `task_id` should never be `None` (see comment above)
1691                        return None;
1692                    };
1693                    if !task_id.is_transient() {
1694                        return None;
1695                    }
1696                    Some((task_id, raw_vc.try_get_type_id()))
1697                })
1698                .collect::<IndexSet<_>>() // dedupe
1699                .into_iter()
1700                .map(|(task_id, cell_type_id)| {
1701                    inner_id(backend, task_id, cell_type_id, visited_set)
1702                })
1703                .collect();
1704
1705            DebugTraceTransientTask::Cached {
1706                task_name,
1707                cell_type_id,
1708                cause_self,
1709                cause_args,
1710            }
1711        }
1712        inner_cached(
1713            self,
1714            task_type,
1715            cell_id.map(|c| c.type_id),
1716            &mut FxHashSet::default(),
1717        )
1718    }
1719
1720    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1721        if !self.should_track_dependencies() {
1722            panic!("Dependency tracking is disabled so invalidation is not allowed");
1723        }
1724        operation::InvalidateOperation::run(
1725            smallvec![task_id],
1726            #[cfg(feature = "task_dirty_cause")]
1727            TaskDirtyCause::Invalidator,
1728            self.execute_context(turbo_tasks),
1729        );
1730    }
1731
1732    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1733        if !self.should_track_dependencies() {
1734            panic!("Dependency tracking is disabled so invalidation is not allowed");
1735        }
1736        operation::InvalidateOperation::run(
1737            tasks.iter().copied().collect(),
1738            #[cfg(feature = "task_dirty_cause")]
1739            TaskDirtyCause::Unknown,
1740            self.execute_context(turbo_tasks),
1741        );
1742    }
1743
1744    fn invalidate_tasks_set(
1745        &self,
1746        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1747        turbo_tasks: &TurboTasks<TurboTasksBackend>,
1748    ) {
1749        if !self.should_track_dependencies() {
1750            panic!("Dependency tracking is disabled so invalidation is not allowed");
1751        }
1752        operation::InvalidateOperation::run(
1753            tasks.iter().copied().collect(),
1754            #[cfg(feature = "task_dirty_cause")]
1755            TaskDirtyCause::Unknown,
1756            self.execute_context(turbo_tasks),
1757        );
1758    }
1759
1760    fn invalidate_serialization(
1761        &self,
1762        task_id: TaskId,
1763        turbo_tasks: &TurboTasks<TurboTasksBackend>,
1764    ) {
1765        if task_id.is_transient() {
1766            return;
1767        }
1768        let mut ctx = self.execute_context(turbo_tasks);
1769        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1770        task.invalidate_serialization();
1771    }
1772
1773    fn debug_get_task_description(&self, task_id: TaskId) -> String {
1774        let task = self.storage.access_mut(task_id);
1775        if let Some(value) = task.get_persistent_task_type() {
1776            format!("{task_id:?} {}", value)
1777        } else if let Some(value) = task.get_transient_task_type() {
1778            format!("{task_id:?} {}", value)
1779        } else {
1780            format!("{task_id:?} unknown")
1781        }
1782    }
1783
1784    fn get_task_name(
1785        &self,
1786        task_id: TaskId,
1787        turbo_tasks: &TurboTasks<TurboTasksBackend>,
1788    ) -> String {
1789        let mut ctx = self.execute_context(turbo_tasks);
1790        let task = ctx.task(task_id, TaskDataCategory::Data);
1791        if let Some(value) = task.get_persistent_task_type() {
1792            value.to_string()
1793        } else if let Some(value) = task.get_transient_task_type() {
1794            value.to_string()
1795        } else {
1796            "unknown".to_string()
1797        }
1798    }
1799
1800    fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<CachedTaskTypeArc> {
1801        let task = self.storage.access_mut(task_id);
1802        task.get_persistent_task_type().cloned()
1803    }
1804
    /// Handles cancellation of a task's execution.
    ///
    /// Steps, in order:
    /// - wakes everyone waiting on the task's `done_event` and on its in-progress cells;
    /// - marks the task as session-dependent dirty, when dependency tracking is enabled and
    ///   the task is persistent;
    /// - leaves the task in the `InProgressState::Canceled` state.
    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &TurboTasks<TurboTasksBackend>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        // Remove the current in-progress state and wake anyone waiting for it to finish.
        if let Some(in_progress) = task.take_in_progress() {
            match in_progress {
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        // Notify any readers waiting on in-progress cells so their listeners
        // resolve and foreground jobs can finish (prevents stop_and_wait hang).
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Mark the cancelled task as session-dependent dirty so it will be re-executed
        // in the next session. Without this, any reader that encounters the cancelled task
        // records an error in its output. That error is persisted and would poison
        // subsequent builds. By marking the task session-dependent dirty, the next build
        // re-executes it, which invalidates dependents and corrects the stale errors.
        let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
            task.update_dirty_state(Some(Dirtyness::SessionDependent))
        } else {
            None
        };

        // `take_in_progress` above already cleared the state, so `old` is expected to be None.
        let old = task.set_in_progress(InProgressState::Canceled);
        debug_assert!(old.is_none(), "InProgress already exists");
        // Release the task guard before running the aggregation update queue.
        drop(task);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, &mut ctx);
        }

        // The taken cells are dropped only after the task guard has been released.
        drop(in_progress_cells);
    }
1854
    /// Moves a scheduled task into the `InProgress` state and returns the span and future
    /// needed to execute it.
    ///
    /// Returns `None` in three cases:
    /// - the task has no in-progress state;
    /// - the state is not `Scheduled` (the state is put back unchanged);
    /// - the task is a `TransientTask::Once` whose future has already been taken.
    ///
    /// Before the future is built, the current collectibles are copied into the outdated set.
    /// When dependency tracking is enabled, the cell and output dependencies are copied as
    /// well. This lets dropped edges be detected once execution completes.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &TurboTasks<TurboTasksBackend>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        #[cfg(feature = "task_dirty_cause")]
        let cause;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            // Remember whether this is a `TransientTask::Once`; stored in the in-progress state.
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            // Prepare any tasks reported by `prefetch`; the task guard is released for that
            // and re-acquired afterwards.
            if let Some(tasks) = task.prefetch() {
                drop(task);
                ctx.prepare_tasks(tasks, "prefetch");
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            let in_progress = task.take_in_progress()?;
            // Only a `Scheduled` task can be started; any other state is restored untouched.
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            #[cfg(feature = "task_dirty_cause")]
            {
                cause = match task.get_dirty() {
                    Some(Dirtyness::Dirty { cause, .. }) => Some(cause.clone()),
                    _ => None,
                };
            }
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Make all current collectibles outdated (remove left-over outdated collectibles)
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            // Snapshot both maps first so the task can be mutated while iterating.
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        // Outdated entries that are not current collectibles are left-overs.
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Make all dependencies outdated
                // We copy to outdated which allows us to detect when we drop a dependency, but we
                // keep existing dependencies in place to avoid redoing the
                // dependency registration work if we re-read them
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);
                let cell_dependencies_hashed = task.iter_cell_dependencies_hashed().collect();
                task.set_outdated_cell_dependencies_hashed(cell_dependencies_hashed);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        // The task guard and context are released above; build the span and future.
        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(
                        task_id.persistence(),
                        execution_reason,
                        priority,
                        #[cfg(feature = "task_dirty_cause")]
                        cause.as_ref(),
                    ),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // A `Once` future can only be taken a single time; `None` means it is gone.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1973
    /// Returns `Some(priority)` if the task became stale during execution and needs to be
    /// re-scheduled at the given priority. The caller (turbo-tasks manager) hands it to the
    /// priority runner so re-execution doesn't inherit the original schedule priority.
    ///
    /// Returns `None` when the completion has been fully applied.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &TurboTasks<TurboTasksBackend>,
    ) -> Option<TaskPriority> {
        // Task completion is a 4 step process:
        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
        //    aggregation number of the task and the new children.
        // 2. Connect the new children to the task (and do the relevant aggregation updates).
        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
        // 4. Shrink the task memory to reduce footprint of the task.

        // Due to persistence it is possible that the process is cancelled after any step. This is
        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
        // in-memory representation.

        // The task might be invalidated during this process, so we need to check the stale flag
        // at the start of every step.

        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            aggregation_number = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        // Captured before `result` is moved into the prepare step; used by the cleanup step.
        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        let TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            #[cfg(feature = "task_dirty_cause")]
            function_id,
            output_dependent_tasks,
            is_recomputation,
            is_session_dependent,
        } = match self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        ) {
            Ok(r) => r,
            Err(stale_priority) => {
                // Task was stale and has been rescheduled
                #[cfg(feature = "trace_task_details")]
                span.record("stale", "prepare");
                return Some(stale_priority);
            }
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // When restoring from filesystem cache the following might not be executed (since we can
        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
        // would be executed again.

        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                #[cfg(feature = "task_dirty_cause")]
                function_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        // Step 2: connect the new children; this may discover that the task became stale.
        if has_new_children
            && let Some(stale_priority) =
                self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return Some(stale_priority);
        }

        // Step 3: finish the execution (dirty flag / in-progress state).
        let (stale_priority, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
            is_session_dependent,
        );
        if let Some(stale_priority) = stale_priority {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return Some(stale_priority);
        }

        // Step 4: cleanup; returns data to be dropped by the caller.
        let removed_data = self.task_execution_completed_cleanup(
            &mut ctx,
            task_id,
            cell_counters,
            is_error,
            is_recomputation,
        );

        // Drop data outside of critical sections
        drop(removed_data);
        drop(in_progress_cells);

        None
    }
2124
2125    fn task_execution_completed_prepare(
2126        &self,
2127        ctx: &mut impl ExecuteContext<'_>,
2128        #[cfg(feature = "trace_task_details")] span: &Span,
2129        task_id: TaskId,
2130        result: Result<RawVc, TurboTasksExecutionError>,
2131        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2132        #[cfg(feature = "verify_determinism")] stateful: bool,
2133        has_invalidator: bool,
2134    ) -> Result<TaskExecutionCompletePrepareResult, TaskPriority> {
2135        let mut task = ctx.task(task_id, TaskDataCategory::All);
2136        let is_recomputation = task.is_dirty().is_none();
2137        // Without dependency tracking, the SessionDependent dirty state is never read (no session
2138        // restore), so skip the work
2139        let is_session_dependent = self.should_track_dependencies()
2140            && matches!(task.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent);
2141        let Some(in_progress) = task.get_in_progress_mut() else {
2142            panic!("Task execution completed, but task is not in progress: {task:#?}");
2143        };
2144        if matches!(in_progress, InProgressState::Canceled) {
2145            return Ok(TaskExecutionCompletePrepareResult {
2146                new_children: Default::default(),
2147                is_now_immutable: false,
2148                #[cfg(feature = "verify_determinism")]
2149                no_output_set: false,
2150                #[cfg(feature = "task_dirty_cause")]
2151                function_id: None,
2152                new_output: None,
2153                output_dependent_tasks: Default::default(),
2154                is_recomputation,
2155                is_session_dependent,
2156            });
2157        }
2158        let &mut InProgressState::InProgress(box InProgressStateInner {
2159            stale,
2160            ref mut new_children,
2161            once_task: is_once_task,
2162            ..
2163        }) = in_progress
2164        else {
2165            panic!("Task execution completed, but task is not in progress: {task:#?}");
2166        };
2167
2168        // If the task is stale, reschedule it
2169        #[cfg(not(feature = "no_fast_stale"))]
2170        if stale && !is_once_task {
2171            let stale_priority = compute_stale_priority(&task);
2172            let Some(InProgressState::InProgress(box InProgressStateInner {
2173                done_event,
2174                mut new_children,
2175                ..
2176            })) = task.take_in_progress()
2177            else {
2178                unreachable!();
2179            };
2180            let old = task.set_in_progress(InProgressState::Scheduled {
2181                done_event,
2182                reason: TaskExecutionReason::Stale,
2183            });
2184            debug_assert!(old.is_none(), "InProgress already exists");
2185            // Remove old children from new_children to leave only the children that had their
2186            // active count increased
2187            for task in task.iter_children() {
2188                new_children.remove(&task);
2189            }
2190            drop(task);
2191
2192            // We need to undo the active count increase for the children since we throw away the
2193            // new_children list now.
2194            AggregationUpdateQueue::run(
2195                AggregationUpdateJob::DecreaseActiveCounts {
2196                    task_ids: new_children.into_iter().collect(),
2197                },
2198                ctx,
2199            );
2200            return Err(stale_priority);
2201        }
2202
2203        // take the children from the task to process them
2204        let mut new_children = take(new_children);
2205
2206        // get the function id for the dirty cause
2207        #[cfg(feature = "task_dirty_cause")]
2208        let function_id = match task.get_task_type() {
2209            TaskTypeRef::Cached(task_type) => {
2210                Some(turbo_tasks::registry::get_function_id(task_type.native_fn))
2211            }
2212            TaskTypeRef::Transient(_) => None,
2213        };
2214
2215        // handle stateful (only tracked when verify_determinism is enabled)
2216        #[cfg(feature = "verify_determinism")]
2217        if stateful {
2218            task.set_stateful(true);
2219        }
2220
2221        // handle has_invalidator
2222        if has_invalidator {
2223            task.set_invalidator(true);
2224        }
2225
2226        // handle cell counters: update max index and remove cells that are no longer used
2227        // On error, skip this update: the task may have failed before creating all cells it
2228        // normally creates, so cell_counters is incomplete. Clearing cell_type_max_index entries
2229        // based on partial counters would cause "cell no longer exists" errors for tasks that
2230        // still hold dependencies on those cells. The old cell data is preserved on error
2231        // (see task_execution_completed_cleanup), so keeping cell_type_max_index consistent with
2232        // that data is correct.
2233        // NOTE: This must stay in sync with task_execution_completed_cleanup, which similarly
2234        // skips cell data removal on error.
2235        if result.is_ok() || is_recomputation {
2236            let old_counters: FxHashMap<_, _> = task
2237                .iter_cell_type_max_index()
2238                .map(|(&k, &v)| (k, v))
2239                .collect();
2240            let mut counters_to_remove = old_counters.clone();
2241
2242            for (&cell_type, &max_index) in cell_counters.iter() {
2243                if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2244                    if old_max_index != max_index {
2245                        task.insert_cell_type_max_index(cell_type, max_index);
2246                    }
2247                } else {
2248                    task.insert_cell_type_max_index(cell_type, max_index);
2249                }
2250            }
2251            for (cell_type, _) in counters_to_remove {
2252                task.remove_cell_type_max_index(&cell_type);
2253            }
2254        }
2255
2256        let mut queue = AggregationUpdateQueue::new();
2257
2258        let mut old_edges = Vec::new();
2259
2260        let has_children = !new_children.is_empty();
2261        let is_immutable = task.immutable();
2262        let task_dependencies_for_immutable =
2263            // Task was previously marked as immutable
2264            if !is_immutable
2265            // Task is not session dependent (session dependent tasks can change between sessions)
2266            && !is_session_dependent
2267            // Task has no invalidator
2268            && !task.invalidator()
2269            // Task has no dependencies on collectibles
2270            && task.is_collectibles_dependencies_empty()
2271        {
2272            Some(
2273                // Collect all dependencies on tasks to check if all dependencies are immutable.
2274                task.iter_output_dependencies()
2275                    .chain(task.iter_cell_dependencies().map(|r| r.task))
2276                    .chain(task.iter_cell_dependencies_hashed().map(|(r, _)| r.task))
2277                    .collect::<FxHashSet<_>>(),
2278            )
2279        } else {
2280            None
2281        };
2282
2283        if has_children {
2284            // Prepare all new children
2285            let _aggregation_number =
2286                prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2287
2288            #[cfg(feature = "trace_task_details")]
2289            span.record("aggregation_number", _aggregation_number);
2290
2291            // Filter actual new children
2292            old_edges.extend(
2293                task.iter_children()
2294                    .filter(|task| !new_children.remove(task))
2295                    .map(OutdatedEdge::Child),
2296            );
2297        } else {
2298            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2299        }
2300
2301        old_edges.extend(
2302            task.iter_outdated_collectibles()
2303                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2304        );
2305
2306        if self.should_track_dependencies() {
2307            // IMPORTANT: Use iter_outdated_* here, NOT iter_* (active dependencies).
2308            // At execution start, active deps are copied to outdated as a "before" snapshot.
2309            // During execution, new deps are added to active.
2310            // Here at completion, we clean up only the OUTDATED deps (the "before" snapshot).
2311            // Using iter_* (active) instead would incorrectly clean up deps that are still valid,
2312            // breaking dependency tracking.
2313            old_edges.extend(
2314                task.iter_outdated_cell_dependencies()
2315                    .map(OutdatedEdge::CellDependency),
2316            );
2317            old_edges.extend(
2318                task.iter_outdated_cell_dependencies_hashed()
2319                    .map(|(r, k)| OutdatedEdge::HashedCellDependency(r, k)),
2320            );
2321            old_edges.extend(
2322                task.iter_outdated_output_dependencies()
2323                    .map(OutdatedEdge::OutputDependency),
2324            );
2325        }
2326
2327        // Check if output need to be updated
2328        let current_output = task.get_output();
2329        #[cfg(feature = "verify_determinism")]
2330        let no_output_set = current_output.is_none();
2331        let new_output = match result {
2332            Ok(RawVc::TaskOutput(output_task_id)) => {
2333                if let Some(OutputValue::Output(current_task_id)) = current_output
2334                    && *current_task_id == output_task_id
2335                {
2336                    None
2337                } else {
2338                    Some(OutputValue::Output(output_task_id))
2339                }
2340            }
2341            Ok(RawVc::TaskCell(output_task_id, cell)) => {
2342                if let Some(OutputValue::Cell(CellRef {
2343                    task: current_task_id,
2344                    cell: current_cell,
2345                })) = current_output
2346                    && *current_task_id == output_task_id
2347                    && *current_cell == cell
2348                {
2349                    None
2350                } else {
2351                    Some(OutputValue::Cell(CellRef {
2352                        task: output_task_id,
2353                        cell,
2354                    }))
2355                }
2356            }
2357            Ok(RawVc::LocalOutput(..)) => {
2358                panic!("Non-local tasks must not return a local Vc");
2359            }
2360            Err(err) => {
2361                if let Some(OutputValue::Error(old_error)) = current_output
2362                    && **old_error == err
2363                {
2364                    None
2365                } else {
2366                    Some(OutputValue::Error(Arc::new((&err).into())))
2367                }
2368            }
2369        };
2370        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2371        // When output has changed, grab the dependent tasks
2372        if new_output.is_some() && ctx.should_track_dependencies() {
2373            output_dependent_tasks = task.iter_output_dependent().collect();
2374        }
2375
2376        drop(task);
2377
2378        // Check if the task can be marked as immutable
2379        let mut is_now_immutable = false;
2380        if let Some(dependencies) = task_dependencies_for_immutable
2381            && dependencies
2382                .iter()
2383                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2384        {
2385            is_now_immutable = true;
2386        }
2387        #[cfg(feature = "trace_task_details")]
2388        span.record("immutable", is_immutable || is_now_immutable);
2389
2390        if !queue.is_empty() || !old_edges.is_empty() {
2391            #[cfg(any(
2392                feature = "trace_task_completion",
2393                feature = "trace_aggregation_update_stats"
2394            ))]
2395            let _span =
2396                tracing::trace_span!("remove old edges and prepare new children", stats = Empty)
2397                    .entered();
2398            // Remove outdated edges first, before removing in_progress+dirty flag.
2399            // We need to make sure all outdated edges are removed before the task can potentially
2400            // be scheduled and executed again
2401            #[cfg(feature = "trace_aggregation_update_stats")]
2402            {
2403                let stats = CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2404                _span.record("stats", tracing::field::debug(stats));
2405            }
2406            #[cfg(not(feature = "trace_aggregation_update_stats"))]
2407            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2408        }
2409
2410        Ok(TaskExecutionCompletePrepareResult {
2411            new_children,
2412            is_now_immutable,
2413            #[cfg(feature = "verify_determinism")]
2414            no_output_set,
2415            #[cfg(feature = "task_dirty_cause")]
2416            function_id,
2417            new_output,
2418            output_dependent_tasks,
2419            is_recomputation,
2420            is_session_dependent,
2421        })
2422    }
2423
2424    fn task_execution_completed_invalidate_output_dependent(
2425        &self,
2426        ctx: &mut impl ExecuteContext<'_>,
2427        task_id: TaskId,
2428        #[cfg(feature = "task_dirty_cause")] function_id: Option<FunctionId>,
2429        output_dependent_tasks: SmallVec<[TaskId; 4]>,
2430    ) {
2431        debug_assert!(!output_dependent_tasks.is_empty());
2432
2433        #[cfg(feature = "task_dirty_cause")]
2434        let cause = match function_id {
2435            Some(function) => TaskDirtyCause::OutputChange { function },
2436            None => TaskDirtyCause::RootOutputChange,
2437        };
2438
2439        if output_dependent_tasks.len() > 1 {
2440            ctx.prepare_tasks(
2441                output_dependent_tasks
2442                    .iter()
2443                    .map(|&id| (id, TaskDataCategory::All)),
2444                "invalidate output dependents",
2445            );
2446        }
2447
2448        fn process_output_dependents(
2449            ctx: &mut impl ExecuteContext<'_>,
2450            task_id: TaskId,
2451            #[cfg(feature = "task_dirty_cause")] cause: &TaskDirtyCause,
2452            dependent_task_id: TaskId,
2453            queue: &mut AggregationUpdateQueue,
2454        ) {
2455            #[cfg(feature = "trace_task_output_dependencies")]
2456            let span = tracing::trace_span!(
2457                "invalidate output dependency",
2458                task = %task_id,
2459                dependent_task = %dependent_task_id,
2460                result = tracing::field::Empty,
2461            )
2462            .entered();
2463            let mut make_stale = true;
2464            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2465            let transient_task_type = dependent.get_transient_task_type();
2466            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
2467                // once tasks are never invalidated
2468                #[cfg(feature = "trace_task_output_dependencies")]
2469                span.record("result", "once task");
2470                return;
2471            }
2472            if dependent.outdated_output_dependencies_contains(&task_id) {
2473                #[cfg(feature = "trace_task_output_dependencies")]
2474                span.record("result", "outdated dependency");
2475                // output dependency is outdated, so it hasn't read the output yet
2476                // and doesn't need to be invalidated
2477                // But importantly we still need to make the task dirty as it should no longer
2478                // be considered as "recomputation".
2479                make_stale = false;
2480            } else if !dependent.output_dependencies_contains(&task_id) {
2481                // output dependency has been removed, so the task doesn't depend on the
2482                // output anymore and doesn't need to be invalidated
2483                #[cfg(feature = "trace_task_output_dependencies")]
2484                span.record("result", "no backward dependency");
2485                return;
2486            }
2487            make_task_dirty_internal(
2488                dependent,
2489                dependent_task_id,
2490                make_stale,
2491                #[cfg(feature = "task_dirty_cause")]
2492                cause.clone(),
2493                queue,
2494                ctx,
2495            );
2496            #[cfg(feature = "trace_task_output_dependencies")]
2497            span.record("result", "marked dirty");
2498        }
2499
2500        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
2501            let chunk_size = good_chunk_size(output_dependent_tasks.len());
2502            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2503            let _ = scope_and_block(chunks.len(), |scope| {
2504                for chunk in chunks {
2505                    let child_ctx = ctx.child_context();
2506                    #[cfg(feature = "task_dirty_cause")]
2507                    let cause = &cause;
2508                    scope.spawn(move || {
2509                        let mut ctx = child_ctx.create();
2510                        let mut queue = AggregationUpdateQueue::new();
2511                        for dependent_task_id in chunk {
2512                            process_output_dependents(
2513                                &mut ctx,
2514                                task_id,
2515                                #[cfg(feature = "task_dirty_cause")]
2516                                cause,
2517                                dependent_task_id,
2518                                &mut queue,
2519                            )
2520                        }
2521                        queue.execute(&mut ctx);
2522                    });
2523                }
2524            });
2525        } else {
2526            let mut queue = AggregationUpdateQueue::new();
2527            for dependent_task_id in output_dependent_tasks {
2528                process_output_dependents(
2529                    ctx,
2530                    task_id,
2531                    #[cfg(feature = "task_dirty_cause")]
2532                    &cause,
2533                    dependent_task_id,
2534                    &mut queue,
2535                );
2536            }
2537            queue.execute(ctx);
2538        }
2539    }
2540
2541    fn task_execution_completed_unfinished_children_dirty(
2542        &self,
2543        ctx: &mut impl ExecuteContext<'_>,
2544        new_children: &FxHashSet<TaskId>,
2545    ) {
2546        debug_assert!(!new_children.is_empty());
2547
2548        let mut queue = AggregationUpdateQueue::new();
2549        ctx.for_each_task_all(
2550            new_children.iter().copied(),
2551            "unfinished children dirty",
2552            |child_task, ctx| {
2553                if !child_task.has_output() {
2554                    let child_id = child_task.id();
2555                    make_task_dirty_internal(
2556                        child_task,
2557                        child_id,
2558                        false,
2559                        #[cfg(feature = "task_dirty_cause")]
2560                        TaskDirtyCause::InitialDirty,
2561                        &mut queue,
2562                        ctx,
2563                    );
2564                }
2565            },
2566        );
2567
2568        queue.execute(ctx);
2569    }
2570
2571    fn task_execution_completed_connect(
2572        &self,
2573        ctx: &mut impl ExecuteContext<'_>,
2574        task_id: TaskId,
2575        new_children: FxHashSet<TaskId>,
2576    ) -> Option<TaskPriority> {
2577        debug_assert!(!new_children.is_empty());
2578
2579        let mut task = ctx.task(task_id, TaskDataCategory::All);
2580        let Some(in_progress) = task.get_in_progress() else {
2581            panic!("Task execution completed, but task is not in progress: {task:#?}");
2582        };
2583        if matches!(in_progress, InProgressState::Canceled) {
2584            // Task was canceled in the meantime, so we don't connect the children
2585            return None;
2586        }
2587        let InProgressState::InProgress(box InProgressStateInner {
2588            #[cfg(not(feature = "no_fast_stale"))]
2589            stale,
2590            once_task: is_once_task,
2591            ..
2592        }) = in_progress
2593        else {
2594            panic!("Task execution completed, but task is not in progress: {task:#?}");
2595        };
2596
2597        // If the task is stale, reschedule it
2598        #[cfg(not(feature = "no_fast_stale"))]
2599        if *stale && !is_once_task {
2600            let stale_priority = compute_stale_priority(&task);
2601            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2602                task.take_in_progress()
2603            else {
2604                unreachable!();
2605            };
2606            let old = task.set_in_progress(InProgressState::Scheduled {
2607                done_event,
2608                reason: TaskExecutionReason::Stale,
2609            });
2610            debug_assert!(old.is_none(), "InProgress already exists");
2611            drop(task);
2612
2613            // All `new_children` are currently hold active with an active count and we need to undo
2614            // that. (We already filtered out the old children from that list)
2615            AggregationUpdateQueue::run(
2616                AggregationUpdateJob::DecreaseActiveCounts {
2617                    task_ids: new_children.into_iter().collect(),
2618                },
2619                ctx,
2620            );
2621            return Some(stale_priority);
2622        }
2623
2624        let has_active_count = ctx.should_track_activeness()
2625            && task
2626                .get_activeness()
2627                .is_some_and(|activeness| activeness.active_counter > 0);
2628        connect_children(
2629            ctx,
2630            task_id,
2631            task,
2632            new_children,
2633            has_active_count,
2634            ctx.should_track_activeness(),
2635        );
2636
2637        None
2638    }
2639
2640    #[allow(clippy::type_complexity)]
2641    fn task_execution_completed_finish(
2642        &self,
2643        ctx: &mut impl ExecuteContext<'_>,
2644        task_id: TaskId,
2645        #[cfg(feature = "verify_determinism")] no_output_set: bool,
2646        new_output: Option<OutputValue>,
2647        is_now_immutable: bool,
2648        is_session_dependent: bool,
2649    ) -> (
2650        Option<TaskPriority>,
2651        Option<
2652            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2653        >,
2654    ) {
2655        let mut task = ctx.task(task_id, TaskDataCategory::All);
2656        let Some(in_progress) = task.take_in_progress() else {
2657            panic!("Task execution completed, but task is not in progress: {task:#?}");
2658        };
2659        if matches!(in_progress, InProgressState::Canceled) {
2660            // Task was canceled in the meantime, so we don't finish it
2661            return (None, None);
2662        }
2663        let InProgressState::InProgress(box InProgressStateInner {
2664            done_event,
2665            once_task: is_once_task,
2666            stale,
2667            marked_as_completed: _,
2668            new_children,
2669        }) = in_progress
2670        else {
2671            panic!("Task execution completed, but task is not in progress: {task:#?}");
2672        };
2673        debug_assert!(new_children.is_empty());
2674
2675        // If the task is stale, reschedule it
2676        if stale && !is_once_task {
2677            let stale_priority = compute_stale_priority(&task);
2678            let old = task.set_in_progress(InProgressState::Scheduled {
2679                done_event,
2680                reason: TaskExecutionReason::Stale,
2681            });
2682            debug_assert!(old.is_none(), "InProgress already exists");
2683            return (Some(stale_priority), None);
2684        }
2685
2686        // Set the output if it has changed
2687        let mut old_content = None;
2688        if let Some(value) = new_output {
2689            old_content = task.set_output(value);
2690        }
2691
2692        // If the task has no invalidator and has no mutable dependencies, it does not have a way
2693        // to be invalidated and we can mark it as immutable.
2694        if is_now_immutable {
2695            task.set_immutable(true);
2696        }
2697
2698        // Notify in progress cells and remove all of them
2699        let in_progress_cells = task.take_in_progress_cells();
2700        if let Some(ref cells) = in_progress_cells {
2701            for state in cells.values() {
2702                state.event.notify(usize::MAX);
2703            }
2704        }
2705
2706        // Compute and apply the new dirty state, propagating to aggregating ancestors
2707        let new_dirtyness = if is_session_dependent {
2708            Some(Dirtyness::SessionDependent)
2709        } else {
2710            None
2711        };
2712        #[cfg(feature = "verify_determinism")]
2713        let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
2714        let data_update = task.update_dirty_state(new_dirtyness);
2715
2716        // Under verify_determinism we re-run a task whose dirty state or output unexpectedly
2717        // changed during execution to confirm determinism. The leaf priority keeps these
2718        // verification reruns at the highest priority.
2719        #[cfg(feature = "verify_determinism")]
2720        let stale_priority: Option<TaskPriority> =
2721            ((dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task)
2722                .then(TaskPriority::leaf);
2723        #[cfg(not(feature = "verify_determinism"))]
2724        let stale_priority: Option<TaskPriority> = None;
2725        if stale_priority.is_some() {
2726            let old = task.set_in_progress(InProgressState::Scheduled {
2727                done_event,
2728                reason: TaskExecutionReason::Stale,
2729            });
2730            debug_assert!(old.is_none(), "InProgress already exists");
2731            drop(task);
2732        } else {
2733            drop(task);
2734
2735            // Notify dependent tasks that are waiting for this task to finish
2736            done_event.notify(usize::MAX);
2737        }
2738
2739        drop(old_content);
2740
2741        if let Some(data_update) = data_update {
2742            AggregationUpdateQueue::run(data_update, ctx);
2743        }
2744
2745        // We return so the data can be dropped outside of critical sections
2746        (stale_priority, in_progress_cells)
2747    }
2748
2749    fn task_execution_completed_cleanup(
2750        &self,
2751        ctx: &mut impl ExecuteContext<'_>,
2752        task_id: TaskId,
2753        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2754        is_error: bool,
2755        is_recomputation: bool,
2756    ) -> Vec<SharedReference> {
2757        let mut task = ctx.task(task_id, TaskDataCategory::All);
2758        let mut removed_cell_data = Vec::new();
2759        // An error is potentially caused by a eventual consistency, so we avoid updating cells
2760        // after an error as it is likely transient and we want to keep the dependent tasks
2761        // clean to avoid re-executions.
2762        // NOTE: This must stay in sync with task_execution_completed_prepare, which similarly
2763        // skips cell_type_max_index updates on error.
2764        if !is_error || is_recomputation {
2765            // Remove no longer existing cells and
2766            // find all outdated data items (removed cells, outdated edges)
2767            // Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
2768            // anyway and we want to avoid needless re-executions. When the cells become
2769            // used again, they are invalidated from the update cell operation.
2770            let to_remove: Vec<_> = task
2771                .iter_cell_data()
2772                .filter_map(|(cell, _)| {
2773                    cell_counters
2774                        .get(&cell.type_id)
2775                        .is_none_or(|start_index| cell.index >= *start_index)
2776                        .then_some(*cell)
2777                })
2778                .collect();
2779            removed_cell_data.reserve_exact(to_remove.len());
2780            for cell in to_remove {
2781                if let Some(data) = task.remove_cell_data(&cell) {
2782                    removed_cell_data.push(data);
2783                }
2784            }
2785            // Remove cell_data_hash entries for cells that no longer exist
2786            let to_remove_hash: Vec<_> = task
2787                .iter_cell_data_hash()
2788                .filter_map(|(cell, _)| {
2789                    cell_counters
2790                        .get(&cell.type_id)
2791                        .is_none_or(|start_index| cell.index >= *start_index)
2792                        .then_some(*cell)
2793                })
2794                .collect();
2795            for cell in to_remove_hash {
2796                task.remove_cell_data_hash(&cell);
2797            }
2798        }
2799
2800        // Clean up task storage after execution:
2801        // - Shrink collections marked with shrink_on_completion
2802        // - Drop dependency fields for immutable tasks (they'll never re-execute)
2803        task.cleanup_after_execution();
2804
2805        drop(task);
2806
2807        // Return so we can drop outside of critical sections
2808        removed_cell_data
2809    }
2810
2811    /// Prints the standard message emitted when the background persisting process stops due to an
2812    /// unrecoverable write error. The caller is responsible for returning from the background job.
2813    fn log_unrecoverable_persist_error() {
2814        eprintln!(
2815            "Persisting is disabled for this session due to an unrecoverable error. Stopping the \
2816             background persisting process."
2817        );
2818    }
2819
2820    fn run_backend_job<'a>(
2821        &'a self,
2822        job: TurboTasksBackendJob,
2823        turbo_tasks: &'a TurboTasks<TurboTasksBackend>,
2824    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2825        Box::pin(async move {
2826            match job {
2827                TurboTasksBackendJob::Snapshot => {
2828                    debug_assert!(self.should_persist());
2829
2830                    let mut last_snapshot = self.start_time;
2831                    let mut idle_start_listener = self.idle_start_event.listen();
2832                    let mut idle_end_listener = self.idle_end_event.listen();
2833                    // Whether to immediately set an idle timeout if possible.
2834                    // Set to false if we don't persist anything in a cycle.
2835                    let mut fresh_idle = true;
2836                    let mut evicted = false;
2837                    let mut is_first = true;
2838                    'outer: loop {
2839                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2840                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2841                        let idle_timeout = *IDLE_TIMEOUT;
2842                        let (time, mut reason) = if is_first {
2843                            (FIRST_SNAPSHOT_WAIT, SnapshotReason::InitialSnapshotTimeout)
2844                        } else {
2845                            (SNAPSHOT_INTERVAL, SnapshotReason::RegularSnapshotInterval)
2846                        };
2847
2848                        let until = last_snapshot + time;
2849                        if until > Instant::now() {
2850                            let mut stop_listener = self.stopping_event.listen();
2851                            if self.stopping.load(Ordering::Acquire) {
2852                                return;
2853                            }
2854                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2855                                Instant::now() + idle_timeout
2856                            } else {
2857                                far_future()
2858                            };
2859                            loop {
2860                                tokio::select! {
2861                                    _ = &mut stop_listener => {
2862                                        return;
2863                                    },
2864                                    _ = &mut idle_start_listener => {
2865                                        idle_time = Instant::now() + idle_timeout;
2866                                        idle_start_listener = self.idle_start_event.listen()
2867                                    },
2868                                    _ = &mut idle_end_listener => {
2869                                        idle_time = far_future();
2870                                        idle_end_listener = self.idle_end_event.listen()
2871                                    },
2872                                    _ = tokio::time::sleep_until(until) => {
2873                                        break;
2874                                    },
2875                                    _ = tokio::time::sleep_until(idle_time) => {
2876                                        if turbo_tasks.is_idle() {
2877                                            reason = SnapshotReason::IdleTimeout;
2878                                            break;
2879                                        }
2880                                    },
2881                                }
2882                            }
2883                        }
2884
2885                        // Create a root span shared by both the snapshot/persist
2886                        // work and the subsequent compaction so they appear
2887                        // grouped together in trace viewers.
2888                        let background_span =
2889                            tracing::info_span!(parent: None, "background snapshot");
2890                        match self.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
2891                            Err(err) => {
2892                                // save_snapshot consumed persisted_task_cache_log entries;
2893                                // further snapshots would corrupt the task graph.
2894                                eprintln!("Persisting failed: {err:?}");
2895                                Self::log_unrecoverable_persist_error();
2896                                return;
2897                            }
2898                            Ok((snapshot_start, new_data)) => {
2899                                // if we see 'new_data' then the next idle transition is 'fresh'
2900                                fresh_idle = new_data;
2901                                is_first = false;
2902                                last_snapshot = snapshot_start;
2903
2904                                // Polls the idle-end event without blocking. Returns
2905                                // `true` and refreshes the listener if idle has ended,
2906                                // `false` if we are still idle.
2907                                macro_rules! check_idle_ended {
2908                                    () => {{
2909                                        tokio::select! {
2910                                            biased;
2911                                            _ = &mut idle_end_listener => {
2912                                                idle_end_listener = self.idle_end_event.listen();
2913                                                true
2914                                            },
2915                                            _ = std::future::ready(()) => false,
2916                                        }
2917                                    }};
2918                                }
2919                                // Evict persisted tasks from memory to reclaim space.
2920                                // Like compaction, this runs after snapshot_and_persist
2921                                // as a separate concern.
2922                                //
2923                                // TODO: improve eviction policy — current approach is a full sweep
2924                                // after every snapshot. Better strategies to consider:
2925                                //   - Memory pressure signals: only evict when RSS exceeds a
2926                                //     threshold rather than unconditionally.
2927                                //   - Recency data: track last-access time per task and evict
2928                                //     least-recently-used entries first rather than all at once.
2929                                //   - Eviction intensity: partial sweeps (evict a fraction of
2930                                //     eligible tasks per cycle) to reduce latency spikes.
2931                                // Evict when there is new data to persist (the common
2932                                // case) or on the very first snapshot after startup
2933                                // (data was already on disk from a prior run, so
2934                                // new_data may be false but in-memory state can still
2935                                // be evicted).
2936                                let mut ran_eviction = false;
2937                                if self.should_evict() && (new_data || !evicted) {
2938                                    if check_idle_ended!() {
2939                                        // need to start all the way over so we catch the next
2940                                        // signal
2941                                        continue 'outer;
2942                                    }
2943                                    evicted = true;
2944                                    ran_eviction = true;
2945                                    self.storage.evict_after_snapshot(background_span.id());
2946                                }
2947
2948                                // Compact while idle (up to limit), regardless of
2949                                // whether the snapshot had new data.
2950                                // `background_span` is not entered here because
2951                                // `EnteredSpan` is `!Send` and would prevent the
2952                                // future from being sent across threads when it
2953                                // suspends at the `select!` await below.
2954                                let mut ran_compaction = false;
2955                                const MAX_IDLE_COMPACTION_PASSES: usize = 10;
2956                                for _ in 0..MAX_IDLE_COMPACTION_PASSES {
2957                                    if check_idle_ended!() {
2958                                        continue 'outer;
2959                                    }
2960                                    // Enter the span only around the synchronous
2961                                    // compact() call so we never hold an
2962                                    // `EnteredSpan` across an await point.
2963                                    let _compact_span = tracing::info_span!(
2964                                        parent: background_span.id(),
2965                                        "compact database"
2966                                    )
2967                                    .entered();
2968                                    match self.backing_storage.compact() {
2969                                        Ok(true) => {
2970                                            ran_compaction = true;
2971                                        }
2972                                        Ok(false) => break,
2973                                        Err(err) => {
2974                                            eprintln!("Compaction failed: {err:?}");
2975                                            if self.backing_storage.has_unrecoverable_write_error()
2976                                            {
2977                                                Self::log_unrecoverable_persist_error();
2978                                                return;
2979                                            }
2980                                            break;
2981                                        }
2982                                    }
2983                                }
2984                                if check_idle_ended!() {
2985                                    continue 'outer;
2986                                }
2987                                // After running snapshotting/eviction/compaction we have churned a
2988                                // _lot_ of memory if we are still
2989                                // idle tell `mimalloc` that now would be a good time to release
2990                                // memory back to the OS
2991                                if new_data || ran_compaction || ran_eviction {
2992                                    turbo_tasks_malloc::TurboMalloc::collect(true);
2993                                }
2994                            }
2995                        }
2996                    }
2997                }
2998            }
2999        })
3000    }
3001
3002    fn try_read_own_task_cell(
3003        &self,
3004        task_id: TaskId,
3005        cell: CellId,
3006        turbo_tasks: &TurboTasks<TurboTasksBackend>,
3007    ) -> Result<TypedCellContent> {
3008        let mut ctx = self.execute_context(turbo_tasks);
3009        let task = ctx.task(task_id, TaskDataCategory::Data);
3010        if let Some(content) = task.get_cell_data(&cell).cloned() {
3011            Ok(CellContent(Some(content)).into_typed(cell.type_id))
3012        } else {
3013            Ok(CellContent(None).into_typed(cell.type_id))
3014        }
3015    }
3016
    /// Collects the collectibles of type `collectible_type` that are visible from `task_id`.
    ///
    /// The result maps each collectible (as a `RawVc::TaskCell`) to a count:
    /// - every aggregated collectible entry with a positive count adds `1`;
    /// - every collectible emitted directly by the task adds its stored count, which is not
    ///   filtered by sign.
    ///
    /// When `reader_id` is `Some`, the reader is registered as a collectibles dependent of the
    /// task, and the task is recorded as a collectibles dependency of the reader.
    ///
    /// # Panics
    ///
    /// Panics if the task has a persistent task type whose native function is not marked `root`.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &TurboTasks<TurboTasksBackend>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        // Scope the access to `task_id` so it is released before `reader_id` is accessed below.
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            if task
                .get_persistent_task_type()
                .is_some_and(|t| !t.native_fn.is_root)
            {
                // Release the task before formatting the message:
                // `debug_get_task_description` may need to access this task again.
                drop(task);
                panic!(
                    "Reading collectibles of non-root task {} (reader: {}). The `root` attribute \
                     is missing on the task.",
                    self.debug_get_task_description(task_id),
                    reader_id.map_or_else(
                        || "unknown".to_string(),
                        |r| self.debug_get_task_description(r)
                    )
                );
            }
            // Aggregated collectibles: each entry with a positive count contributes exactly 1,
            // whatever its count.
            for (collectible, count) in task.iter_aggregated_collectibles() {
                if *count > 0 && collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += 1;
                }
            }
            // Collectibles emitted by this task itself contribute their stored count.
            for (&collectible, &count) in task.iter_collectibles() {
                if collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += count;
                }
            }
            if let Some(reader_id) = reader_id {
                // Record the reader on this task's side of the dependency edge.
                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
            }
        }
        if let Some(reader_id) = reader_id {
            // Record the task on the reader's side of the dependency edge. If `target` was still
            // listed as an outdated dependency of the reader, removing it from that list keeps the
            // existing dependency. Otherwise it is added as a new one.
            // NOTE(review): semantics inferred from the method names — confirm.
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            if !reader.remove_outdated_collectibles_dependencies(&target) {
                let _ = reader.add_collectibles_dependencies(target);
            }
        }
        collectibles
    }
3079
3080    fn emit_collectible(
3081        &self,
3082        collectible_type: TraitTypeId,
3083        collectible: RawVc,
3084        task_id: TaskId,
3085        turbo_tasks: &TurboTasks<TurboTasksBackend>,
3086    ) {
3087        self.assert_valid_collectible(task_id, collectible);
3088
3089        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3090            panic!("Collectibles need to be resolved");
3091        };
3092        let cell = CellRef {
3093            task: collectible_task,
3094            cell,
3095        };
3096        operation::UpdateCollectibleOperation::run(
3097            task_id,
3098            CollectibleRef {
3099                collectible_type,
3100                cell,
3101            },
3102            1,
3103            self.execute_context(turbo_tasks),
3104        );
3105    }
3106
3107    fn unemit_collectible(
3108        &self,
3109        collectible_type: TraitTypeId,
3110        collectible: RawVc,
3111        count: u32,
3112        task_id: TaskId,
3113        turbo_tasks: &TurboTasks<TurboTasksBackend>,
3114    ) {
3115        self.assert_valid_collectible(task_id, collectible);
3116
3117        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3118            panic!("Collectibles need to be resolved");
3119        };
3120        let cell = CellRef {
3121            task: collectible_task,
3122            cell,
3123        };
3124        operation::UpdateCollectibleOperation::run(
3125            task_id,
3126            CollectibleRef {
3127                collectible_type,
3128                cell,
3129            },
3130            -(i32::try_from(count).unwrap()),
3131            self.execute_context(turbo_tasks),
3132        );
3133    }
3134
3135    fn update_task_cell(
3136        &self,
3137        task_id: TaskId,
3138        cell: CellId,
3139        content: CellContent,
3140        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3141        content_hash: Option<CellHash>,
3142        verification_mode: VerificationMode,
3143        turbo_tasks: &TurboTasks<TurboTasksBackend>,
3144    ) {
3145        operation::UpdateCellOperation::run(
3146            task_id,
3147            cell,
3148            content,
3149            updated_key_hashes,
3150            content_hash,
3151            verification_mode,
3152            self.execute_context(turbo_tasks),
3153        );
3154    }
3155
3156    fn mark_own_task_as_finished(&self, task: TaskId, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
3157        let mut ctx = self.execute_context(turbo_tasks);
3158        let mut task = ctx.task(task, TaskDataCategory::Data);
3159        if let Some(InProgressState::InProgress(box InProgressStateInner {
3160            marked_as_completed,
3161            ..
3162        })) = task.get_in_progress_mut()
3163        {
3164            *marked_as_completed = true;
3165            // TODO this should remove the dirty state (also check session_dependent)
3166            // but this would break some assumptions for strongly consistent reads.
3167            // Client tasks are not connected yet, so we wouldn't wait for them.
3168            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
3169        }
3170    }
3171
3172    fn connect_task(
3173        &self,
3174        task: TaskId,
3175        parent_task: Option<TaskId>,
3176        turbo_tasks: &TurboTasks<TurboTasksBackend>,
3177    ) {
3178        self.assert_not_persistent_calling_transient(parent_task, task, None);
3179        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
3180    }
3181
3182    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3183        let task_id = self.transient_task_id_factory.get();
3184        {
3185            let mut task = self.storage.access_mut(task_id);
3186            task.init_transient_task(task_id, task_type, self.should_track_activeness());
3187        }
3188        #[cfg(feature = "verify_aggregation_graph")]
3189        self.root_tasks.lock().insert(task_id);
3190        task_id
3191    }
3192
3193    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
3194        #[cfg(feature = "verify_aggregation_graph")]
3195        self.root_tasks.lock().remove(&task_id);
3196
3197        let mut ctx = self.execute_context(turbo_tasks);
3198        let mut task = ctx.task(task_id, TaskDataCategory::All);
3199        let is_dirty = task.is_dirty();
3200        let has_dirty_containers = task.has_dirty_containers();
3201        if is_dirty.is_some() || has_dirty_containers {
3202            if let Some(activeness_state) = task.get_activeness_mut() {
3203                // We will finish the task, but it would be removed after the task is done
3204                activeness_state.unset_root_type();
3205                activeness_state.set_active_until_clean();
3206            };
3207        } else if let Some(activeness_state) = task.take_activeness() {
3208            // Technically nobody should be listening to this event, but just in case
3209            // we notify it anyway
3210            activeness_state.all_clean_event.notify(usize::MAX);
3211        }
3212    }
3213
3214    #[cfg(feature = "verify_aggregation_graph")]
3215    fn verify_aggregation_graph(&self, turbo_tasks: &TurboTasks<TurboTasksBackend>, idle: bool) {
3216        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3217            return;
3218        }
3219        use std::{collections::VecDeque, env, io::stdout};
3220
3221        use crate::backend::operation::{get_uppers, is_aggregating_node};
3222
3223        let mut ctx = self.execute_context(turbo_tasks);
3224        let root_tasks = self.root_tasks.lock().clone();
3225
3226        for task_id in root_tasks.into_iter() {
3227            let mut queue = VecDeque::new();
3228            let mut visited = FxHashSet::default();
3229            let mut aggregated_nodes = FxHashSet::default();
3230            let mut collectibles = FxHashMap::default();
3231            let root_task_id = task_id;
3232            visited.insert(task_id);
3233            aggregated_nodes.insert(task_id);
3234            queue.push_back(task_id);
3235            let mut counter = 0;
3236            while let Some(task_id) = queue.pop_front() {
3237                counter += 1;
3238                if counter % 100000 == 0 {
3239                    println!(
3240                        "queue={}, visited={}, aggregated_nodes={}",
3241                        queue.len(),
3242                        visited.len(),
3243                        aggregated_nodes.len()
3244                    );
3245                }
3246                let task = ctx.task(task_id, TaskDataCategory::All);
3247                if idle && !self.is_idle.load(Ordering::Relaxed) {
3248                    return;
3249                }
3250
3251                let uppers = get_uppers(&task);
3252                if task_id != root_task_id
3253                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3254                {
3255                    panic!(
3256                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3257                         {:?})",
3258                        task_id,
3259                        task.get_task_description(),
3260                        uppers
3261                    );
3262                }
3263
3264                for (collectible, _) in task.iter_aggregated_collectibles() {
3265                    collectibles
3266                        .entry(*collectible)
3267                        .or_insert_with(|| (false, Vec::new()))
3268                        .1
3269                        .push(task_id);
3270                }
3271
3272                for (&collectible, &value) in task.iter_collectibles() {
3273                    if value > 0 {
3274                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3275                            *flag = true
3276                        } else {
3277                            panic!(
3278                                "Task {} has a collectible {:?} that is not in any upper task",
3279                                task_id, collectible
3280                            );
3281                        }
3282                    }
3283                }
3284
3285                let is_dirty = task.has_dirty();
3286                let has_dirty_container = task.has_dirty_containers();
3287                let should_be_in_upper = is_dirty || has_dirty_container;
3288
3289                let aggregation_number = get_aggregation_number(&task);
3290                if is_aggregating_node(aggregation_number) {
3291                    aggregated_nodes.insert(task_id);
3292                }
3293                // println!(
3294                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
3295                //     ctx.get_task_description(task_id),
3296                //     uppers
3297                // );
3298
3299                for child_id in task.iter_children() {
3300                    // println!("{task_id}: child -> {child_id}");
3301                    if visited.insert(child_id) {
3302                        queue.push_back(child_id);
3303                    }
3304                }
3305                drop(task);
3306
3307                if should_be_in_upper {
3308                    for upper_id in uppers {
3309                        let upper = ctx.task(upper_id, TaskDataCategory::All);
3310                        let in_upper = upper
3311                            .get_aggregated_dirty_containers(&task_id)
3312                            .is_some_and(|&dirty| dirty > 0);
3313                        if !in_upper {
3314                            let containers: Vec<_> = upper
3315                                .iter_aggregated_dirty_containers()
3316                                .map(|(&k, &v)| (k, v))
3317                                .collect();
3318                            let upper_task_desc = upper.get_task_description();
3319                            drop(upper);
3320                            panic!(
3321                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
3322                                 ({})\nThese dirty containers are present:\n{:#?}",
3323                                task_id,
3324                                ctx.task(task_id, TaskDataCategory::Data)
3325                                    .get_task_description(),
3326                                upper_id,
3327                                upper_task_desc,
3328                                containers,
3329                            );
3330                        }
3331                    }
3332                }
3333            }
3334
3335            for (collectible, (flag, task_ids)) in collectibles {
3336                if !flag {
3337                    use std::io::Write;
3338                    let mut stdout = stdout().lock();
3339                    writeln!(
3340                        stdout,
3341                        "{:?} that is not emitted in any child task but in these aggregated \
3342                         tasks: {:#?}",
3343                        collectible,
3344                        task_ids
3345                            .iter()
3346                            .map(|t| format!(
3347                                "{t} {}",
3348                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
3349                            ))
3350                            .collect::<Vec<_>>()
3351                    )
3352                    .unwrap();
3353
3354                    let task_id = collectible.cell.task;
3355                    let mut queue = {
3356                        let task = ctx.task(task_id, TaskDataCategory::All);
3357                        get_uppers(&task)
3358                    };
3359                    let mut visited = FxHashSet::default();
3360                    for &upper_id in queue.iter() {
3361                        visited.insert(upper_id);
3362                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3363                    }
3364                    while let Some(task_id) = queue.pop() {
3365                        let task = ctx.task(task_id, TaskDataCategory::All);
3366                        let desc = task.get_task_description();
3367                        let aggregated_collectible = task
3368                            .get_aggregated_collectibles(&collectible)
3369                            .copied()
3370                            .unwrap_or_default();
3371                        let uppers = get_uppers(&task);
3372                        drop(task);
3373                        writeln!(
3374                            stdout,
3375                            "upper {task_id} {desc} collectible={aggregated_collectible}"
3376                        )
3377                        .unwrap();
3378                        if task_ids.contains(&task_id) {
3379                            writeln!(
3380                                stdout,
3381                                "Task has an upper connection to an aggregated task that doesn't \
3382                                 reference it. Upper connection is invalid!"
3383                            )
3384                            .unwrap();
3385                        }
3386                        for upper_id in uppers {
3387                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3388                            if !visited.contains(&upper_id) {
3389                                queue.push(upper_id);
3390                            }
3391                        }
3392                    }
3393                    panic!("See stdout for more details");
3394                }
3395            }
3396        }
3397    }
3398
3399    fn assert_not_persistent_calling_transient(
3400        &self,
3401        parent_id: Option<TaskId>,
3402        child_id: TaskId,
3403        cell_id: Option<CellId>,
3404    ) {
3405        if let Some(parent_id) = parent_id
3406            && !parent_id.is_transient()
3407            && child_id.is_transient()
3408        {
3409            self.panic_persistent_calling_transient(
3410                self.debug_get_task_description(parent_id),
3411                self.debug_get_cached_task_type(child_id).as_deref(),
3412                cell_id,
3413            );
3414        }
3415    }
3416
3417    fn panic_persistent_calling_transient(
3418        &self,
3419        parent: String,
3420        child: Option<&CachedTaskType>,
3421        cell_id: Option<CellId>,
3422    ) -> ! {
3423        let transient_reason = if let Some(child) = child {
3424            Cow::Owned(format!(
3425                " The callee is transient because it depends on:\n{}",
3426                self.debug_trace_transient_task(child, cell_id),
3427            ))
3428        } else {
3429            Cow::Borrowed("")
3430        };
3431        panic!(
3432            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3433            parent,
3434            child.map_or("unknown", |t| t.get_name()),
3435            transient_reason,
3436        );
3437    }
3438
3439    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3440        // these checks occur in a potentially hot codepath, but they're cheap
3441        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3442            // This should never happen: The collectible APIs use ResolvedVc
3443            let task_info = if let Some(col_task_ty) = collectible
3444                .try_get_task_id()
3445                .map(|t| self.debug_get_task_description(t))
3446            {
3447                Cow::Owned(format!(" (return type of {col_task_ty})"))
3448            } else {
3449                Cow::Borrowed("")
3450            };
3451            panic!("Collectible{task_info} must be a ResolvedVc")
3452        };
3453        if col_task_id.is_transient() && !task_id.is_transient() {
3454            let transient_reason =
3455                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3456                    Cow::Owned(format!(
3457                        ". The collectible is transient because it depends on:\n{}",
3458                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3459                    ))
3460                } else {
3461                    Cow::Borrowed("")
3462                };
3463            // this should never happen: How would a persistent function get a transient Vc?
3464            panic!(
3465                "Collectible is transient, transient collectibles cannot be emitted from \
3466                 persistent tasks{transient_reason}",
3467            )
3468        }
3469    }
3470}
3471
3472impl Backend for TurboTasksBackend {
3473    fn startup(&self, turbo_tasks: &TurboTasks<Self>) {
3474        self.startup(turbo_tasks);
3475    }
3476
3477    fn stopping(&self, _turbo_tasks: &TurboTasks<Self>) {
3478        self.stopping();
3479    }
3480
3481    fn stop(&self, turbo_tasks: &TurboTasks<Self>) {
3482        self.stop(turbo_tasks);
3483    }
3484
3485    fn idle_start(&self, turbo_tasks: &TurboTasks<Self>) {
3486        self.idle_start(turbo_tasks);
3487    }
3488
3489    fn idle_end(&self, _turbo_tasks: &TurboTasks<Self>) {
3490        self.idle_end();
3491    }
3492
3493    fn get_or_create_task(
3494        &self,
3495        native_fn: &'static NativeFunction,
3496        this: Option<RawVc>,
3497        arg: &mut dyn DynTaskInputsStorage,
3498        parent_task: Option<TaskId>,
3499        persistence: TaskPersistence,
3500        turbo_tasks: &TurboTasks<Self>,
3501    ) -> TaskId {
3502        self.get_or_create_task(native_fn, this, arg, parent_task, persistence, turbo_tasks)
3503    }
3504
3505    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3506        self.invalidate_task(task_id, turbo_tasks);
3507    }
3508
3509    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &TurboTasks<Self>) {
3510        self.invalidate_tasks(tasks, turbo_tasks);
3511    }
3512
3513    fn invalidate_tasks_set(
3514        &self,
3515        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3516        turbo_tasks: &TurboTasks<Self>,
3517    ) {
3518        self.invalidate_tasks_set(tasks, turbo_tasks);
3519    }
3520
3521    fn invalidate_serialization(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3522        self.invalidate_serialization(task_id, turbo_tasks);
3523    }
3524
3525    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &TurboTasks<Self>) {
3526        self.task_execution_canceled(task, turbo_tasks)
3527    }
3528
3529    fn try_start_task_execution(
3530        &self,
3531        task_id: TaskId,
3532        priority: TaskPriority,
3533        turbo_tasks: &TurboTasks<Self>,
3534    ) -> Option<TaskExecutionSpec<'_>> {
3535        self.try_start_task_execution(task_id, priority, turbo_tasks)
3536    }
3537
3538    fn task_execution_completed(
3539        &self,
3540        task_id: TaskId,
3541        result: Result<RawVc, TurboTasksExecutionError>,
3542        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3543        #[cfg(feature = "verify_determinism")] stateful: bool,
3544        has_invalidator: bool,
3545        turbo_tasks: &TurboTasks<Self>,
3546    ) -> Option<TaskPriority> {
3547        self.task_execution_completed(
3548            task_id,
3549            result,
3550            cell_counters,
3551            #[cfg(feature = "verify_determinism")]
3552            stateful,
3553            has_invalidator,
3554            turbo_tasks,
3555        )
3556    }
3557
3558    type BackendJob = TurboTasksBackendJob;
3559
3560    fn run_backend_job<'a>(
3561        &'a self,
3562        job: Self::BackendJob,
3563        turbo_tasks: &'a TurboTasks<Self>,
3564    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3565        self.run_backend_job(job, turbo_tasks)
3566    }
3567
3568    fn try_read_task_output(
3569        &self,
3570        task_id: TaskId,
3571        reader: Option<TaskId>,
3572        options: ReadOutputOptions,
3573        turbo_tasks: &TurboTasks<Self>,
3574    ) -> Result<Result<RawVc, EventListener>> {
3575        self.try_read_task_output(task_id, reader, options, turbo_tasks)
3576    }
3577
3578    fn try_read_task_cell(
3579        &self,
3580        task_id: TaskId,
3581        cell: CellId,
3582        reader: Option<TaskId>,
3583        options: ReadCellOptions,
3584        turbo_tasks: &TurboTasks<Self>,
3585    ) -> Result<Result<TypedCellContent, EventListener>> {
3586        self.try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3587    }
3588
3589    fn try_read_own_task_cell(
3590        &self,
3591        task_id: TaskId,
3592        cell: CellId,
3593        turbo_tasks: &TurboTasks<Self>,
3594    ) -> Result<TypedCellContent> {
3595        self.try_read_own_task_cell(task_id, cell, turbo_tasks)
3596    }
3597
3598    fn read_task_collectibles(
3599        &self,
3600        task_id: TaskId,
3601        collectible_type: TraitTypeId,
3602        reader: Option<TaskId>,
3603        turbo_tasks: &TurboTasks<Self>,
3604    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3605        self.read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3606    }
3607
3608    fn emit_collectible(
3609        &self,
3610        collectible_type: TraitTypeId,
3611        collectible: RawVc,
3612        task_id: TaskId,
3613        turbo_tasks: &TurboTasks<Self>,
3614    ) {
3615        self.emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3616    }
3617
3618    fn unemit_collectible(
3619        &self,
3620        collectible_type: TraitTypeId,
3621        collectible: RawVc,
3622        count: u32,
3623        task_id: TaskId,
3624        turbo_tasks: &TurboTasks<Self>,
3625    ) {
3626        self.unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3627    }
3628
3629    fn update_task_cell(
3630        &self,
3631        task_id: TaskId,
3632        cell: CellId,
3633        content: CellContent,
3634        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3635        content_hash: Option<CellHash>,
3636        verification_mode: VerificationMode,
3637        turbo_tasks: &TurboTasks<Self>,
3638    ) {
3639        self.update_task_cell(
3640            task_id,
3641            cell,
3642            content,
3643            updated_key_hashes,
3644            content_hash,
3645            verification_mode,
3646            turbo_tasks,
3647        );
3648    }
3649
3650    fn mark_own_task_as_finished(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3651        self.mark_own_task_as_finished(task_id, turbo_tasks);
3652    }
3653
3654    fn connect_task(
3655        &self,
3656        task: TaskId,
3657        parent_task: Option<TaskId>,
3658        turbo_tasks: &TurboTasks<Self>,
3659    ) {
3660        self.connect_task(task, parent_task, turbo_tasks);
3661    }
3662
3663    fn create_transient_task(
3664        &self,
3665        task_type: TransientTaskType,
3666        _turbo_tasks: &TurboTasks<Self>,
3667    ) -> TaskId {
3668        self.create_transient_task(task_type)
3669    }
3670
3671    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3672        self.dispose_root_task(task_id, turbo_tasks);
3673    }
3674
3675    fn task_statistics(&self) -> &TaskStatisticsApi {
3676        &self.task_statistics
3677    }
3678
3679    fn is_tracking_dependencies(&self) -> bool {
3680        self.options.dependency_tracking
3681    }
3682
3683    fn get_task_name(&self, task: TaskId, turbo_tasks: &TurboTasks<Self>) -> String {
3684        self.get_task_name(task, turbo_tasks)
3685    }
3686}
3687
3688enum DebugTraceTransientTask {
3689    Cached {
3690        task_name: &'static str,
3691        cell_type_id: Option<ValueTypeId>,
3692        cause_self: Option<Box<DebugTraceTransientTask>>,
3693        cause_args: Vec<DebugTraceTransientTask>,
3694    },
3695    /// This representation is used when this task is a duplicate of one previously shown
3696    Collapsed {
3697        task_name: &'static str,
3698        cell_type_id: Option<ValueTypeId>,
3699    },
3700    Uncached {
3701        cell_type_id: Option<ValueTypeId>,
3702    },
3703}
3704
3705impl DebugTraceTransientTask {
3706    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3707        let indent = "    ".repeat(level);
3708        f.write_str(&indent)?;
3709
3710        fn fmt_cell_type_id(
3711            f: &mut fmt::Formatter<'_>,
3712            cell_type_id: Option<ValueTypeId>,
3713        ) -> fmt::Result {
3714            if let Some(ty) = cell_type_id {
3715                write!(
3716                    f,
3717                    " (read cell of type {})",
3718                    get_value_type(ty).ty.global_name
3719                )
3720            } else {
3721                Ok(())
3722            }
3723        }
3724
3725        // write the name and type
3726        match self {
3727            Self::Cached {
3728                task_name,
3729                cell_type_id,
3730                ..
3731            }
3732            | Self::Collapsed {
3733                task_name,
3734                cell_type_id,
3735                ..
3736            } => {
3737                f.write_str(task_name)?;
3738                fmt_cell_type_id(f, *cell_type_id)?;
3739                if matches!(self, Self::Collapsed { .. }) {
3740                    f.write_str(" (collapsed)")?;
3741                }
3742            }
3743            Self::Uncached { cell_type_id } => {
3744                f.write_str("unknown transient task")?;
3745                fmt_cell_type_id(f, *cell_type_id)?;
3746            }
3747        }
3748        f.write_char('\n')?;
3749
3750        // write any extra "cause" information we might have
3751        if let Self::Cached {
3752            cause_self,
3753            cause_args,
3754            ..
3755        } = self
3756        {
3757            if let Some(c) = cause_self {
3758                writeln!(f, "{indent}  self:")?;
3759                c.fmt_indented(f, level + 1)?;
3760            }
3761            if !cause_args.is_empty() {
3762                writeln!(f, "{indent}  args:")?;
3763                for c in cause_args {
3764                    c.fmt_indented(f, level + 1)?;
3765                }
3766            }
3767        }
3768        Ok(())
3769    }
3770}
3771
3772impl fmt::Display for DebugTraceTransientTask {
3773    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3774        self.fmt_indented(f, 0)
3775    }
3776}
3777
3778// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
3779fn far_future() -> Instant {
3780    // Roughly 30 years from now.
3781    // API does not provide a way to obtain max `Instant`
3782    // or convert specific date in the future to instant.
3783    // 1000 years overflows on macOS, 100 years overflows on FreeBSD.
3784    Instant::now() + Duration::from_secs(86400 * 365 * 30)
3785}
3786
3787/// Encodes task data, using the provided buffer as a scratch space.  Returns a new exactly sized
3788/// buffer.
3789/// This allows reusing the buffer across multiple encode calls to optimize allocations and
3790/// resulting buffer sizes.
3791///
3792/// TODO: The `Result` return type is an artifact of the bincode `Encode` trait requiring
3793/// fallible encoding. In practice, encoding to a `SmallVec` is infallible (no I/O), and the only
3794/// real failure mode — a `TypedSharedReference` whose value type has no bincode impl — is a
3795/// programmer error caught by the panic in the caller. Consider making the bincode encoding trait
3796/// infallible (i.e. returning `()` instead of `Result<(), EncodeError>`) to eliminate the
3797/// spurious `Result` threading throughout the encode path.
3798fn encode_task_data(
3799    task: TaskId,
3800    data: &TaskStorage,
3801    category: SpecificTaskDataCategory,
3802    scratch_buffer: &mut TurboBincodeBuffer,
3803) -> Result<TurboBincodeBuffer> {
3804    scratch_buffer.clear();
3805    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3806    data.encode(category, &mut encoder)?;
3807
3808    if cfg!(feature = "verify_serialization") {
3809        TaskStorage::new()
3810            .decode(
3811                category,
3812                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3813            )
3814            .with_context(|| {
3815                format!(
3816                    "expected to be able to decode serialized data for '{category:?}' information \
3817                     for {task}"
3818                )
3819            })?;
3820    }
3821    Ok(SmallVec::from_slice(scratch_buffer))
3822}