Skip to main content

turbo_tasks_backend/backend/
mod.rs

1mod cell_data;
2mod counter_map;
3mod operation;
4mod snapshot_coordinator;
5mod storage;
6pub mod storage_schema;
7
8use std::{
9    borrow::Cow,
10    fmt::{self, Write},
11    future::Future,
12    hash::BuildHasherDefault,
13    mem::take,
14    pin::Pin,
15    sync::{
16        Arc, LazyLock,
17        atomic::{AtomicBool, Ordering},
18    },
19    time::SystemTime,
20};
21
22use anyhow::{Context, Result, bail};
23use auto_hash_map::{AutoMap, AutoSet};
24use indexmap::IndexSet;
25use parking_lot::Mutex;
26use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
27use smallvec::{SmallVec, smallvec};
28use tokio::time::{Duration, Instant};
29use tracing::{Span, trace_span};
30use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
31use turbo_tasks::{
32    CellId, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency, ReadOutputOptions,
33    ReadTracking, SharedReference, StackDynTaskInputs, TRANSIENT_TASK_BIT, TaskExecutionReason,
34    TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic,
35    ValueTypeId,
36    backend::{
37        Backend, CachedTaskType, CachedTaskTypeArc, CellContent, CellHash, TaskExecutionSpec,
38        TransientTaskType, TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
39        TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
40        VerificationMode,
41    },
42    event::{Event, EventDescription, EventListener},
43    macro_helpers::NativeFunction,
44    message_queue::{TimingEvent, TraceEvent},
45    registry::get_value_type,
46    scope::scope_and_block,
47    task_statistics::TaskStatisticsApi,
48    trace::TraceRawVcs,
49    util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
50};
51#[cfg(feature = "task_dirty_cause")]
52use turbo_tasks::{FunctionId, TaskDirtyCause};
53
54pub use self::{
55    operation::AnyOperation,
56    storage::{EvictionCounts, SpecificTaskDataCategory, TaskDataCategory},
57};
58use crate::{
59    backend::{
60        operation::{
61            AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
62            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
63            LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType, TaskTypeRef,
64            connect_children, get_aggregation_number, get_uppers, make_task_dirty_internal,
65            prepare_new_children,
66        },
67        snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
68        storage::Storage,
69        storage_schema::{TaskStorage, TaskStorageAccessors},
70    },
71    backing_storage::{BackingStorage, SnapshotItem, SnapshotMeta, compute_task_type_hash},
72    data::{
73        ActivenessState, CellDependency, CellRef, CollectibleRef, CollectiblesRef, Dirtyness,
74        InProgressCellState, InProgressState, InProgressStateInner, OutputValue, TransientTask,
75    },
76    error::TaskError,
77    utils::{
78        dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
79        shard_amount::compute_shard_amount,
80    },
81};
82
/// Cut-over point between sequential and parallel dirtying of dependent tasks.
///
/// Once the number of dependent tasks that have to be made dirty exceeds this value,
/// the operation is parallelized.
const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10_000;
87
/// Idle timeout for snapshot persistence, read once from the environment.
///
/// The value is taken from `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS` (milliseconds, parsed
/// as `u64`). If the variable is unset or does not parse, the timeout is 2 seconds.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(2);

    match std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS") {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(millis) => Duration::from_millis(millis),
            // Present but not a valid `u64`: fall back to the default.
            Err(_) => DEFAULT_IDLE_TIMEOUT,
        },
        // Unset (or not valid unicode): fall back to the default.
        Err(_) => DEFAULT_IDLE_TIMEOUT,
    }
});
97
98/// Priority used to re-schedule a task that became stale during execution.
99///
100/// Stale tasks must run again, but at a priority that reflects why they're being re-run rather
101/// than the (likely higher) priority of the original schedule. We use invalidation priority
102/// based on the task's leaf distance, parented under either the task's current dirty priority
103/// or `leaf()` if it is no longer dirty.
104fn compute_stale_priority(task: &impl TaskGuard) -> TaskPriority {
105    TaskPriority::invalidation(
106        task.get_leaf_distance()
107            .copied()
108            .unwrap_or_default()
109            .distance,
110    )
111    .in_parent(task.is_dirty().unwrap_or(TaskPriority::leaf()))
112}
113
/// Selects how the backend uses the backing storage.
///
/// Every mode queries the storage for cache entries that are missing locally; the modes
/// differ in whether and when local changes are written back.
pub enum StorageMode {
    /// Queries the storage for cache entries that don't exist locally.
    /// This mode is not treated as a persisting mode (see `should_persist`).
    ReadOnly,
    /// Queries the storage for cache entries that don't exist locally.
    /// Regularly pushes changes to the backing storage.
    ReadWrite,
    /// Queries the storage for cache entries that don't exist locally.
    /// On shutdown, pushes all changes to the backing storage.
    ReadWriteOnShutdown,
}
124
/// Configuration for a [`TurboTasksBackend`].
///
/// The `Default` implementation enables dependency and active tracking, uses
/// `StorageMode::ReadWrite`, and leaves every other option off / unset.
pub struct BackendOptions {
    /// Enables dependency tracking.
    ///
    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
    /// forever.
    pub dependency_tracking: bool,

    /// Enables active tracking.
    ///
    /// Automatically disabled when `dependency_tracking` is disabled (the backend constructor
    /// forces this field to `false` in that case).
    ///
    /// When disabled: All tasks are considered as active.
    pub active_tracking: bool,

    /// Enables the backing storage.
    ///
    /// `None` disables it entirely: nothing is restored from it and nothing is persisted to it.
    pub storage_mode: Option<StorageMode>,

    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
    /// data structures. If `None`, it will use the available parallelism.
    pub num_workers: Option<usize>,

    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
    pub small_preallocation: bool,

    /// When enabled, evict all evictable tasks from in-memory storage after every snapshot.
    /// This reclaims memory by clearing persisted data that can be re-loaded from disk on demand.
    /// Only takes effect when the storage mode persists data (see `should_evict`).
    /// This is an EXPERIMENTAL FEATURE under development.
    pub evict_after_snapshot: bool,
}
154
155impl Default for BackendOptions {
156    fn default() -> Self {
157        Self {
158            dependency_tracking: true,
159            active_tracking: true,
160            storage_mode: Some(StorageMode::ReadWrite),
161            num_workers: None,
162            small_preallocation: false,
163            evict_after_snapshot: false,
164        }
165    }
166}
167
/// Kinds of jobs defined by this backend.
// NOTE(review): presumably these are the background jobs handed to the turbo-tasks scheduler —
// confirm against the `Backend` implementation, which is not visible in this part of the file.
pub enum TurboTasksBackendJob {
    /// Snapshot job. NOTE(review): presumably drives snapshot persistence — confirm.
    Snapshot,
}
171
/// Public handle to the backend: a newtype around an [`Arc`] of the shared inner state
/// (`TurboTasksBackendInner`), parameterized over the backing storage implementation `B`.
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
173
/// Shared state behind [`TurboTasksBackend`].
struct TurboTasksBackendInner<B: BackingStorage> {
    /// Backend configuration. `active_tracking` has already been forced to `false` by the
    /// constructor when `dependency_tracking` is disabled.
    options: BackendOptions,

    /// Instant at which this backend was constructed.
    start_time: Instant,

    /// Id factory for persisted tasks. Created with the backing storage's next free task id
    /// and `TRANSIENT_TASK_BIT - 1` as its bounds.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id factory for transient tasks. Created with `TRANSIENT_TASK_BIT` and `TaskId::MAX` as
    /// its bounds.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    /// Task storage, sized from the computed shard amount and the `small_preallocation` option.
    storage: Storage,

    /// Coordinates the operation/snapshot interleaving protocol. See
    /// [`SnapshotCoordinator`] for details.
    snapshot_coord: SnapshotCoordinator,
    /// Serializes calls to `snapshot_and_persist`. The coordinator's
    /// `begin_snapshot` asserts that snapshots don't overlap; this mutex
    /// enforces that contract for our two callers (background loop and
    /// `stop_and_wait`).
    snapshot_in_progress: Mutex<()>,

    // NOTE(review): the writers/listeners of the following flags and events are outside the
    // visible part of the file; they presumably signal shutdown and idle transitions — confirm.
    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    /// Only present with the `verify_aggregation_graph` feature.
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    /// Task statistics; cache hits and misses per native function are reported through it.
    task_statistics: TaskStatisticsApi,

    /// The backing storage implementation, exposed via `TurboTasksBackend::backing_storage`.
    backing_storage: B,

    /// Only present with the `verify_aggregation_graph` feature.
    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
207
208impl<B: BackingStorage> TurboTasksBackend<B> {
209    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
210        Self(Arc::new(TurboTasksBackendInner::new(
211            options,
212            backing_storage,
213        )))
214    }
215
216    pub fn backing_storage(&self) -> &B {
217        &self.0.backing_storage
218    }
219
220    /// Perform a snapshot and then evict all evictable tasks from memory.
221    ///
222    /// This is exposed for integration tests that need to verify the
223    /// snapshot → evict → restore cycle works correctly.
224    ///
225    /// Returns `(snapshot_had_new_data, eviction_counts)`.
226    pub fn snapshot_and_evict_for_testing(
227        &self,
228        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
229    ) -> (bool, EvictionCounts) {
230        self.0.snapshot_and_evict_for_testing(turbo_tasks)
231    }
232}
233
234impl<B: BackingStorage> TurboTasksBackendInner<B> {
235    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
236        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
237        if !options.dependency_tracking {
238            options.active_tracking = false;
239        }
240        let small_preallocation = options.small_preallocation;
241        let next_task_id = backing_storage
242            .next_free_task_id()
243            .expect("Failed to get task id");
244        Self {
245            options,
246            start_time: Instant::now(),
247            persisted_task_id_factory: IdFactoryWithReuse::new(
248                next_task_id,
249                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
250            ),
251            transient_task_id_factory: IdFactoryWithReuse::new(
252                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
253                TaskId::MAX,
254            ),
255            storage: Storage::new(shard_amount, small_preallocation),
256            snapshot_coord: SnapshotCoordinator::new(),
257            snapshot_in_progress: Mutex::new(()),
258            stopping: AtomicBool::new(false),
259            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
260            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
261            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
262            #[cfg(feature = "verify_aggregation_graph")]
263            is_idle: AtomicBool::new(false),
264            task_statistics: TaskStatisticsApi::default(),
265            backing_storage,
266            #[cfg(feature = "verify_aggregation_graph")]
267            root_tasks: Default::default(),
268        }
269    }
270
    /// Creates an [`ExecuteContext`] bound to this backend and the given `turbo_tasks` API.
    ///
    /// The returned context borrows both `self` and `turbo_tasks` for `'a`.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }
277
278    fn suspending_requested(&self) -> bool {
279        self.should_persist() && self.snapshot_coord.snapshot_pending()
280    }
281
282    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
283        if self.should_persist() {
284            self.snapshot_coord.suspend_point(suspend);
285        }
286    }
287
288    pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
289        if !self.should_persist() {
290            return OperationGuard::noop();
291        }
292        self.snapshot_coord.begin_operation()
293    }
294
295    fn should_persist(&self) -> bool {
296        matches!(
297            self.options.storage_mode,
298            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
299        )
300    }
301
302    fn should_evict(&self) -> bool {
303        self.options.evict_after_snapshot && self.should_persist()
304    }
305
306    /// Perform a snapshot and then evict all evictable tasks from memory.
307    ///
308    /// This is exposed for integration tests that need to verify the
309    /// snapshot → evict → restore cycle works correctly.
310    ///
311    /// Returns `(snapshot_had_new_data, eviction_counts)`.
312    #[doc(hidden)]
313    pub fn snapshot_and_evict_for_testing(
314        &self,
315        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
316    ) -> (bool, EvictionCounts) {
317        assert!(
318            self.should_persist(),
319            "snapshot_and_evict requires persistence"
320        );
321        let snapshot_result = self.snapshot_and_persist(None, "test", turbo_tasks);
322        let had_new_data = match snapshot_result {
323            Ok((_, new_data)) => new_data,
324            Err(_) => {
325                // Snapshot/persist failed — skip eviction since the data may not
326                // be on disk yet. Evicting now could lose in-memory state that
327                // can't be restored.
328                return (false, EvictionCounts::default());
329            }
330        };
331        let counts = self.storage.evict_after_snapshot(None);
332        (had_new_data, counts)
333    }
334
335    fn should_restore(&self) -> bool {
336        self.options.storage_mode.is_some()
337    }
338
    /// Returns whether dependency tracking is enabled in the backend options.
    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }
342
    /// Returns whether active tracking is enabled in the backend options.
    ///
    /// This is always `false` when dependency tracking is disabled, because the constructor
    /// clears the flag in that case.
    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }
346
    /// Reports a cache hit for `native_fn` to the task statistics.
    // NOTE(review): `TaskStatisticsApi::map` presumably runs the closure only while statistics
    // collection is enabled — confirm against its definition. Its return value is discarded.
    fn track_cache_hit_by_fn(&self, native_fn: &'static NativeFunction) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(native_fn));
    }
351
    /// Reports a cache miss for `native_fn` to the task statistics.
    // NOTE(review): `TaskStatisticsApi::map` presumably runs the closure only while statistics
    // collection is enabled — confirm against its definition. Its return value is discarded.
    fn track_cache_miss_by_fn(&self, native_fn: &'static NativeFunction) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(native_fn));
    }
356
357    /// Reconstructs a full [`TurboTasksExecutionError`] from the compact [`TaskError`] storage
358    /// representation. For [`TaskError::TaskChain`], this looks up the source error from the last
359    /// task's output and rebuilds the nested `TaskContext` wrappers with `TurboTasksCallApi`
360    /// references for lazy name resolution.
361    fn task_error_to_turbo_tasks_execution_error(
362        &self,
363        error: &TaskError,
364        ctx: &mut impl ExecuteContext<'_>,
365    ) -> TurboTasksExecutionError {
366        match error {
367            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
368            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
369                message: item.message.clone(),
370                source: item
371                    .source
372                    .as_ref()
373                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
374            })),
375            TaskError::LocalTaskContext(local_task_context) => {
376                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
377                    name: local_task_context.name.clone(),
378                    source: local_task_context
379                        .source
380                        .as_ref()
381                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
382                }))
383            }
384            TaskError::TaskChain(chain) => {
385                let task_id = chain.last().unwrap();
386                let error = {
387                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
388                    if let Some(OutputValue::Error(error)) = task.get_output() {
389                        Some(error.clone())
390                    } else {
391                        None
392                    }
393                };
394                let error = error.map_or_else(
395                    || {
396                        // Eventual consistency will cause errors to no longer be available
397                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
398                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
399                                "Error no longer available",
400                            )),
401                            location: None,
402                        }))
403                    },
404                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
405                );
406                let mut current_error = error;
407                for &task_id in chain.iter().rev() {
408                    current_error =
409                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
410                            task_id,
411                            source: Some(current_error),
412                            turbo_tasks: ctx.turbo_tasks(),
413                        }));
414                }
415                current_error
416            }
417        }
418    }
419}
420
/// Intermediate result of step 1 of task execution completion.
// NOTE(review): the producer and consumers of this struct are outside the visible part of the
// file; field meanings beyond their names and types should be confirmed there.
struct TaskExecutionCompletePrepareResult {
    pub new_children: FxHashSet<TaskId>,
    pub is_now_immutable: bool,
    /// Only present with the `verify_determinism` feature.
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    /// Only present with the `task_dirty_cause` feature.
    #[cfg(feature = "task_dirty_cause")]
    pub function_id: Option<FunctionId>,
    pub new_output: Option<OutputValue>,
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
    pub is_recomputation: bool,
    pub is_session_dependent: bool,
}
434
435// Operations
436impl<B: BackingStorage> TurboTasksBackendInner<B> {
437    fn try_read_task_output(
438        self: &Arc<Self>,
439        task_id: TaskId,
440        reader: Option<TaskId>,
441        options: ReadOutputOptions,
442        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
443    ) -> Result<Result<RawVc, EventListener>> {
444        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
445
446        let mut ctx = self.execute_context(turbo_tasks);
447        let need_reader_task = if self.should_track_dependencies()
448            && !matches!(options.tracking, ReadTracking::Untracked)
449            && reader.is_some_and(|reader_id| reader_id != task_id)
450            && let Some(reader_id) = reader
451            && reader_id != task_id
452        {
453            Some(reader_id)
454        } else {
455            None
456        };
457        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
458            // Having a task_pair here is not optimal, but otherwise this would lead to a race
459            // condition. See below.
460            // TODO(sokra): solve that in a more performant way.
461            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
462            (task, Some(reader))
463        } else {
464            (ctx.task(task_id, TaskDataCategory::All), None)
465        };
466
467        fn listen_to_done_event(
468            reader_description: Option<EventDescription>,
469            tracking: ReadTracking,
470            done_event: &Event,
471        ) -> EventListener {
472            done_event.listen_with_note(move || {
473                move || {
474                    if let Some(reader_description) = reader_description.as_ref() {
475                        format!(
476                            "try_read_task_output from {} ({})",
477                            reader_description, tracking
478                        )
479                    } else {
480                        format!("try_read_task_output ({})", tracking)
481                    }
482                }
483            })
484        }
485
486        fn check_in_progress(
487            task: &impl TaskGuard,
488            reader_description: Option<EventDescription>,
489            tracking: ReadTracking,
490        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
491        {
492            match task.get_in_progress() {
493                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
494                    listen_to_done_event(reader_description, tracking, done_event),
495                ))),
496                Some(InProgressState::InProgress(box InProgressStateInner {
497                    done_event, ..
498                })) => Some(Ok(Err(listen_to_done_event(
499                    reader_description,
500                    tracking,
501                    done_event,
502                )))),
503                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
504                    "{} was canceled",
505                    task.get_task_description()
506                ))),
507                None => None,
508            }
509        }
510
511        if matches!(options.consistency, ReadConsistency::Strong) {
512            if task
513                .get_persistent_task_type()
514                .is_some_and(|t| !t.native_fn.is_root)
515            {
516                drop(task);
517                drop(reader_task);
518                panic!(
519                    "Strongly consistent read of non-root task {} (reader: {}). The `root` \
520                     attribute is missing on the task.",
521                    self.debug_get_task_description(task_id),
522                    reader.map_or_else(
523                        || "unknown".to_string(),
524                        |r| self.debug_get_task_description(r)
525                    )
526                );
527            }
528
529            let is_dirty = task.is_dirty();
530
531            // Check the dirty count of the root node
532            let has_dirty_containers = task.has_dirty_containers();
533            if has_dirty_containers || is_dirty.is_some() {
534                let activeness = task.get_activeness_mut();
535                let mut task_ids_to_schedule: Vec<_> = Vec::new();
536                // When there are dirty task, subscribe to the all_clean_event
537                let activeness = if let Some(activeness) = activeness {
538                    // This makes sure all tasks stay active and this task won't stale.
539                    // active_until_clean is automatically removed when this
540                    // task is clean.
541                    activeness.set_active_until_clean();
542                    activeness
543                } else {
544                    // If we don't have a root state, add one. This also makes sure all tasks stay
545                    // active and this task won't stale. active_until_clean
546                    // is automatically removed when this task is clean.
547                    if ctx.should_track_activeness() {
548                        // A newly added Activeness need to make sure to schedule the tasks
549                        task_ids_to_schedule = task.dirty_containers().collect();
550                        task_ids_to_schedule.push(task_id);
551                    }
552                    let activeness =
553                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
554                    activeness.set_active_until_clean();
555                    activeness
556                };
557                let listener = activeness.all_clean_event.listen_with_note(move || {
558                    let this = self.clone();
559                    let tt = turbo_tasks.pin();
560                    move || {
561                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
562                        let mut ctx = this.execute_context(tt);
563                        let mut visited = FxHashSet::default();
564                        fn indent(s: &str) -> String {
565                            s.split_inclusive('\n')
566                                .flat_map(|line: &str| ["  ", line].into_iter())
567                                .collect::<String>()
568                        }
569                        fn get_info(
570                            ctx: &mut impl ExecuteContext<'_>,
571                            task_id: TaskId,
572                            parent_and_count: Option<(TaskId, i32)>,
573                            visited: &mut FxHashSet<TaskId>,
574                        ) -> String {
575                            let task = ctx.task(task_id, TaskDataCategory::All);
576                            let is_dirty = task.is_dirty();
577                            let in_progress =
578                                task.get_in_progress()
579                                    .map_or("not in progress", |p| match p {
580                                        InProgressState::InProgress(_) => "in progress",
581                                        InProgressState::Scheduled { .. } => "scheduled",
582                                        InProgressState::Canceled => "canceled",
583                                    });
584                            let activeness = task.get_activeness().map_or_else(
585                                || "not active".to_string(),
586                                |activeness| format!("{activeness:?}"),
587                            );
588                            let aggregation_number = get_aggregation_number(&task);
589                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
590                            {
591                                let uppers = get_uppers(&task);
592                                !uppers.contains(&parent_task_id)
593                            } else {
594                                false
595                            };
596
597                            // Check the dirty count of the root node
598                            let has_dirty_containers = task.has_dirty_containers();
599
600                            let task_description = task.get_task_description();
601                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
602                                format!(", dirty({parent_priority})")
603                            } else {
604                                String::new()
605                            };
606                            let has_dirty_containers_label = if has_dirty_containers {
607                                ", dirty containers"
608                            } else {
609                                ""
610                            };
611                            let count = if let Some((_, count)) = parent_and_count {
612                                format!(" {count}")
613                            } else {
614                                String::new()
615                            };
616                            let mut info = format!(
617                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
618                                 {in_progress}, \
619                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
620                            );
621                            let children: Vec<_> = task.dirty_containers_with_count().collect();
622                            drop(task);
623
624                            if missing_upper {
625                                info.push_str("\n  ERROR: missing upper connection");
626                            }
627
628                            if has_dirty_containers || !children.is_empty() {
629                                writeln!(info, "\n  dirty tasks:").unwrap();
630
631                                for (child_task_id, count) in children {
632                                    let task_description = ctx
633                                        .task(child_task_id, TaskDataCategory::Data)
634                                        .get_task_description();
635                                    if visited.insert(child_task_id) {
636                                        let child_info = get_info(
637                                            ctx,
638                                            child_task_id,
639                                            Some((task_id, count)),
640                                            visited,
641                                        );
642                                        info.push_str(&indent(&child_info));
643                                        if !info.ends_with('\n') {
644                                            info.push('\n');
645                                        }
646                                    } else {
647                                        writeln!(
648                                            info,
649                                            "  {child_task_id} {task_description} {count} \
650                                             (already visited)"
651                                        )
652                                        .unwrap();
653                                    }
654                                }
655                            }
656                            info
657                        }
658                        let info = get_info(&mut ctx, task_id, None, &mut visited);
659                        format!(
660                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
661                        )
662                    }
663                });
664                drop(reader_task);
665                drop(task);
666                if !task_ids_to_schedule.is_empty() {
667                    let mut queue = AggregationUpdateQueue::new();
668                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
669                    queue.execute(&mut ctx);
670                }
671
672                return Ok(Err(listener));
673            }
674        }
675
676        let reader_description = reader_task
677            .as_ref()
678            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
679        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
680        {
681            return value;
682        }
683
684        if let Some(output) = task.get_output() {
685            let result = match output {
686                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
687                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
688                OutputValue::Error(error) => Err(error.clone()),
689            };
690            if let Some(mut reader_task) = reader_task.take()
691                && options.tracking.should_track(result.is_err())
692                && (!task.immutable() || cfg!(feature = "verify_immutable"))
693            {
694                #[cfg(feature = "trace_task_output_dependencies")]
695                let _span = tracing::trace_span!(
696                    "add output dependency",
697                    task = %task_id,
698                    dependent_task = ?reader
699                )
700                .entered();
701                let mut queue = LeafDistanceUpdateQueue::new();
702                let reader = reader.unwrap();
703                if task.add_output_dependent(reader) {
704                    // Ensure that dependent leaf distance is strictly monotonic increasing
705                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
706                    let reader_leaf_distance =
707                        reader_task.get_leaf_distance().copied().unwrap_or_default();
708                    if reader_leaf_distance.distance <= leaf_distance.distance {
709                        queue.push(
710                            reader,
711                            leaf_distance.distance,
712                            leaf_distance.max_distance_in_buffer,
713                        );
714                    }
715                }
716
717                drop(task);
718
719                // Note: We use `task_pair` earlier to lock the task and its reader at the same
720                // time. If we didn't and just locked the reader here, an invalidation could occur
721                // between grabbing the locks. If that happened, and if the task is "outdated" or
722                // doesn't have the dependency edge yet, the invalidation would be lost.
723
724                if !reader_task.remove_outdated_output_dependencies(&task_id) {
725                    let _ = reader_task.add_output_dependencies(task_id);
726                }
727                drop(reader_task);
728
729                queue.execute(&mut ctx);
730            } else {
731                drop(task);
732            }
733
734            return result.map_err(|error| {
735                self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
736                    .with_task_context(task_id, turbo_tasks.pin())
737                    .into()
738            });
739        }
740        drop(reader_task);
741
742        let note = EventDescription::new(|| {
743            move || {
744                if let Some(reader) = reader_description.as_ref() {
745                    format!("try_read_task_output (recompute) from {reader}",)
746                } else {
747                    "try_read_task_output (recompute, untracked)".to_string()
748                }
749            }
750        });
751
752        // Output doesn't exist. We need to schedule the task to compute it.
753        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
754            TaskExecutionReason::OutputNotAvailable,
755            EventDescription::new(|| task.get_task_desc_fn()),
756            note,
757        );
758
759        // It's not possible that the task is InProgress at this point. If it is InProgress {
760        // done: true } it must have Output and would early return.
761        let old = task.set_in_progress(in_progress_state);
762        debug_assert!(old.is_none(), "InProgress already exists");
763        ctx.schedule_task(task, TaskPriority::Recomputation);
764
765        Ok(Err(listener))
766    }
767
    /// Tries to read the content of `cell` from task `task_id`, optionally on behalf of the
    /// task `reader`.
    ///
    /// # Returns
    ///
    /// - `Ok(Ok(content))` when the cell data is currently stored on the task.
    /// - `Ok(Err(listener))` when the caller has to wait: the task is scheduled or in progress,
    ///   another reader already requested the cell, or the task was scheduled here to
    ///   recompute the cell.
    ///
    /// # Errors
    ///
    /// Fails when no cell of that type exists on the task, when the cell index is out of
    /// bounds, or when the task was canceled and the data is not available.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        /// Records the read in both directions: `reader` is added to the cell dependents of
        /// `task`, and the cell is added to the cell dependencies of `reader_task`.
        ///
        /// Does nothing when there is no reader guard, or when `task` is immutable and the
        /// `verify_immutable` feature is disabled. Both guards are consumed and dropped.
        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
            key: Option<u64>,
        ) {
            if let Some(mut reader_task) = reader_task
                && (!task.immutable() || cfg!(feature = "verify_immutable"))
            {
                // A reader guard only exists when a reader id was given (see the `task_pair`
                // call in the enclosing function), so this unwrap cannot fail here.
                let reader = reader.unwrap();
                let _ = task
                    .add_cell_dependents(CellDependency::new(CellRef { task: reader, cell }, key));
                drop(task);

                // Note: We use `task_pair` earlier to lock the task and its reader at the same
                // time. If we didn't and just locked the reader here, an invalidation could occur
                // between grabbing the locks. If that happened, and if the task is "outdated" or
                // doesn't have the dependency edge yet, the invalidation would be lost.

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                let dep = CellDependency::new(target, key);
                // Only add the edge when it was not found among the outdated dependencies
                // (presumably `remove_outdated_cell_dependencies` returns `true` when it removed
                // an entry — confirm against `TaskGuard`).
                if !reader_task.remove_outdated_cell_dependencies(&dep) {
                    let _ = reader_task.add_cell_dependencies(dep);
                }
                drop(reader_task);
            }
        }

        let ReadCellOptions {
            tracking,
            final_read_hint,
        } = options;

        let mut ctx = self.execute_context(turbo_tasks);
        // The reader is only locked when the read is tracked and the reader is a different task
        // than the one being read.
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(tracking, ReadCellTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            // Having a task_pair here is not optimal, but otherwise this would lead to a race
            // condition. See below.
            // TODO(sokra): solve that in a more performant way.
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        // With `final_read_hint` the data is taken out of the task instead of being cloned.
        let content = if final_read_hint {
            task.remove_cell_data(&cell)
        } else {
            task.get_cell_data(&cell).cloned()
        };
        if let Some(content) = content {
            // `false`: this read did not result in an error.
            if tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content)),
            )));
        }

        // The task is already scheduled or executing: just wait for the cell. Whether the
        // listener is new does not matter here, so only the listener is returned.
        let in_progress = task.get_in_progress();
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self
                .listen_to_cell(&mut task, task_id, &reader_task, cell)
                .0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        // Check cell index range (cell might not exist at all)
        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
        let Some(max_id) = max_id else {
            // The description is fetched before the guard is moved into `add_cell_dependency`.
            let task_desc = task.get_task_description();
            // `true`: this read results in an error.
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!(
                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
            );
        };
        if cell.index >= max_id {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
        }

        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
        // task to get the cell content.

        // Bail early if the task was cancelled — no point in registering a listener
        // on a task that won't execute again.
        if is_cancelled {
            bail!("{} was canceled", task.get_task_description());
        }

        // Listen to the cell and potentially schedule the task
        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
        drop(reader_task);
        // An in-progress entry for this cell already existed, so the task is not scheduled
        // again from here.
        if !new_listener {
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).ty.global_name,
            cell_index = cell.index
        )
        .entered();

        let _ = task.add_scheduled(
            TaskExecutionReason::CellNotAvailable,
            EventDescription::new(|| task.get_task_desc_fn()),
        );
        ctx.schedule_task(task, TaskPriority::Recomputation);

        Ok(Err(listener))
    }
907
908    fn listen_to_cell(
909        &self,
910        task: &mut impl TaskGuard,
911        task_id: TaskId,
912        reader_task: &Option<impl TaskGuard>,
913        cell: CellId,
914    ) -> (EventListener, bool) {
915        let note = || {
916            let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
917            move || {
918                if let Some(reader_desc) = reader_desc.as_ref() {
919                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
920                } else {
921                    "try_read_task_cell (in progress, untracked)".to_string()
922                }
923            }
924        };
925        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
926            // Someone else is already computing the cell
927            let listener = in_progress.event.listen_with_note(note);
928            return (listener, false);
929        }
930        let in_progress = InProgressCellState::new(task_id, cell);
931        let listener = in_progress.event.listen_with_note(note);
932        let old = task.insert_in_progress_cells(cell, in_progress);
933        debug_assert!(old.is_none(), "InProgressCell already exists");
934        (listener, true)
935    }
936
    /// Takes a snapshot of the modified task data and writes it to the backing storage.
    ///
    /// `parent_span` becomes the parent of the "snapshot" and "persist" tracing spans, and
    /// `reason` is recorded on both spans and in the emitted trace event.
    ///
    /// # Returns
    ///
    /// An `Instant` plus a flag. The flag is `true` only when snapshot items were handed to
    /// `save_snapshot`; it is `false` when nothing was modified or no items were produced.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `save_snapshot`.
    ///
    /// # Panics
    ///
    /// Panics when encoding a task category fails, and when a task flagged as `new_task` has
    /// no persistent task type.
    fn snapshot_and_persist(
        &self,
        parent_span: Option<tracing::Id>,
        reason: &str,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<(Instant, bool), anyhow::Error> {
        let snapshot_span =
            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
                .entered();
        // Serialize snapshots. The internal protocol (snapshot_mode, snapshot
        // request bit, suspended_operations) assumes only one snapshot runs at
        // a time. Held for the entire snapshot lifecycle.
        let _snapshot_in_progress = self.snapshot_in_progress.lock();
        let start = Instant::now();
        // SystemTime for wall-clock timestamps in trace events (milliseconds
        // since epoch). Instant is monotonic but has no defined epoch, so it
        // can't be used for cross-process trace correlation.
        let wall_start = SystemTime::now();
        debug_assert!(self.should_persist());

        let mut snapshot_phase = {
            let _span = tracing::info_span!("blocking").entered();
            self.snapshot_coord.begin_snapshot()
        };
        // Enter snapshot mode, which atomically reads and resets the modified count.
        // Checking after start_snapshot ensures no concurrent increments can race.
        let (snapshot_guard, has_modifications) = self.storage.start_snapshot();

        let suspended_operations = snapshot_phase.take_suspended_operations();

        let snapshot_time = Instant::now();
        drop(snapshot_phase);

        if !has_modifications {
            // No tasks modified since the last snapshot — drop the guard (which
            // calls end_snapshot) and skip the expensive O(N) scan.
            drop(snapshot_guard);
            // NOTE(review): this exit returns `start`, while the other two exits of this
            // function return `snapshot_time` — confirm the difference is intended.
            return Ok((start, false));
        }

        /// Accumulated size and count statistics for one stats grouping key. Only compiled
        /// with the `print_cache_item_size` feature.
        #[cfg(feature = "print_cache_item_size")]
        #[derive(Default)]
        struct TaskCacheStats {
            data: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            data_compressed: usize,
            data_count: usize,
            meta: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            meta_compressed: usize,
            meta_count: usize,
            upper_count: usize,
            collectibles_count: usize,
            aggregated_collectibles_count: usize,
            children_count: usize,
            followers_count: usize,
            collectibles_dependents_count: usize,
            aggregated_dirty_containers_count: usize,
            output_size: usize,
        }
        /// Formats a byte size, optionally including the compressed size when the
        /// `print_cache_item_size_with_compressed` feature is enabled.
        #[cfg(feature = "print_cache_item_size")]
        struct FormatSizes {
            size: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: usize,
        }
        #[cfg(feature = "print_cache_item_size")]
        impl std::fmt::Display for FormatSizes {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                use turbo_tasks::util::FormatBytes;
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    write!(
                        f,
                        "{} ({} compressed)",
                        FormatBytes(self.size),
                        FormatBytes(self.compressed_size)
                    )
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    write!(f, "{}", FormatBytes(self.size))
                }
            }
        }
        #[cfg(feature = "print_cache_item_size")]
        impl TaskCacheStats {
            /// Returns the LZ4-compressed length of `data`.
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            fn compressed_size(data: &[u8]) -> Result<usize> {
                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
                    data,
                    &mut Vec::new(),
                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
                )?)
            }

            /// Accounts for one encoded data category. A failed compression counts as 0 bytes.
            fn add_data(&mut self, data: &[u8]) {
                self.data += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.data_count += 1;
            }

            /// Accounts for one encoded meta category. A failed compression counts as 0 bytes.
            fn add_meta(&mut self, data: &[u8]) {
                self.meta += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.meta_count += 1;
            }

            /// Adds the meta counts of `storage` and the encoded size of its output (0 when
            /// encoding the output fails).
            fn add_counts(&mut self, storage: &TaskStorage) {
                let counts = storage.meta_counts();
                self.upper_count += counts.upper;
                self.collectibles_count += counts.collectibles;
                self.aggregated_collectibles_count += counts.aggregated_collectibles;
                self.children_count += counts.children;
                self.followers_count += counts.followers;
                self.collectibles_dependents_count += counts.collectibles_dependents;
                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
                if let Some(output) = storage.get_output() {
                    use turbo_bincode::turbo_bincode_encode;

                    self.output_size += turbo_bincode_encode(&output)
                        .map(|data| data.len())
                        .unwrap_or(0);
                }
            }

            /// Returns the task name used as the stats grouping key.
            fn task_name(storage: &TaskStorage) -> String {
                storage
                    .get_persistent_task_type()
                    .map(|t| t.to_string())
                    .unwrap_or_else(|| "<unknown>".to_string())
            }

            /// Returns the primary sort key: compressed total when
            /// `print_cache_item_size_with_compressed` is enabled, raw total otherwise.
            fn sort_key(&self) -> usize {
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed + self.meta_compressed
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    self.data + self.meta
                }
            }

            /// Combined data + meta size.
            fn format_total(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data + self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed + self.meta_compressed,
                }
            }

            /// Total size of all encoded data categories.
            fn format_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed,
                }
            }

            /// Average size per encoded data category (0 when none were recorded).
            fn format_avg_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data.checked_div(self.data_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .data_compressed
                        .checked_div(self.data_count)
                        .unwrap_or(0),
                }
            }

            /// Total size of all encoded meta categories.
            fn format_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.meta_compressed,
                }
            }

            /// Average size per encoded meta category (0 when none were recorded).
            fn format_avg_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta.checked_div(self.meta_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .meta_compressed
                        .checked_div(self.meta_count)
                        .unwrap_or(0),
                }
            }
        }
        // Stats keyed by task name; behind a mutex because the `process` closure below only
        // captures it by shared reference.
        #[cfg(feature = "print_cache_item_size")]
        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
            Mutex::new(FxHashMap::default());

        // Encode each task's modified categories. We only encode categories with `modified` set,
        // meaning the category was actually dirtied. Categories restored from disk but never
        // modified don't need re-persisting since the on-disk version is still valid.
        // For tasks accessed during snapshot mode, a frozen copy was made and its `modified`
        // flags were copied from the live task at snapshot creation time, reflecting which
        // categories were dirtied before the snapshot was taken.
        let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
            // Encodes a single category; an encoding failure is treated as fatal (panic).
            let encode_category = |task_id: TaskId,
                                   data: &TaskStorage,
                                   category: SpecificTaskDataCategory,
                                   buffer: &mut TurboBincodeBuffer|
             -> Option<TurboBincodeBuffer> {
                match encode_task_data(task_id, data, category, buffer) {
                    Ok(encoded) => {
                        #[cfg(feature = "print_cache_item_size")]
                        {
                            let mut stats = task_cache_stats.lock();
                            let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
                            match category {
                                SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
                                SpecificTaskDataCategory::Data => entry.add_data(&encoded),
                            }
                        }
                        Some(encoded)
                    }
                    Err(err) => {
                        panic!(
                            "Serializing task {} failed ({:?}): {:?}",
                            self.debug_get_task_description(task_id),
                            category,
                            err
                        );
                    }
                }
            };
            if task_id.is_transient() {
                unreachable!("transient task_ids should never be enqueued to be persisted");
            }

            let encode_meta = inner.flags.meta_modified();
            let encode_data = inner.flags.data_modified();

            #[cfg(feature = "print_cache_item_size")]
            if encode_data || encode_meta {
                task_cache_stats
                    .lock()
                    .entry(TaskCacheStats::task_name(inner))
                    .or_default()
                    .add_counts(inner);
            }

            let meta = if encode_meta {
                encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
            } else {
                None
            };

            let data = if encode_data {
                encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
            } else {
                None
            };
            // The task type hash is only computed for tasks flagged as `new_task`.
            let task_type_hash = if inner.flags.new_task() {
                let task_type = inner.get_persistent_task_type().expect(
                    "It is not possible for a new_task to not have a persistent_task_type.  Task \
                     creation for persistent tasks uses a single ExecutionContextImpl for \
                     creating the task (which sets new_task) and connect_child (which sets \
                     persistent_task_type) and take_snapshot waits for all operations to complete \
                     or suspend before we start snapshotting.  So task creation will always set \
                     the task_type.",
                );
                Some(compute_task_type_hash(task_type))
            } else {
                None
            };

            SnapshotItem {
                task_id,
                meta,
                data,
                task_type_hash,
            }
        };

        let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);

        // The "snapshot" span ends here; `snapshot_duration` is measured from `start`.
        drop(snapshot_span);
        let snapshot_duration = start.elapsed();
        let task_count = task_snapshots.len();

        if task_snapshots.is_empty() {
            // This should be impossible — if we got here, modified_count was nonzero, and every
            // modification that increments the count also failed during encoding.
            std::hint::cold_path();
            return Ok((snapshot_time, false));
        }

        let persist_start = Instant::now();
        // The item counts are declared empty here and recorded once `save_snapshot` returned.
        let span = tracing::info_span!(
            parent: parent_span,
            "persist",
            reason = reason,
            data_items= tracing::field::Empty,
            meta_items= tracing::field::Empty,
            task_cache_items= tracing::field::Empty,
            next_task_id= tracing::field::Empty,)
        .entered();
        {
            // Tasks were already consumed by take_snapshot, so a future snapshot
            // would not re-persist them — returning an error signals to the caller
            // that further persist attempts would corrupt the task graph in storage.
            let SnapshotMeta {
                task_cache_items,
                data_items,
                meta_items,
                max_next_task_id,
            } = self
                .backing_storage
                .save_snapshot(suspended_operations, task_snapshots)?;
            span.record("data_items", data_items);
            span.record("meta_items", meta_items);
            span.record("task_cache_items", task_cache_items);
            span.record("next_task_id", max_next_task_id);

            #[cfg(feature = "print_cache_item_size")]
            {
                let mut task_cache_stats = task_cache_stats
                    .into_inner()
                    .into_iter()
                    .collect::<Vec<_>>();
                if !task_cache_stats.is_empty() {
                    use turbo_tasks::util::FormatBytes;

                    use crate::utils::markdown_table::print_markdown_table;

                    // Largest sort key first; ties are ordered by key, also descending.
                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
                        (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
                    });

                    println!(
                        "Task cache stats: {}",
                        FormatSizes {
                            size: task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data + s.meta)
                                .sum::<usize>(),
                            #[cfg(feature = "print_cache_item_size_with_compressed")]
                            compressed_size: task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data_compressed + s.meta_compressed)
                                .sum::<usize>()
                        },
                    );

                    // The "Count x Avg" headers appear twice because the count and the average
                    // are emitted as two separate columns in the row closure below.
                    print_markdown_table(
                        [
                            "Task",
                            " Total Size",
                            " Data Size",
                            " Data Count x Avg",
                            " Data Count x Avg",
                            " Meta Size",
                            " Meta Count x Avg",
                            " Meta Count x Avg",
                            " Uppers",
                            " Coll",
                            " Agg Coll",
                            " Children",
                            " Followers",
                            " Coll Deps",
                            " Agg Dirty",
                            " Output Size",
                        ],
                        task_cache_stats.iter(),
                        |(task_desc, stats)| {
                            [
                                task_desc.to_string(),
                                format!(" {}", stats.format_total()),
                                format!(" {}", stats.format_data()),
                                format!(" {} x", stats.data_count),
                                format!("{}", stats.format_avg_data()),
                                format!(" {}", stats.format_meta()),
                                format!(" {} x", stats.meta_count),
                                format!("{}", stats.format_avg_meta()),
                                format!(" {}", stats.upper_count),
                                format!(" {}", stats.collectibles_count),
                                format!(" {}", stats.aggregated_collectibles_count),
                                format!(" {}", stats.children_count),
                                format!(" {}", stats.followers_count),
                                format!(" {}", stats.collectibles_dependents_count),
                                format!(" {}", stats.aggregated_dirty_containers_count),
                                format!(" {}", FormatBytes(stats.output_size)),
                            ]
                        },
                    );
                }
            }
        }

        // `elapsed` covers the whole call since `start`; `persist_duration` only the persist
        // phase.
        let elapsed = start.elapsed();
        let persist_duration = persist_start.elapsed();
        // avoid spamming the event queue with information about fast operations
        if elapsed > Duration::from_secs(10) {
            turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
                "Finished writing to filesystem cache".to_string(),
                elapsed,
            )));
        }

        // Wall-clock start in milliseconds since the Unix epoch; falls back to 0 when the
        // system time is before the epoch.
        let wall_start_ms = wall_start
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            // as_millis_f64 is not stable yet
            .as_secs_f64()
            * 1000.0;
        // The end timestamp is derived from the monotonic `elapsed` duration rather than a
        // second `SystemTime::now()` reading.
        let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
        turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
            "turbopack-persistence",
            wall_start_ms,
            wall_end_ms,
            vec![
                ("reason", serde_json::Value::from(reason)),
                (
                    "snapshot_duration_ms",
                    serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
                ),
                (
                    "persist_duration_ms",
                    serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
                ),
                ("task_count", serde_json::Value::from(task_count)),
            ],
        )));

        Ok((snapshot_time, true))
    }
1378
1379    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1380        if self.should_restore() {
1381            // Continue all uncompleted operations
1382            // They can't be interrupted by a snapshot since the snapshotting job has not been
1383            // scheduled yet.
1384            let uncompleted_operations = self
1385                .backing_storage
1386                .uncompleted_operations()
1387                .expect("Failed to get uncompleted operations");
1388            if !uncompleted_operations.is_empty() {
1389                let mut ctx = self.execute_context(turbo_tasks);
1390                for op in uncompleted_operations {
1391                    op.execute(&mut ctx);
1392                }
1393            }
1394        }
1395
1396        // Only when it should write regularly to the storage, we schedule the initial snapshot
1397        // job.
1398        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1399            // Schedule the snapshot job
1400            let _span = trace_span!("persisting background job").entered();
1401            let _span = tracing::info_span!("thread").entered();
1402            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::Snapshot);
1403        }
1404    }
1405
    /// Marks the backend as stopping and notifies listeners of `stopping_event`.
    fn stopping(&self) {
        // The flag is stored before the notification is sent.
        self.stopping.store(true, Ordering::Release);
        // `usize::MAX` as the notification count — presumably wakes every current listener.
        self.stopping_event.notify(usize::MAX);
    }
1410
1411    #[allow(unused_variables)]
1412    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1413        #[cfg(feature = "verify_aggregation_graph")]
1414        {
1415            self.is_idle.store(false, Ordering::Release);
1416            self.verify_aggregation_graph(turbo_tasks, false);
1417        }
1418        if self.should_persist()
1419            && let Err(err) = self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks)
1420        {
1421            eprintln!("Persisting failed during shutdown: {err:?}");
1422        }
1423        self.storage.drop_contents();
1424        if let Err(err) = self.backing_storage.shutdown() {
1425            println!("Shutting down failed: {err}");
1426        }
1427    }
1428
1429    #[allow(unused_variables)]
1430    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1431        self.idle_start_event.notify(usize::MAX);
1432
1433        #[cfg(feature = "verify_aggregation_graph")]
1434        {
1435            use tokio::select;
1436
1437            self.is_idle.store(true, Ordering::Release);
1438            let this = self.clone();
1439            let turbo_tasks = turbo_tasks.pin();
1440            tokio::task::spawn(async move {
1441                select! {
1442                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1443                        // do nothing
1444                    }
1445                    _ = this.idle_end_event.listen() => {
1446                        return;
1447                    }
1448                }
1449                if !this.is_idle.load(Ordering::Relaxed) {
1450                    return;
1451                }
1452                this.verify_aggregation_graph(&*turbo_tasks, true);
1453            });
1454        }
1455    }
1456
1457    fn idle_end(&self) {
1458        #[cfg(feature = "verify_aggregation_graph")]
1459        self.is_idle.store(false, Ordering::Release);
1460        self.idle_end_event.notify(usize::MAX);
1461    }
1462
1463    fn get_or_create_task(
1464        &self,
1465        native_fn: &'static NativeFunction,
1466        this: Option<RawVc>,
1467        arg: &mut dyn StackDynTaskInputs,
1468        parent_task: Option<TaskId>,
1469        persistence: TaskPersistence,
1470        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1471    ) -> TaskId {
            // Returns the id of the task identified by `(native_fn, this, arg)`. The task is
            // looked up in the in-memory task cache first, then (for non-transient tasks) in the
            // backing storage, and is only created when both lookups miss. Every returning path
            // runs `ConnectChildOperation` for `parent_task` and the resulting task id.
1472        let transient = matches!(persistence, TaskPersistence::Transient);
1473
            // A transient task requested by a non-transient parent is handed to
            // `panic_persistent_calling_transient` (judging by its name, this panics) together
            // with a description of the parent task.
1474        if transient
1475            && let Some(parent_task) = parent_task
1476            && !parent_task.is_transient()
1477        {
1478            let task_type = CachedTaskType {
1479                native_fn,
1480                this,
1481                arg: arg.take_box(),
1482            };
1483            self.panic_persistent_calling_transient(
1484                self.debug_get_task_description(parent_task),
1485                Some(&task_type),
1486                /* cell_id */ None,
1487            );
1488        }
1489
1490        let is_root = native_fn.is_root;
1491
1492        // Compute hash and shard index once from borrowed components (no heap allocation).
1493        let arg_ref = arg.as_ref();
1494        let hash = CachedTaskType::hash_from_components(
1495            self.storage.task_cache.hasher(),
1496            native_fn,
1497            this,
1498            arg_ref,
1499        );
1500        // Locate the shard once so that the read-only lookup and any
1501        // write-lock retry below share the same reference (saves a modulo +
1502        // memory lookup on the miss path).
1503        let shard = get_shard(&self.storage.task_cache, hash);
1504
1505        let mut ctx = self.execute_context(turbo_tasks);
1506        // Step 1: Fast read-only cache lookup (read lock, no allocation).
1507        // Use a read lock rather than a write lock to avoid contention. connect_child
1508        // may re-enter task_cache with a write lock, so we must not hold a write lock here.
1509        if let Some(task_id) =
1510            raw_get_in_shard(shard, hash, |k| k.eq_components(native_fn, this, arg_ref))
1511        {
1512            self.track_cache_hit_by_fn(native_fn);
1513            operation::ConnectChildOperation::run(parent_task, task_id, ctx);
1514            return task_id;
1515        }
1516
1517        // Step 2: Check backing storage using borrowed components (no box needed yet).
            // Transient tasks skip this lookup entirely (`!transient` is checked first).
1518
1519        // Task exists in backing storage.
1520        // We only need to insert it into the in-memory cache.
1521        let task_id = if !transient
1522            && let Some((task_id, stored_type)) = ctx.task_by_type(native_fn, this, arg_ref)
1523        {
1524            self.track_cache_hit_by_fn(native_fn);
1525            // Step 3a: Insert into in-memory cache using the pre-located shard.
1526            // Use the existing Arc from storage to avoid a duplicate allocation.
1527            match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
1528                k.eq_components(native_fn, this, arg_ref)
1529            }) {
                    // The entry has appeared in the in-memory cache since the read-only lookup
                    // in step 1; nothing is left to insert.
1530                RawEntry::Occupied(_) => {}
1531                RawEntry::Vacant(e) => {
1532                    e.insert(stored_type, task_id);
1533                }
1534            };
1535            task_id
1536        } else {
                // Step 3b: Not found in the backing storage (or transient): re-check the shard
                // through the entry API and create the task if the entry is still vacant.
1537            match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
1538                k.eq_components(native_fn, this, arg_ref)
1539            }) {
1540                RawEntry::Occupied(e) => {
1541                    // Another thread beat us to creating this task — use their task_id.
1542                    // They will handle logging the new task as modified.
1543                    let task_id = *e.get();
                        // Release the entry before doing anything else (see the note on
                        // `insert()` below: the entry is what holds the shard write lock).
1544                    drop(e);
1545                    self.track_cache_hit_by_fn(native_fn);
1546                    task_id
1547                }
1548                RawEntry::Vacant(e) => {
1549                    // Only now do we force the allocation.
1550                    // NOTE: if our caller had to perform resolution, then this will have already
1551                    // been boxed and take_box just takes it.
1552                    let task_type = CachedTaskTypeArc::new(CachedTaskType {
1553                        native_fn,
1554                        this,
1555                        arg: arg.take_box(),
1556                    });
                        // Transient and persisted tasks draw their ids from separate factories.
1557                    let task_id = if transient {
1558                        self.transient_task_id_factory.get()
1559                    } else {
1560                        self.persisted_task_id_factory.get()
1561                    };
1562                    // Initialize storage BEFORE making task_id visible in the cache.
1563                    // This ensures any thread that reads task_id from the cache sees
1564                    // the storage entry already initialized (restored flags set).
1565                    self.storage
1566                        .initialize_new_task(task_id, Some(task_type.clone()));
1567                    // insert() consumes e, releasing the shard write lock.
1568                    e.insert(task_type, task_id);
1569                    self.track_cache_miss_by_fn(native_fn);
1570                    // Update the aggregation number before connecting the child
1571                    // We don't need this on any of the task recovery paths above because the
1572                    // aggregation number will already be set.
                        // Root functions get the maximum base aggregation number (`u32::MAX`);
                        // session-dependent functions get `u32::MAX >> 2`, but only when
                        // dependency tracking is enabled.
1573                    if is_root {
1574                        AggregationUpdateQueue::run(
1575                            AggregationUpdateJob::UpdateAggregationNumber {
1576                                task_id,
1577                                base_aggregation_number: u32::MAX,
1578                                distance: None,
1579                            },
1580                            &mut ctx,
1581                        );
1582                    } else if native_fn.is_session_dependent && self.should_track_dependencies() {
1583                        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
1584                        AggregationUpdateQueue::run(
1585                            AggregationUpdateJob::UpdateAggregationNumber {
1586                                task_id,
1587                                base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
1588                                distance: None,
1589                            },
1590                            &mut ctx,
1591                        );
1592                    };
1593
1594                    task_id
1595                }
1596            }
1597        };
1598
            // `ctx` is consumed here; this is the common exit for all paths except the step 1
            // cache hit, which connects and returns early.
1599        operation::ConnectChildOperation::run(parent_task, task_id, ctx);
1600
1601        task_id
1602    }
1603
1604    /// Generate an object that implements [`fmt::Display`] explaining why the given
1605    /// [`CachedTaskType`] is transient.
1606    fn debug_trace_transient_task(
1607        &self,
1608        task_type: &CachedTaskType,
1609        cell_id: Option<CellId>,
1610    ) -> DebugTraceTransientTask {
1611        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1612        // from tracing the same task many times, so use a visited_set
1613        fn inner_id(
1614            backend: &TurboTasksBackendInner<impl BackingStorage>,
1615            task_id: TaskId,
1616            cell_type_id: Option<ValueTypeId>,
1617            visited_set: &mut FxHashSet<TaskId>,
1618        ) -> DebugTraceTransientTask {
1619            if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1620                if visited_set.contains(&task_id) {
1621                    let task_name = task_type.get_name();
1622                    DebugTraceTransientTask::Collapsed {
1623                        task_name,
1624                        cell_type_id,
1625                    }
1626                } else {
1627                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1628                }
1629            } else {
1630                DebugTraceTransientTask::Uncached { cell_type_id }
1631            }
1632        }
1633        fn inner_cached(
1634            backend: &TurboTasksBackendInner<impl BackingStorage>,
1635            task_type: &CachedTaskType,
1636            cell_type_id: Option<ValueTypeId>,
1637            visited_set: &mut FxHashSet<TaskId>,
1638        ) -> DebugTraceTransientTask {
1639            let task_name = task_type.get_name();
1640
1641            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1642                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1643                    // `task_id` should never be `None` at this point, as that would imply a
1644                    // non-local task is returning a local `Vc`...
1645                    // Just ignore if it happens, as we're likely already panicking.
1646                    return None;
1647                };
1648                if task_id.is_transient() {
1649                    Some(Box::new(inner_id(
1650                        backend,
1651                        task_id,
1652                        cause_self_raw_vc.try_get_type_id(),
1653                        visited_set,
1654                    )))
1655                } else {
1656                    None
1657                }
1658            });
1659            let cause_args = task_type
1660                .arg
1661                .get_raw_vcs()
1662                .into_iter()
1663                .filter_map(|raw_vc| {
1664                    let Some(task_id) = raw_vc.try_get_task_id() else {
1665                        // `task_id` should never be `None` (see comment above)
1666                        return None;
1667                    };
1668                    if !task_id.is_transient() {
1669                        return None;
1670                    }
1671                    Some((task_id, raw_vc.try_get_type_id()))
1672                })
1673                .collect::<IndexSet<_>>() // dedupe
1674                .into_iter()
1675                .map(|(task_id, cell_type_id)| {
1676                    inner_id(backend, task_id, cell_type_id, visited_set)
1677                })
1678                .collect();
1679
1680            DebugTraceTransientTask::Cached {
1681                task_name,
1682                cell_type_id,
1683                cause_self,
1684                cause_args,
1685            }
1686        }
1687        inner_cached(
1688            self,
1689            task_type,
1690            cell_id.map(|c| c.type_id),
1691            &mut FxHashSet::default(),
1692        )
1693    }
1694
1695    fn invalidate_task(
1696        &self,
1697        task_id: TaskId,
1698        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1699    ) {
1700        if !self.should_track_dependencies() {
1701            panic!("Dependency tracking is disabled so invalidation is not allowed");
1702        }
1703        operation::InvalidateOperation::run(
1704            smallvec![task_id],
1705            #[cfg(feature = "task_dirty_cause")]
1706            TaskDirtyCause::Invalidator,
1707            self.execute_context(turbo_tasks),
1708        );
1709    }
1710
1711    fn invalidate_tasks(
1712        &self,
1713        tasks: &[TaskId],
1714        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1715    ) {
1716        if !self.should_track_dependencies() {
1717            panic!("Dependency tracking is disabled so invalidation is not allowed");
1718        }
1719        operation::InvalidateOperation::run(
1720            tasks.iter().copied().collect(),
1721            #[cfg(feature = "task_dirty_cause")]
1722            TaskDirtyCause::Unknown,
1723            self.execute_context(turbo_tasks),
1724        );
1725    }
1726
1727    fn invalidate_tasks_set(
1728        &self,
1729        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1730        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1731    ) {
1732        if !self.should_track_dependencies() {
1733            panic!("Dependency tracking is disabled so invalidation is not allowed");
1734        }
1735        operation::InvalidateOperation::run(
1736            tasks.iter().copied().collect(),
1737            #[cfg(feature = "task_dirty_cause")]
1738            TaskDirtyCause::Unknown,
1739            self.execute_context(turbo_tasks),
1740        );
1741    }
1742
1743    fn invalidate_serialization(
1744        &self,
1745        task_id: TaskId,
1746        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1747    ) {
1748        if task_id.is_transient() {
1749            return;
1750        }
1751        let mut ctx = self.execute_context(turbo_tasks);
1752        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1753        task.invalidate_serialization();
1754    }
1755
1756    fn debug_get_task_description(&self, task_id: TaskId) -> String {
1757        let task = self.storage.access_mut(task_id);
1758        if let Some(value) = task.get_persistent_task_type() {
1759            format!("{task_id:?} {}", value)
1760        } else if let Some(value) = task.get_transient_task_type() {
1761            format!("{task_id:?} {}", value)
1762        } else {
1763            format!("{task_id:?} unknown")
1764        }
1765    }
1766
1767    fn get_task_name(
1768        &self,
1769        task_id: TaskId,
1770        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1771    ) -> String {
1772        let mut ctx = self.execute_context(turbo_tasks);
1773        let task = ctx.task(task_id, TaskDataCategory::Data);
1774        if let Some(value) = task.get_persistent_task_type() {
1775            value.to_string()
1776        } else if let Some(value) = task.get_transient_task_type() {
1777            value.to_string()
1778        } else {
1779            "unknown".to_string()
1780        }
1781    }
1782
1783    fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<CachedTaskTypeArc> {
1784        let task = self.storage.access_mut(task_id);
1785        task.get_persistent_task_type().cloned()
1786    }
1787
1788    fn task_execution_canceled(
1789        &self,
1790        task_id: TaskId,
1791        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1792    ) {
1793        let mut ctx = self.execute_context(turbo_tasks);
1794        let mut task = ctx.task(task_id, TaskDataCategory::All);
1795        if let Some(in_progress) = task.take_in_progress() {
1796            match in_progress {
1797                InProgressState::Scheduled {
1798                    done_event,
1799                    reason: _,
1800                } => done_event.notify(usize::MAX),
1801                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1802                    done_event.notify(usize::MAX)
1803                }
1804                InProgressState::Canceled => {}
1805            }
1806        }
1807        // Notify any readers waiting on in-progress cells so their listeners
1808        // resolve and foreground jobs can finish (prevents stop_and_wait hang).
1809        let in_progress_cells = task.take_in_progress_cells();
1810        if let Some(ref cells) = in_progress_cells {
1811            for state in cells.values() {
1812                state.event.notify(usize::MAX);
1813            }
1814        }
1815
1816        // Mark the cancelled task as session-dependent dirty so it will be re-executed
1817        // in the next session. Without this, any reader that encounters the cancelled task
1818        // records an error in its output. That error is persisted and would poison
1819        // subsequent builds. By marking the task session-dependent dirty, the next build
1820        // re-executes it, which invalidates dependents and corrects the stale errors.
1821        let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
1822            task.update_dirty_state(Some(Dirtyness::SessionDependent))
1823        } else {
1824            None
1825        };
1826
1827        let old = task.set_in_progress(InProgressState::Canceled);
1828        debug_assert!(old.is_none(), "InProgress already exists");
1829        drop(task);
1830
1831        if let Some(data_update) = data_update {
1832            AggregationUpdateQueue::run(data_update, &mut ctx);
1833        }
1834
1835        drop(in_progress_cells);
1836    }
1837
1838    fn try_start_task_execution(
1839        &self,
1840        task_id: TaskId,
1841        priority: TaskPriority,
1842        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1843    ) -> Option<TaskExecutionSpec<'_>> {
            // Moves a `Scheduled` task into the `InProgress` state and returns the future and
            // tracing span needed to execute it.
            //
            // Returns `None` when the task has no in-progress state, when that state is not
            // `Scheduled` (the state is put back untouched), or when a `Once` task no longer
            // holds a future.
1844        let execution_reason;
1845        let task_type;
1846        #[cfg(feature = "task_dirty_cause")]
1847        let cause;
            // All task state is updated inside this block, so `ctx` and the task guard are
            // released before the span and the future are created below.
1848        {
1849            let mut ctx = self.execute_context(turbo_tasks);
1850            let mut task = ctx.task(task_id, TaskDataCategory::All);
1851            task_type = task.get_task_type().to_owned();
1852            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
                // If the task reports tasks to prefetch, release the guard, prepare those tasks
                // and then re-acquire the guard.
1853            if let Some(tasks) = task.prefetch() {
1854                drop(task);
1855                ctx.prepare_tasks(tasks, "prefetch");
1856                task = ctx.task(task_id, TaskDataCategory::All);
1857            }
1858            let in_progress = task.take_in_progress()?;
1859            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                    // Not scheduled: restore the state that was just taken and bail out.
1860                let old = task.set_in_progress(in_progress);
1861                debug_assert!(old.is_none(), "InProgress already exists");
1862                return None;
1863            };
1864            execution_reason = reason;
1865            #[cfg(feature = "task_dirty_cause")]
1866            {
                    // Remember the dirty cause (if any); it is attached to the span below.
1867                cause = match task.get_dirty() {
1868                    Some(Dirtyness::Dirty { cause, .. }) => Some(cause.clone()),
1869                    _ => None,
1870                };
1871            }
                // The `done_event` of the scheduled state is carried over into the in-progress
                // state.
1872            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
1873                InProgressStateInner {
1874                    stale: false,
1875                    once_task,
1876                    done_event,
1877                    marked_as_completed: false,
1878                    new_children: Default::default(),
1879                },
1880            )));
1881            debug_assert!(old.is_none(), "InProgress already exists");
1882
1883            // Make all current collectibles outdated (remove left-over outdated collectibles)
1884            enum Collectible {
1885                Current(CollectibleRef, i32),
1886                Outdated(CollectibleRef),
1887            }
                // Both iterators are collected into a `Vec` first, so the task can be mutated
                // in the loop below.
1888            let collectibles = task
1889                .iter_collectibles()
1890                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
1891                .chain(
1892                    task.iter_outdated_collectibles()
1893                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
1894                )
1895                .collect::<Vec<_>>();
1896            for collectible in collectibles {
1897                match collectible {
1898                    Collectible::Current(collectible, value) => {
1899                        let _ = task.insert_outdated_collectible(collectible, value);
1900                    }
1901                    Collectible::Outdated(collectible) => {
                            // Remove the outdated entry only if the collectible is not also a
                            // current collectible.
1902                        if task
1903                            .collectibles()
1904                            .is_none_or(|m| m.get(&collectible).is_none())
1905                        {
1906                            task.remove_outdated_collectibles(&collectible);
1907                        }
1908                    }
1909                }
1910            }
1911
1912            if self.should_track_dependencies() {
1913                // Make all dependencies outdated
1914                let cell_dependencies = task.iter_cell_dependencies().collect();
1915                task.set_outdated_cell_dependencies(cell_dependencies);
1916
1917                let outdated_output_dependencies = task.iter_output_dependencies().collect();
1918                task.set_outdated_output_dependencies(outdated_output_dependencies);
1919            }
1920        }
1921
            // Create the tracing span and the future for the task's type.
1922        let (span, future) = match task_type {
1923            TaskType::Cached(task_type) => {
1924                let CachedTaskType {
1925                    native_fn,
1926                    this,
1927                    arg,
1928                } = &*task_type;
1929                (
1930                    native_fn.span(
1931                        task_id.persistence(),
1932                        execution_reason,
1933                        priority,
1934                        #[cfg(feature = "task_dirty_cause")]
1935                        cause.as_ref(),
1936                    ),
1937                    native_fn.execute(*this, &**arg),
1938                )
1939            }
1940            TaskType::Transient(task_type) => {
1941                let span = tracing::trace_span!("turbo_tasks::root_task");
1942                let future = match &*task_type {
1943                    TransientTask::Root(f) => f(),
                        // `take` moves the stored future out of the mutex and leaves the default
                        // value behind; `?` returns `None` when no future was stored.
1944                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
1945                };
1946                (span, future)
1947            }
1948        };
1949        Some(TaskExecutionSpec { future, span })
1950    }
1951
1952    /// Returns `Some(priority)` if the task became stale during execution and needs to be
1953    /// re-scheduled at the given priority. The caller (turbo-tasks manager) hands it to the
1954    /// priority runner so re-execution doesn't inherit the original schedule priority.
        ///
        /// Returns `None` once all completion steps have run. The work is delegated to the
        /// `task_execution_completed_*` helpers in this order: prepare, invalidate output
        /// dependents, unfinished children dirty, connect, finish, cleanup. Staleness can be
        /// reported by the prepare, connect and finish helpers; the remaining steps are then
        /// skipped.
1955    fn task_execution_completed(
1956        &self,
1957        task_id: TaskId,
1958        result: Result<RawVc, TurboTasksExecutionError>,
1959        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1960        #[cfg(feature = "verify_determinism")] stateful: bool,
1961        has_invalidator: bool,
1962        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1963    ) -> Option<TaskPriority> {
1964        // Task completion is a 4 step process:
1965        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
1966        //    aggregation number of the task and the new children.
1967        // 2. Connect the new children to the task (and do the relevant aggregation updates).
1968        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
1969        // 4. Shrink the task memory to reduce footprint of the task.
1970
1971        // Due to persistence it is possible that the process is cancelled after any step. This is
1972        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
1973        // in-memory representation.
1974
1975        // The task might be invalidated during this process, so we need to check the stale flag
1976        // at the start of every step.
1977
            // The span is created in one of two shapes: without `trace_task_details` only the
            // `new_children` field exists, with it the additional fields are recorded as the
            // steps progress.
1978        #[cfg(not(feature = "trace_task_details"))]
1979        let span = tracing::trace_span!(
1980            "task execution completed",
1981            new_children = tracing::field::Empty
1982        )
1983        .entered();
1984        #[cfg(feature = "trace_task_details")]
1985        let span = tracing::trace_span!(
1986            "task execution completed",
1987            task_id = display(task_id),
1988            result = match result.as_ref() {
1989                Ok(value) => display(either::Either::Left(value)),
1990                Err(err) => display(either::Either::Right(err)),
1991            },
1992            new_children = tracing::field::Empty,
1993            immutable = tracing::field::Empty,
1994            new_output = tracing::field::Empty,
1995            output_dependents = tracing::field::Empty,
1996            aggregation_number = tracing::field::Empty,
1997            stale = tracing::field::Empty,
1998        )
1999        .entered();
2000
            // `result` is moved into the prepare step below, so remember whether it is an error.
2001        let is_error = result.is_err();
2002
2003        let mut ctx = self.execute_context(turbo_tasks);
2004
            // Prepare step: returns everything the later steps need, or `Err(priority)` when
            // the task was stale.
2005        let TaskExecutionCompletePrepareResult {
2006            new_children,
2007            is_now_immutable,
2008            #[cfg(feature = "verify_determinism")]
2009            no_output_set,
2010            new_output,
2011            #[cfg(feature = "task_dirty_cause")]
2012            function_id,
2013            output_dependent_tasks,
2014            is_recomputation,
2015            is_session_dependent,
2016        } = match self.task_execution_completed_prepare(
2017            &mut ctx,
2018            #[cfg(feature = "trace_task_details")]
2019            &span,
2020            task_id,
2021            result,
2022            cell_counters,
2023            #[cfg(feature = "verify_determinism")]
2024            stateful,
2025            has_invalidator,
2026        ) {
2027            Ok(r) => r,
2028            Err(stale_priority) => {
2029                // Task was stale and has been rescheduled
2030                #[cfg(feature = "trace_task_details")]
2031                span.record("stale", "prepare");
2032                return Some(stale_priority);
2033            }
2034        };
2035
2036        #[cfg(feature = "trace_task_details")]
2037        span.record("new_output", new_output.is_some());
2038        #[cfg(feature = "trace_task_details")]
2039        span.record("output_dependents", output_dependent_tasks.len());
2040
2041        // When restoring from filesystem cache the following might not be executed (since we can
2042        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
2043        // would be executed again.
2044
            // Skipped entirely when the prepare step returned no output-dependent tasks.
2045        if !output_dependent_tasks.is_empty() {
2046            self.task_execution_completed_invalidate_output_dependent(
2047                &mut ctx,
2048                task_id,
2049                #[cfg(feature = "task_dirty_cause")]
2050                function_id,
2051                output_dependent_tasks,
2052            );
2053        }
2054
2055        let has_new_children = !new_children.is_empty();
2056        span.record("new_children", new_children.len());
2057
            // Both child-related steps only run when there are new children. The first one
            // borrows the list, the connect step consumes it.
2058        if has_new_children {
2059            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
2060        }
2061
2062        if has_new_children
2063            && let Some(stale_priority) =
2064                self.task_execution_completed_connect(&mut ctx, task_id, new_children)
2065        {
2066            // Task was stale and has been rescheduled
2067            #[cfg(feature = "trace_task_details")]
2068            span.record("stale", "connect");
2069            return Some(stale_priority);
2070        }
2071
            // Finish step: besides the optional stale priority it hands back the in-progress
            // cells, which are only dropped at the very end of this function.
2072        let (stale_priority, in_progress_cells) = self.task_execution_completed_finish(
2073            &mut ctx,
2074            task_id,
2075            #[cfg(feature = "verify_determinism")]
2076            no_output_set,
2077            new_output,
2078            is_now_immutable,
2079            is_session_dependent,
2080        );
2081        if let Some(stale_priority) = stale_priority {
2082            // Task was stale and has been rescheduled
2083            #[cfg(feature = "trace_task_details")]
2084            span.record("stale", "finish");
2085            return Some(stale_priority);
2086        }
2087
            // Cleanup step: returns the removed data so that it can be dropped here rather than
            // inside the helper.
2088        let removed_data = self.task_execution_completed_cleanup(
2089            &mut ctx,
2090            task_id,
2091            cell_counters,
2092            is_error,
2093            is_recomputation,
2094        );
2095
2096        // Drop data outside of critical sections
2097        drop(removed_data);
2098        drop(in_progress_cells);
2099
2100        None
2101    }
2102
2103    fn task_execution_completed_prepare(
2104        &self,
2105        ctx: &mut impl ExecuteContext<'_>,
2106        #[cfg(feature = "trace_task_details")] span: &Span,
2107        task_id: TaskId,
2108        result: Result<RawVc, TurboTasksExecutionError>,
2109        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2110        #[cfg(feature = "verify_determinism")] stateful: bool,
2111        has_invalidator: bool,
2112    ) -> Result<TaskExecutionCompletePrepareResult, TaskPriority> {
2113        let mut task = ctx.task(task_id, TaskDataCategory::All);
2114        let is_recomputation = task.is_dirty().is_none();
2115        // Without dependency tracking, the SessionDependent dirty state is never read (no session
2116        // restore), so skip the work
2117        let is_session_dependent = self.should_track_dependencies()
2118            && matches!(task.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent);
2119        let Some(in_progress) = task.get_in_progress_mut() else {
2120            panic!("Task execution completed, but task is not in progress: {task:#?}");
2121        };
2122        if matches!(in_progress, InProgressState::Canceled) {
2123            return Ok(TaskExecutionCompletePrepareResult {
2124                new_children: Default::default(),
2125                is_now_immutable: false,
2126                #[cfg(feature = "verify_determinism")]
2127                no_output_set: false,
2128                #[cfg(feature = "task_dirty_cause")]
2129                function_id: None,
2130                new_output: None,
2131                output_dependent_tasks: Default::default(),
2132                is_recomputation,
2133                is_session_dependent,
2134            });
2135        }
2136        let &mut InProgressState::InProgress(box InProgressStateInner {
2137            stale,
2138            ref mut new_children,
2139            once_task: is_once_task,
2140            ..
2141        }) = in_progress
2142        else {
2143            panic!("Task execution completed, but task is not in progress: {task:#?}");
2144        };
2145
2146        // If the task is stale, reschedule it
2147        #[cfg(not(feature = "no_fast_stale"))]
2148        if stale && !is_once_task {
2149            let stale_priority = compute_stale_priority(&task);
2150            let Some(InProgressState::InProgress(box InProgressStateInner {
2151                done_event,
2152                mut new_children,
2153                ..
2154            })) = task.take_in_progress()
2155            else {
2156                unreachable!();
2157            };
2158            let old = task.set_in_progress(InProgressState::Scheduled {
2159                done_event,
2160                reason: TaskExecutionReason::Stale,
2161            });
2162            debug_assert!(old.is_none(), "InProgress already exists");
2163            // Remove old children from new_children to leave only the children that had their
2164            // active count increased
2165            for task in task.iter_children() {
2166                new_children.remove(&task);
2167            }
2168            drop(task);
2169
2170            // We need to undo the active count increase for the children since we throw away the
2171            // new_children list now.
2172            AggregationUpdateQueue::run(
2173                AggregationUpdateJob::DecreaseActiveCounts {
2174                    task_ids: new_children.into_iter().collect(),
2175                },
2176                ctx,
2177            );
2178            return Err(stale_priority);
2179        }
2180
2181        // take the children from the task to process them
2182        let mut new_children = take(new_children);
2183
2184        // get the function id for the dirty cause
2185        #[cfg(feature = "task_dirty_cause")]
2186        let function_id = match task.get_task_type() {
2187            TaskTypeRef::Cached(task_type) => {
2188                Some(turbo_tasks::registry::get_function_id(task_type.native_fn))
2189            }
2190            TaskTypeRef::Transient(_) => None,
2191        };
2192
2193        // handle stateful (only tracked when verify_determinism is enabled)
2194        #[cfg(feature = "verify_determinism")]
2195        if stateful {
2196            task.set_stateful(true);
2197        }
2198
2199        // handle has_invalidator
2200        if has_invalidator {
2201            task.set_invalidator(true);
2202        }
2203
2204        // handle cell counters: update max index and remove cells that are no longer used
2205        // On error, skip this update: the task may have failed before creating all cells it
2206        // normally creates, so cell_counters is incomplete. Clearing cell_type_max_index entries
2207        // based on partial counters would cause "cell no longer exists" errors for tasks that
2208        // still hold dependencies on those cells. The old cell data is preserved on error
2209        // (see task_execution_completed_cleanup), so keeping cell_type_max_index consistent with
2210        // that data is correct.
2211        // NOTE: This must stay in sync with task_execution_completed_cleanup, which similarly
2212        // skips cell data removal on error.
2213        if result.is_ok() || is_recomputation {
2214            let old_counters: FxHashMap<_, _> = task
2215                .iter_cell_type_max_index()
2216                .map(|(&k, &v)| (k, v))
2217                .collect();
2218            let mut counters_to_remove = old_counters.clone();
2219
2220            for (&cell_type, &max_index) in cell_counters.iter() {
2221                if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2222                    if old_max_index != max_index {
2223                        task.insert_cell_type_max_index(cell_type, max_index);
2224                    }
2225                } else {
2226                    task.insert_cell_type_max_index(cell_type, max_index);
2227                }
2228            }
2229            for (cell_type, _) in counters_to_remove {
2230                task.remove_cell_type_max_index(&cell_type);
2231            }
2232        }
2233
2234        let mut queue = AggregationUpdateQueue::new();
2235
2236        let mut old_edges = Vec::new();
2237
2238        let has_children = !new_children.is_empty();
2239        let is_immutable = task.immutable();
2240        let task_dependencies_for_immutable =
2241            // Task was not previously marked as immutable
2242            if !is_immutable
2243            // Task is not session dependent (session dependent tasks can change between sessions)
2244            && !is_session_dependent
2245            // Task has no invalidator
2246            && !task.invalidator()
2247            // Task has no dependencies on collectibles
2248            && task.is_collectibles_dependencies_empty()
2249        {
2250            Some(
2251                // Collect all dependencies on tasks to check if all dependencies are immutable
2252                task.iter_output_dependencies()
2253                    .chain(task.iter_cell_dependencies().map(|dep| dep.cell_ref().task))
2254                    .collect::<FxHashSet<_>>(),
2255            )
2256        } else {
2257            None
2258        };
2259
2260        if has_children {
2261            // Prepare all new children
2262            let _aggregation_number =
2263                prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2264
2265            #[cfg(feature = "trace_task_details")]
2266            span.record("aggregation_number", _aggregation_number);
2267
2268            // Filter actual new children
2269            old_edges.extend(
2270                task.iter_children()
2271                    .filter(|task| !new_children.remove(task))
2272                    .map(OutdatedEdge::Child),
2273            );
2274        } else {
2275            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2276        }
2277
2278        old_edges.extend(
2279            task.iter_outdated_collectibles()
2280                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2281        );
2282
2283        if self.should_track_dependencies() {
2284            // IMPORTANT: Use iter_outdated_* here, NOT iter_* (active dependencies).
2285            // At execution start, active deps are copied to outdated as a "before" snapshot.
2286            // During execution, new deps are added to active.
2287            // Here at completion, we clean up only the OUTDATED deps (the "before" snapshot).
2288            // Using iter_* (active) instead would incorrectly clean up deps that are still valid,
2289            // breaking dependency tracking.
2290            old_edges.extend(
2291                task.iter_outdated_cell_dependencies()
2292                    .map(OutdatedEdge::CellDependency),
2293            );
2294            old_edges.extend(
2295                task.iter_outdated_output_dependencies()
2296                    .map(OutdatedEdge::OutputDependency),
2297            );
2298        }
2299
2300        // Check if the output needs to be updated
2301        let current_output = task.get_output();
2302        #[cfg(feature = "verify_determinism")]
2303        let no_output_set = current_output.is_none();
2304        let new_output = match result {
2305            Ok(RawVc::TaskOutput(output_task_id)) => {
2306                if let Some(OutputValue::Output(current_task_id)) = current_output
2307                    && *current_task_id == output_task_id
2308                {
2309                    None
2310                } else {
2311                    Some(OutputValue::Output(output_task_id))
2312                }
2313            }
2314            Ok(RawVc::TaskCell(output_task_id, cell)) => {
2315                if let Some(OutputValue::Cell(CellRef {
2316                    task: current_task_id,
2317                    cell: current_cell,
2318                })) = current_output
2319                    && *current_task_id == output_task_id
2320                    && *current_cell == cell
2321                {
2322                    None
2323                } else {
2324                    Some(OutputValue::Cell(CellRef {
2325                        task: output_task_id,
2326                        cell,
2327                    }))
2328                }
2329            }
2330            Ok(RawVc::LocalOutput(..)) => {
2331                panic!("Non-local tasks must not return a local Vc");
2332            }
2333            Err(err) => {
2334                if let Some(OutputValue::Error(old_error)) = current_output
2335                    && **old_error == err
2336                {
2337                    None
2338                } else {
2339                    Some(OutputValue::Error(Arc::new((&err).into())))
2340                }
2341            }
2342        };
2343        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2344        // When output has changed, grab the dependent tasks
2345        if new_output.is_some() && ctx.should_track_dependencies() {
2346            output_dependent_tasks = task.iter_output_dependent().collect();
2347        }
2348
2349        drop(task);
2350
2351        // Check if the task can be marked as immutable
2352        let mut is_now_immutable = false;
2353        if let Some(dependencies) = task_dependencies_for_immutable
2354            && dependencies
2355                .iter()
2356                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2357        {
2358            is_now_immutable = true;
2359        }
2360        #[cfg(feature = "trace_task_details")]
2361        span.record("immutable", is_immutable || is_now_immutable);
2362
2363        if !queue.is_empty() || !old_edges.is_empty() {
2364            #[cfg(any(
2365                feature = "trace_task_completion",
2366                feature = "trace_aggregation_update_stats"
2367            ))]
2368            let _span =
2369                tracing::trace_span!("remove old edges and prepare new children", stats = Empty)
2370                    .entered();
2371            // Remove outdated edges first, before removing in_progress+dirty flag.
2372            // We need to make sure all outdated edges are removed before the task can potentially
2373            // be scheduled and executed again
2374            #[cfg(feature = "trace_aggregation_update_stats")]
2375            {
2376                let stats = CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2377                _span.record("stats", tracing::field::debug(stats));
2378            }
2379            #[cfg(not(feature = "trace_aggregation_update_stats"))]
2380            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2381        }
2382
2383        Ok(TaskExecutionCompletePrepareResult {
2384            new_children,
2385            is_now_immutable,
2386            #[cfg(feature = "verify_determinism")]
2387            no_output_set,
2388            #[cfg(feature = "task_dirty_cause")]
2389            function_id,
2390            new_output,
2391            output_dependent_tasks,
2392            is_recomputation,
2393            is_session_dependent,
2394        })
2395    }
2396
2397    fn task_execution_completed_invalidate_output_dependent(
2398        &self,
2399        ctx: &mut impl ExecuteContext<'_>,
2400        task_id: TaskId,
2401        #[cfg(feature = "task_dirty_cause")] function_id: Option<FunctionId>,
2402        output_dependent_tasks: SmallVec<[TaskId; 4]>,
2403    ) {
2404        debug_assert!(!output_dependent_tasks.is_empty());
2405
2406        #[cfg(feature = "task_dirty_cause")]
2407        let cause = match function_id {
2408            Some(function) => TaskDirtyCause::OutputChange { function },
2409            None => TaskDirtyCause::RootOutputChange,
2410        };
2411
2412        if output_dependent_tasks.len() > 1 {
2413            ctx.prepare_tasks(
2414                output_dependent_tasks
2415                    .iter()
2416                    .map(|&id| (id, TaskDataCategory::All)),
2417                "invalidate output dependents",
2418            );
2419        }
2420
2421        fn process_output_dependents(
2422            ctx: &mut impl ExecuteContext<'_>,
2423            task_id: TaskId,
2424            #[cfg(feature = "task_dirty_cause")] cause: &TaskDirtyCause,
2425            dependent_task_id: TaskId,
2426            queue: &mut AggregationUpdateQueue,
2427        ) {
2428            #[cfg(feature = "trace_task_output_dependencies")]
2429            let span = tracing::trace_span!(
2430                "invalidate output dependency",
2431                task = %task_id,
2432                dependent_task = %dependent_task_id,
2433                result = tracing::field::Empty,
2434            )
2435            .entered();
2436            let mut make_stale = true;
2437            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2438            let transient_task_type = dependent.get_transient_task_type();
2439            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
2440                // once tasks are never invalidated
2441                #[cfg(feature = "trace_task_output_dependencies")]
2442                span.record("result", "once task");
2443                return;
2444            }
2445            if dependent.outdated_output_dependencies_contains(&task_id) {
2446                #[cfg(feature = "trace_task_output_dependencies")]
2447                span.record("result", "outdated dependency");
2448                // output dependency is outdated, so it hasn't read the output yet
2449                // and doesn't need to be invalidated
2450                // But importantly we still need to make the task dirty as it should no longer
2451                // be considered as "recomputation".
2452                make_stale = false;
2453            } else if !dependent.output_dependencies_contains(&task_id) {
2454                // output dependency has been removed, so the task doesn't depend on the
2455                // output anymore and doesn't need to be invalidated
2456                #[cfg(feature = "trace_task_output_dependencies")]
2457                span.record("result", "no backward dependency");
2458                return;
2459            }
2460            make_task_dirty_internal(
2461                dependent,
2462                dependent_task_id,
2463                make_stale,
2464                #[cfg(feature = "task_dirty_cause")]
2465                cause.clone(),
2466                queue,
2467                ctx,
2468            );
2469            #[cfg(feature = "trace_task_output_dependencies")]
2470            span.record("result", "marked dirty");
2471        }
2472
2473        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
2474            let chunk_size = good_chunk_size(output_dependent_tasks.len());
2475            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2476            let _ = scope_and_block(chunks.len(), |scope| {
2477                for chunk in chunks {
2478                    let child_ctx = ctx.child_context();
2479                    #[cfg(feature = "task_dirty_cause")]
2480                    let cause = &cause;
2481                    scope.spawn(move || {
2482                        let mut ctx = child_ctx.create();
2483                        let mut queue = AggregationUpdateQueue::new();
2484                        for dependent_task_id in chunk {
2485                            process_output_dependents(
2486                                &mut ctx,
2487                                task_id,
2488                                #[cfg(feature = "task_dirty_cause")]
2489                                cause,
2490                                dependent_task_id,
2491                                &mut queue,
2492                            )
2493                        }
2494                        queue.execute(&mut ctx);
2495                    });
2496                }
2497            });
2498        } else {
2499            let mut queue = AggregationUpdateQueue::new();
2500            for dependent_task_id in output_dependent_tasks {
2501                process_output_dependents(
2502                    ctx,
2503                    task_id,
2504                    #[cfg(feature = "task_dirty_cause")]
2505                    &cause,
2506                    dependent_task_id,
2507                    &mut queue,
2508                );
2509            }
2510            queue.execute(ctx);
2511        }
2512    }
2513
2514    fn task_execution_completed_unfinished_children_dirty(
2515        &self,
2516        ctx: &mut impl ExecuteContext<'_>,
2517        new_children: &FxHashSet<TaskId>,
2518    ) {
2519        debug_assert!(!new_children.is_empty());
2520
2521        let mut queue = AggregationUpdateQueue::new();
2522        ctx.for_each_task_all(
2523            new_children.iter().copied(),
2524            "unfinished children dirty",
2525            |child_task, ctx| {
2526                if !child_task.has_output() {
2527                    let child_id = child_task.id();
2528                    make_task_dirty_internal(
2529                        child_task,
2530                        child_id,
2531                        false,
2532                        #[cfg(feature = "task_dirty_cause")]
2533                        TaskDirtyCause::InitialDirty,
2534                        &mut queue,
2535                        ctx,
2536                    );
2537                }
2538            },
2539        );
2540
2541        queue.execute(ctx);
2542    }
2543
2544    fn task_execution_completed_connect(
2545        &self,
2546        ctx: &mut impl ExecuteContext<'_>,
2547        task_id: TaskId,
2548        new_children: FxHashSet<TaskId>,
2549    ) -> Option<TaskPriority> {
2550        debug_assert!(!new_children.is_empty());
2551
2552        let mut task = ctx.task(task_id, TaskDataCategory::All);
2553        let Some(in_progress) = task.get_in_progress() else {
2554            panic!("Task execution completed, but task is not in progress: {task:#?}");
2555        };
2556        if matches!(in_progress, InProgressState::Canceled) {
2557            // Task was canceled in the meantime, so we don't connect the children
2558            return None;
2559        }
2560        let InProgressState::InProgress(box InProgressStateInner {
2561            #[cfg(not(feature = "no_fast_stale"))]
2562            stale,
2563            once_task: is_once_task,
2564            ..
2565        }) = in_progress
2566        else {
2567            panic!("Task execution completed, but task is not in progress: {task:#?}");
2568        };
2569
2570        // If the task is stale, reschedule it
2571        #[cfg(not(feature = "no_fast_stale"))]
2572        if *stale && !is_once_task {
2573            let stale_priority = compute_stale_priority(&task);
2574            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2575                task.take_in_progress()
2576            else {
2577                unreachable!();
2578            };
2579            let old = task.set_in_progress(InProgressState::Scheduled {
2580                done_event,
2581                reason: TaskExecutionReason::Stale,
2582            });
2583            debug_assert!(old.is_none(), "InProgress already exists");
2584            drop(task);
2585
2586            // All `new_children` are currently hold active with an active count and we need to undo
2587            // that. (We already filtered out the old children from that list)
2588            AggregationUpdateQueue::run(
2589                AggregationUpdateJob::DecreaseActiveCounts {
2590                    task_ids: new_children.into_iter().collect(),
2591                },
2592                ctx,
2593            );
2594            return Some(stale_priority);
2595        }
2596
2597        let has_active_count = ctx.should_track_activeness()
2598            && task
2599                .get_activeness()
2600                .is_some_and(|activeness| activeness.active_counter > 0);
2601        connect_children(
2602            ctx,
2603            task_id,
2604            task,
2605            new_children,
2606            has_active_count,
2607            ctx.should_track_activeness(),
2608        );
2609
2610        None
2611    }
2612
2613    #[allow(clippy::type_complexity)]
2614    fn task_execution_completed_finish(
2615        &self,
2616        ctx: &mut impl ExecuteContext<'_>,
2617        task_id: TaskId,
2618        #[cfg(feature = "verify_determinism")] no_output_set: bool,
2619        new_output: Option<OutputValue>,
2620        is_now_immutable: bool,
2621        is_session_dependent: bool,
2622    ) -> (
2623        Option<TaskPriority>,
2624        Option<
2625            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2626        >,
2627    ) {
2628        let mut task = ctx.task(task_id, TaskDataCategory::All);
2629        let Some(in_progress) = task.take_in_progress() else {
2630            panic!("Task execution completed, but task is not in progress: {task:#?}");
2631        };
2632        if matches!(in_progress, InProgressState::Canceled) {
2633            // Task was canceled in the meantime, so we don't finish it
2634            return (None, None);
2635        }
2636        let InProgressState::InProgress(box InProgressStateInner {
2637            done_event,
2638            once_task: is_once_task,
2639            stale,
2640            marked_as_completed: _,
2641            new_children,
2642        }) = in_progress
2643        else {
2644            panic!("Task execution completed, but task is not in progress: {task:#?}");
2645        };
2646        debug_assert!(new_children.is_empty());
2647
2648        // If the task is stale, reschedule it
2649        if stale && !is_once_task {
2650            let stale_priority = compute_stale_priority(&task);
2651            let old = task.set_in_progress(InProgressState::Scheduled {
2652                done_event,
2653                reason: TaskExecutionReason::Stale,
2654            });
2655            debug_assert!(old.is_none(), "InProgress already exists");
2656            return (Some(stale_priority), None);
2657        }
2658
2659        // Set the output if it has changed
2660        let mut old_content = None;
2661        if let Some(value) = new_output {
2662            old_content = task.set_output(value);
2663        }
2664
2665        // If the task has no invalidator and has no mutable dependencies, it does not have a way
2666        // to be invalidated and we can mark it as immutable.
2667        if is_now_immutable {
2668            task.set_immutable(true);
2669        }
2670
2671        // Notify in progress cells and remove all of them
2672        let in_progress_cells = task.take_in_progress_cells();
2673        if let Some(ref cells) = in_progress_cells {
2674            for state in cells.values() {
2675                state.event.notify(usize::MAX);
2676            }
2677        }
2678
2679        // Compute and apply the new dirty state, propagating to aggregating ancestors
2680        let new_dirtyness = if is_session_dependent {
2681            Some(Dirtyness::SessionDependent)
2682        } else {
2683            None
2684        };
2685        #[cfg(feature = "verify_determinism")]
2686        let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
2687        let data_update = task.update_dirty_state(new_dirtyness);
2688
2689        // Under verify_determinism we re-run a task whose dirty state or output unexpectedly
2690        // changed during execution to confirm determinism. The leaf priority keeps these
2691        // verification reruns at the highest priority.
2692        #[cfg(feature = "verify_determinism")]
2693        let stale_priority: Option<TaskPriority> =
2694            ((dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task)
2695                .then(TaskPriority::leaf);
2696        #[cfg(not(feature = "verify_determinism"))]
2697        let stale_priority: Option<TaskPriority> = None;
2698        if stale_priority.is_some() {
2699            let old = task.set_in_progress(InProgressState::Scheduled {
2700                done_event,
2701                reason: TaskExecutionReason::Stale,
2702            });
2703            debug_assert!(old.is_none(), "InProgress already exists");
2704            drop(task);
2705        } else {
2706            drop(task);
2707
2708            // Notify dependent tasks that are waiting for this task to finish
2709            done_event.notify(usize::MAX);
2710        }
2711
2712        drop(old_content);
2713
2714        if let Some(data_update) = data_update {
2715            AggregationUpdateQueue::run(data_update, ctx);
2716        }
2717
2718        // We return so the data can be dropped outside of critical sections
2719        (stale_priority, in_progress_cells)
2720    }
2721
2722    fn task_execution_completed_cleanup(
2723        &self,
2724        ctx: &mut impl ExecuteContext<'_>,
2725        task_id: TaskId,
2726        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2727        is_error: bool,
2728        is_recomputation: bool,
2729    ) -> Vec<SharedReference> {
2730        let mut task = ctx.task(task_id, TaskDataCategory::All);
2731        let mut removed_cell_data = Vec::new();
2732        // An error is potentially caused by a eventual consistency, so we avoid updating cells
2733        // after an error as it is likely transient and we want to keep the dependent tasks
2734        // clean to avoid re-executions.
2735        // NOTE: This must stay in sync with task_execution_completed_prepare, which similarly
2736        // skips cell_type_max_index updates on error.
2737        if !is_error || is_recomputation {
2738            // Remove no longer existing cells and
2739            // find all outdated data items (removed cells, outdated edges)
2740            // Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
2741            // anyway and we want to avoid needless re-executions. When the cells become
2742            // used again, they are invalidated from the update cell operation.
2743            let to_remove: Vec<_> = task
2744                .iter_cell_data()
2745                .filter_map(|(cell, _)| {
2746                    cell_counters
2747                        .get(&cell.type_id)
2748                        .is_none_or(|start_index| cell.index >= *start_index)
2749                        .then_some(*cell)
2750                })
2751                .collect();
2752            removed_cell_data.reserve_exact(to_remove.len());
2753            for cell in to_remove {
2754                if let Some(data) = task.remove_cell_data(&cell) {
2755                    removed_cell_data.push(data);
2756                }
2757            }
2758            // Remove cell_data_hash entries for cells that no longer exist
2759            let to_remove_hash: Vec<_> = task
2760                .iter_cell_data_hash()
2761                .filter_map(|(cell, _)| {
2762                    cell_counters
2763                        .get(&cell.type_id)
2764                        .is_none_or(|start_index| cell.index >= *start_index)
2765                        .then_some(*cell)
2766                })
2767                .collect();
2768            for cell in to_remove_hash {
2769                task.remove_cell_data_hash(&cell);
2770            }
2771        }
2772
2773        // Clean up task storage after execution:
2774        // - Shrink collections marked with shrink_on_completion
2775        // - Drop dependency fields for immutable tasks (they'll never re-execute)
2776        task.cleanup_after_execution();
2777
2778        drop(task);
2779
2780        // Return so we can drop outside of critical sections
2781        removed_cell_data
2782    }
2783
2784    /// Prints the standard message emitted when the background persisting process stops due to an
2785    /// unrecoverable write error. The caller is responsible for returning from the background job.
2786    fn log_unrecoverable_persist_error() {
2787        eprintln!(
2788            "Persisting is disabled for this session due to an unrecoverable error. Stopping the \
2789             background persisting process."
2790        );
2791    }
2792
2793    fn run_backend_job<'a>(
2794        self: &'a Arc<Self>,
2795        job: TurboTasksBackendJob,
2796        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2797    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2798        Box::pin(async move {
2799            match job {
2800                TurboTasksBackendJob::Snapshot => {
2801                    debug_assert!(self.should_persist());
2802
2803                    let mut last_snapshot = self.start_time;
2804                    let mut idle_start_listener = self.idle_start_event.listen();
2805                    let mut idle_end_listener = self.idle_end_event.listen();
2806                    // Whether to immediately set an idle timeout if possible.
2807                    // Set to false if we don't persist anything in a cycle.
2808                    let mut fresh_idle = true;
2809                    let mut evicted = false;
2810                    let mut is_first = true;
2811                    'outer: loop {
2812                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2813                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2814                        let idle_timeout = *IDLE_TIMEOUT;
2815                        let (time, mut reason) = if is_first {
2816                            (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2817                        } else {
2818                            (SNAPSHOT_INTERVAL, "regular snapshot interval")
2819                        };
2820
2821                        let until = last_snapshot + time;
2822                        if until > Instant::now() {
2823                            let mut stop_listener = self.stopping_event.listen();
2824                            if self.stopping.load(Ordering::Acquire) {
2825                                return;
2826                            }
2827                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2828                                Instant::now() + idle_timeout
2829                            } else {
2830                                far_future()
2831                            };
2832                            loop {
2833                                tokio::select! {
2834                                    _ = &mut stop_listener => {
2835                                        return;
2836                                    },
2837                                    _ = &mut idle_start_listener => {
2838                                        idle_time = Instant::now() + idle_timeout;
2839                                        idle_start_listener = self.idle_start_event.listen()
2840                                    },
2841                                    _ = &mut idle_end_listener => {
2842                                        idle_time = far_future();
2843                                        idle_end_listener = self.idle_end_event.listen()
2844                                    },
2845                                    _ = tokio::time::sleep_until(until) => {
2846                                        break;
2847                                    },
2848                                    _ = tokio::time::sleep_until(idle_time) => {
2849                                        if turbo_tasks.is_idle() {
2850                                            reason = "idle timeout";
2851                                            break;
2852                                        }
2853                                    },
2854                                }
2855                            }
2856                        }
2857
2858                        let this = self.clone();
2859                        // Create a root span shared by both the snapshot/persist
2860                        // work and the subsequent compaction so they appear
2861                        // grouped together in trace viewers.
2862                        let background_span =
2863                            tracing::info_span!(parent: None, "background snapshot");
2864                        match this.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
2865                            Err(err) => {
2866                                // save_snapshot consumed persisted_task_cache_log entries;
2867                                // further snapshots would corrupt the task graph.
2868                                eprintln!("Persisting failed: {err:?}");
2869                                Self::log_unrecoverable_persist_error();
2870                                return;
2871                            }
2872                            Ok((snapshot_start, new_data)) => {
2873                                // if we see 'new_data' then the next idle transition is 'fresh'
2874                                fresh_idle = new_data;
2875                                is_first = false;
2876                                last_snapshot = snapshot_start;
2877
2878                                // Polls the idle-end event without blocking. Returns
2879                                // `true` and refreshes the listener if idle has ended,
2880                                // `false` if we are still idle.
2881                                macro_rules! check_idle_ended {
2882                                    () => {{
2883                                        tokio::select! {
2884                                            biased;
2885                                            _ = &mut idle_end_listener => {
2886                                                idle_end_listener = self.idle_end_event.listen();
2887                                                true
2888                                            },
2889                                            _ = std::future::ready(()) => false,
2890                                        }
2891                                    }};
2892                                }
2893                                // Evict persisted tasks from memory to reclaim space.
2894                                // Like compaction, this runs after snapshot_and_persist
2895                                // as a separate concern.
2896                                //
2897                                // TODO: improve eviction policy — current approach is a full sweep
2898                                // after every snapshot. Better strategies to consider:
2899                                //   - Memory pressure signals: only evict when RSS exceeds a
2900                                //     threshold rather than unconditionally.
2901                                //   - Recency data: track last-access time per task and evict
2902                                //     least-recently-used entries first rather than all at once.
2903                                //   - Eviction intensity: partial sweeps (evict a fraction of
2904                                //     eligible tasks per cycle) to reduce latency spikes.
2905                                // Evict when there is new data to persist (the common
2906                                // case) or on the very first snapshot after startup
2907                                // (data was already on disk from a prior run, so
2908                                // new_data may be false but in-memory state can still
2909                                // be evicted).
2910                                let mut ran_eviction = false;
2911                                if this.should_evict() && (new_data || !evicted) {
2912                                    if check_idle_ended!() {
2913                                        // need to start all the way over so we catch the next
2914                                        // signal
2915                                        continue 'outer;
2916                                    }
2917                                    evicted = true;
2918                                    ran_eviction = true;
2919                                    this.storage.evict_after_snapshot(background_span.id());
2920                                }
2921
2922                                // Compact while idle (up to limit), regardless of
2923                                // whether the snapshot had new data.
2924                                // `background_span` is not entered here because
2925                                // `EnteredSpan` is `!Send` and would prevent the
2926                                // future from being sent across threads when it
2927                                // suspends at the `select!` await below.
2928                                let mut ran_compaction = false;
2929                                const MAX_IDLE_COMPACTION_PASSES: usize = 10;
2930                                for _ in 0..MAX_IDLE_COMPACTION_PASSES {
2931                                    if check_idle_ended!() {
2932                                        continue 'outer;
2933                                    }
2934                                    // Enter the span only around the synchronous
2935                                    // compact() call so we never hold an
2936                                    // `EnteredSpan` across an await point.
2937                                    let _compact_span = tracing::info_span!(
2938                                        parent: background_span.id(),
2939                                        "compact database"
2940                                    )
2941                                    .entered();
2942                                    match self.backing_storage.compact() {
2943                                        Ok(true) => {
2944                                            ran_compaction = true;
2945                                        }
2946                                        Ok(false) => break,
2947                                        Err(err) => {
2948                                            eprintln!("Compaction failed: {err:?}");
2949                                            if self.backing_storage.has_unrecoverable_write_error()
2950                                            {
2951                                                Self::log_unrecoverable_persist_error();
2952                                                return;
2953                                            }
2954                                            break;
2955                                        }
2956                                    }
2957                                }
2958                                if check_idle_ended!() {
2959                                    continue 'outer;
2960                                }
2961                                // After running snapshotting/eviction/compaction we have churned a
2962                                // _lot_ of memory if we are still
2963                                // idle tell `mimalloc` that now would be a good time to release
2964                                // memory back to the OS
2965                                if new_data || ran_compaction || ran_eviction {
2966                                    turbo_tasks_malloc::TurboMalloc::collect(true);
2967                                }
2968                            }
2969                        }
2970                    }
2971                }
2972            }
2973        })
2974    }
2975
2976    fn try_read_own_task_cell(
2977        &self,
2978        task_id: TaskId,
2979        cell: CellId,
2980        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2981    ) -> Result<TypedCellContent> {
2982        let mut ctx = self.execute_context(turbo_tasks);
2983        let task = ctx.task(task_id, TaskDataCategory::Data);
2984        if let Some(content) = task.get_cell_data(&cell).cloned() {
2985            Ok(CellContent(Some(content)).into_typed(cell.type_id))
2986        } else {
2987            Ok(CellContent(None).into_typed(cell.type_id))
2988        }
2989    }
2990
2991    fn read_task_collectibles(
2992        &self,
2993        task_id: TaskId,
2994        collectible_type: TraitTypeId,
2995        reader_id: Option<TaskId>,
2996        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2997    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2998        let mut ctx = self.execute_context(turbo_tasks);
2999        let mut collectibles = AutoMap::default();
3000        {
3001            let mut task = ctx.task(task_id, TaskDataCategory::All);
3002            if task
3003                .get_persistent_task_type()
3004                .is_some_and(|t| !t.native_fn.is_root)
3005            {
3006                drop(task);
3007                panic!(
3008                    "Reading collectibles of non-root task {} (reader: {}). The `root` attribute \
3009                     is missing on the task.",
3010                    self.debug_get_task_description(task_id),
3011                    reader_id.map_or_else(
3012                        || "unknown".to_string(),
3013                        |r| self.debug_get_task_description(r)
3014                    )
3015                );
3016            }
3017            for (collectible, count) in task.iter_aggregated_collectibles() {
3018                if *count > 0 && collectible.collectible_type == collectible_type {
3019                    *collectibles
3020                        .entry(RawVc::TaskCell(
3021                            collectible.cell.task,
3022                            collectible.cell.cell,
3023                        ))
3024                        .or_insert(0) += 1;
3025                }
3026            }
3027            for (&collectible, &count) in task.iter_collectibles() {
3028                if collectible.collectible_type == collectible_type {
3029                    *collectibles
3030                        .entry(RawVc::TaskCell(
3031                            collectible.cell.task,
3032                            collectible.cell.cell,
3033                        ))
3034                        .or_insert(0) += count;
3035                }
3036            }
3037            if let Some(reader_id) = reader_id {
3038                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
3039            }
3040        }
3041        if let Some(reader_id) = reader_id {
3042            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
3043            let target = CollectiblesRef {
3044                task: task_id,
3045                collectible_type,
3046            };
3047            if !reader.remove_outdated_collectibles_dependencies(&target) {
3048                let _ = reader.add_collectibles_dependencies(target);
3049            }
3050        }
3051        collectibles
3052    }
3053
3054    fn emit_collectible(
3055        &self,
3056        collectible_type: TraitTypeId,
3057        collectible: RawVc,
3058        task_id: TaskId,
3059        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3060    ) {
3061        self.assert_valid_collectible(task_id, collectible);
3062
3063        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3064            panic!("Collectibles need to be resolved");
3065        };
3066        let cell = CellRef {
3067            task: collectible_task,
3068            cell,
3069        };
3070        operation::UpdateCollectibleOperation::run(
3071            task_id,
3072            CollectibleRef {
3073                collectible_type,
3074                cell,
3075            },
3076            1,
3077            self.execute_context(turbo_tasks),
3078        );
3079    }
3080
3081    fn unemit_collectible(
3082        &self,
3083        collectible_type: TraitTypeId,
3084        collectible: RawVc,
3085        count: u32,
3086        task_id: TaskId,
3087        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3088    ) {
3089        self.assert_valid_collectible(task_id, collectible);
3090
3091        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3092            panic!("Collectibles need to be resolved");
3093        };
3094        let cell = CellRef {
3095            task: collectible_task,
3096            cell,
3097        };
3098        operation::UpdateCollectibleOperation::run(
3099            task_id,
3100            CollectibleRef {
3101                collectible_type,
3102                cell,
3103            },
3104            -(i32::try_from(count).unwrap()),
3105            self.execute_context(turbo_tasks),
3106        );
3107    }
3108
3109    fn update_task_cell(
3110        &self,
3111        task_id: TaskId,
3112        cell: CellId,
3113        content: CellContent,
3114        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3115        content_hash: Option<CellHash>,
3116        verification_mode: VerificationMode,
3117        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3118    ) {
3119        operation::UpdateCellOperation::run(
3120            task_id,
3121            cell,
3122            content,
3123            updated_key_hashes,
3124            content_hash,
3125            verification_mode,
3126            self.execute_context(turbo_tasks),
3127        );
3128    }
3129
3130    fn mark_own_task_as_finished(
3131        &self,
3132        task: TaskId,
3133        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3134    ) {
3135        let mut ctx = self.execute_context(turbo_tasks);
3136        let mut task = ctx.task(task, TaskDataCategory::Data);
3137        if let Some(InProgressState::InProgress(box InProgressStateInner {
3138            marked_as_completed,
3139            ..
3140        })) = task.get_in_progress_mut()
3141        {
3142            *marked_as_completed = true;
3143            // TODO this should remove the dirty state (also check session_dependent)
3144            // but this would break some assumptions for strongly consistent reads.
3145            // Client tasks are not connected yet, so we wouldn't wait for them.
3146            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
3147        }
3148    }
3149
3150    fn connect_task(
3151        &self,
3152        task: TaskId,
3153        parent_task: Option<TaskId>,
3154        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3155    ) {
3156        self.assert_not_persistent_calling_transient(parent_task, task, None);
3157        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
3158    }
3159
3160    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3161        let task_id = self.transient_task_id_factory.get();
3162        {
3163            let mut task = self.storage.access_mut(task_id);
3164            task.init_transient_task(task_id, task_type, self.should_track_activeness());
3165        }
3166        #[cfg(feature = "verify_aggregation_graph")]
3167        self.root_tasks.lock().insert(task_id);
3168        task_id
3169    }
3170
3171    fn dispose_root_task(
3172        &self,
3173        task_id: TaskId,
3174        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3175    ) {
3176        #[cfg(feature = "verify_aggregation_graph")]
3177        self.root_tasks.lock().remove(&task_id);
3178
3179        let mut ctx = self.execute_context(turbo_tasks);
3180        let mut task = ctx.task(task_id, TaskDataCategory::All);
3181        let is_dirty = task.is_dirty();
3182        let has_dirty_containers = task.has_dirty_containers();
3183        if is_dirty.is_some() || has_dirty_containers {
3184            if let Some(activeness_state) = task.get_activeness_mut() {
3185                // We will finish the task, but it would be removed after the task is done
3186                activeness_state.unset_root_type();
3187                activeness_state.set_active_until_clean();
3188            };
3189        } else if let Some(activeness_state) = task.take_activeness() {
3190            // Technically nobody should be listening to this event, but just in case
3191            // we notify it anyway
3192            activeness_state.all_clean_event.notify(usize::MAX);
3193        }
3194    }
3195
3196    #[cfg(feature = "verify_aggregation_graph")]
3197    fn verify_aggregation_graph(
3198        &self,
3199        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3200        idle: bool,
3201    ) {
3202        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3203            return;
3204        }
3205        use std::{collections::VecDeque, env, io::stdout};
3206
3207        use crate::backend::operation::{get_uppers, is_aggregating_node};
3208
3209        let mut ctx = self.execute_context(turbo_tasks);
3210        let root_tasks = self.root_tasks.lock().clone();
3211
3212        for task_id in root_tasks.into_iter() {
3213            let mut queue = VecDeque::new();
3214            let mut visited = FxHashSet::default();
3215            let mut aggregated_nodes = FxHashSet::default();
3216            let mut collectibles = FxHashMap::default();
3217            let root_task_id = task_id;
3218            visited.insert(task_id);
3219            aggregated_nodes.insert(task_id);
3220            queue.push_back(task_id);
3221            let mut counter = 0;
3222            while let Some(task_id) = queue.pop_front() {
3223                counter += 1;
3224                if counter % 100000 == 0 {
3225                    println!(
3226                        "queue={}, visited={}, aggregated_nodes={}",
3227                        queue.len(),
3228                        visited.len(),
3229                        aggregated_nodes.len()
3230                    );
3231                }
3232                let task = ctx.task(task_id, TaskDataCategory::All);
3233                if idle && !self.is_idle.load(Ordering::Relaxed) {
3234                    return;
3235                }
3236
3237                let uppers = get_uppers(&task);
3238                if task_id != root_task_id
3239                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3240                {
3241                    panic!(
3242                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3243                         {:?})",
3244                        task_id,
3245                        task.get_task_description(),
3246                        uppers
3247                    );
3248                }
3249
3250                for (collectible, _) in task.iter_aggregated_collectibles() {
3251                    collectibles
3252                        .entry(*collectible)
3253                        .or_insert_with(|| (false, Vec::new()))
3254                        .1
3255                        .push(task_id);
3256                }
3257
3258                for (&collectible, &value) in task.iter_collectibles() {
3259                    if value > 0 {
3260                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3261                            *flag = true
3262                        } else {
3263                            panic!(
3264                                "Task {} has a collectible {:?} that is not in any upper task",
3265                                task_id, collectible
3266                            );
3267                        }
3268                    }
3269                }
3270
3271                let is_dirty = task.has_dirty();
3272                let has_dirty_container = task.has_dirty_containers();
3273                let should_be_in_upper = is_dirty || has_dirty_container;
3274
3275                let aggregation_number = get_aggregation_number(&task);
3276                if is_aggregating_node(aggregation_number) {
3277                    aggregated_nodes.insert(task_id);
3278                }
3279                // println!(
3280                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
3281                //     ctx.get_task_description(task_id),
3282                //     uppers
3283                // );
3284
3285                for child_id in task.iter_children() {
3286                    // println!("{task_id}: child -> {child_id}");
3287                    if visited.insert(child_id) {
3288                        queue.push_back(child_id);
3289                    }
3290                }
3291                drop(task);
3292
3293                if should_be_in_upper {
3294                    for upper_id in uppers {
3295                        let upper = ctx.task(upper_id, TaskDataCategory::All);
3296                        let in_upper = upper
3297                            .get_aggregated_dirty_containers(&task_id)
3298                            .is_some_and(|&dirty| dirty > 0);
3299                        if !in_upper {
3300                            let containers: Vec<_> = upper
3301                                .iter_aggregated_dirty_containers()
3302                                .map(|(&k, &v)| (k, v))
3303                                .collect();
3304                            let upper_task_desc = upper.get_task_description();
3305                            drop(upper);
3306                            panic!(
3307                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
3308                                 ({})\nThese dirty containers are present:\n{:#?}",
3309                                task_id,
3310                                ctx.task(task_id, TaskDataCategory::Data)
3311                                    .get_task_description(),
3312                                upper_id,
3313                                upper_task_desc,
3314                                containers,
3315                            );
3316                        }
3317                    }
3318                }
3319            }
3320
3321            for (collectible, (flag, task_ids)) in collectibles {
3322                if !flag {
3323                    use std::io::Write;
3324                    let mut stdout = stdout().lock();
3325                    writeln!(
3326                        stdout,
3327                        "{:?} that is not emitted in any child task but in these aggregated \
3328                         tasks: {:#?}",
3329                        collectible,
3330                        task_ids
3331                            .iter()
3332                            .map(|t| format!(
3333                                "{t} {}",
3334                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
3335                            ))
3336                            .collect::<Vec<_>>()
3337                    )
3338                    .unwrap();
3339
3340                    let task_id = collectible.cell.task;
3341                    let mut queue = {
3342                        let task = ctx.task(task_id, TaskDataCategory::All);
3343                        get_uppers(&task)
3344                    };
3345                    let mut visited = FxHashSet::default();
3346                    for &upper_id in queue.iter() {
3347                        visited.insert(upper_id);
3348                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3349                    }
3350                    while let Some(task_id) = queue.pop() {
3351                        let task = ctx.task(task_id, TaskDataCategory::All);
3352                        let desc = task.get_task_description();
3353                        let aggregated_collectible = task
3354                            .get_aggregated_collectibles(&collectible)
3355                            .copied()
3356                            .unwrap_or_default();
3357                        let uppers = get_uppers(&task);
3358                        drop(task);
3359                        writeln!(
3360                            stdout,
3361                            "upper {task_id} {desc} collectible={aggregated_collectible}"
3362                        )
3363                        .unwrap();
3364                        if task_ids.contains(&task_id) {
3365                            writeln!(
3366                                stdout,
3367                                "Task has an upper connection to an aggregated task that doesn't \
3368                                 reference it. Upper connection is invalid!"
3369                            )
3370                            .unwrap();
3371                        }
3372                        for upper_id in uppers {
3373                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3374                            if !visited.contains(&upper_id) {
3375                                queue.push(upper_id);
3376                            }
3377                        }
3378                    }
3379                    panic!("See stdout for more details");
3380                }
3381            }
3382        }
3383    }
3384
3385    fn assert_not_persistent_calling_transient(
3386        &self,
3387        parent_id: Option<TaskId>,
3388        child_id: TaskId,
3389        cell_id: Option<CellId>,
3390    ) {
3391        if let Some(parent_id) = parent_id
3392            && !parent_id.is_transient()
3393            && child_id.is_transient()
3394        {
3395            self.panic_persistent_calling_transient(
3396                self.debug_get_task_description(parent_id),
3397                self.debug_get_cached_task_type(child_id).as_deref(),
3398                cell_id,
3399            );
3400        }
3401    }
3402
3403    fn panic_persistent_calling_transient(
3404        &self,
3405        parent: String,
3406        child: Option<&CachedTaskType>,
3407        cell_id: Option<CellId>,
3408    ) -> ! {
3409        let transient_reason = if let Some(child) = child {
3410            Cow::Owned(format!(
3411                " The callee is transient because it depends on:\n{}",
3412                self.debug_trace_transient_task(child, cell_id),
3413            ))
3414        } else {
3415            Cow::Borrowed("")
3416        };
3417        panic!(
3418            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3419            parent,
3420            child.map_or("unknown", |t| t.get_name()),
3421            transient_reason,
3422        );
3423    }
3424
3425    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3426        // these checks occur in a potentially hot codepath, but they're cheap
3427        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3428            // This should never happen: The collectible APIs use ResolvedVc
3429            let task_info = if let Some(col_task_ty) = collectible
3430                .try_get_task_id()
3431                .map(|t| self.debug_get_task_description(t))
3432            {
3433                Cow::Owned(format!(" (return type of {col_task_ty})"))
3434            } else {
3435                Cow::Borrowed("")
3436            };
3437            panic!("Collectible{task_info} must be a ResolvedVc")
3438        };
3439        if col_task_id.is_transient() && !task_id.is_transient() {
3440            let transient_reason =
3441                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3442                    Cow::Owned(format!(
3443                        ". The collectible is transient because it depends on:\n{}",
3444                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3445                    ))
3446                } else {
3447                    Cow::Borrowed("")
3448                };
3449            // this should never happen: How would a persistent function get a transient Vc?
3450            panic!(
3451                "Collectible is transient, transient collectibles cannot be emitted from \
3452                 persistent tasks{transient_reason}",
3453            )
3454        }
3455    }
3456}
3457
3458impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
3459    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3460        self.0.startup(turbo_tasks);
3461    }
3462
3463    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3464        self.0.stopping();
3465    }
3466
3467    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3468        self.0.stop(turbo_tasks);
3469    }
3470
3471    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3472        self.0.idle_start(turbo_tasks);
3473    }
3474
3475    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3476        self.0.idle_end();
3477    }
3478
3479    fn get_or_create_task(
3480        &self,
3481        native_fn: &'static NativeFunction,
3482        this: Option<RawVc>,
3483        arg: &mut dyn StackDynTaskInputs,
3484        parent_task: Option<TaskId>,
3485        persistence: TaskPersistence,
3486        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3487    ) -> TaskId {
3488        self.0
3489            .get_or_create_task(native_fn, this, arg, parent_task, persistence, turbo_tasks)
3490    }
3491
3492    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3493        self.0.invalidate_task(task_id, turbo_tasks);
3494    }
3495
3496    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3497        self.0.invalidate_tasks(tasks, turbo_tasks);
3498    }
3499
3500    fn invalidate_tasks_set(
3501        &self,
3502        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3503        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3504    ) {
3505        self.0.invalidate_tasks_set(tasks, turbo_tasks);
3506    }
3507
3508    fn invalidate_serialization(
3509        &self,
3510        task_id: TaskId,
3511        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3512    ) {
3513        self.0.invalidate_serialization(task_id, turbo_tasks);
3514    }
3515
3516    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3517        self.0.task_execution_canceled(task, turbo_tasks)
3518    }
3519
3520    fn try_start_task_execution(
3521        &self,
3522        task_id: TaskId,
3523        priority: TaskPriority,
3524        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3525    ) -> Option<TaskExecutionSpec<'_>> {
3526        self.0
3527            .try_start_task_execution(task_id, priority, turbo_tasks)
3528    }
3529
    /// Forwards the completion report for `task_id` to `self.0.task_execution_completed` and
    /// returns its `Option<TaskPriority>` result as-is.
    ///
    /// All arguments are passed through unchanged. The `stateful` parameter only exists (both
    /// here and in the forwarded call) when the `verify_determinism` feature is enabled.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskPriority> {
        self.0.task_execution_completed(
            task_id,
            result,
            cell_counters,
            // Must stay in sync with the cfg-gated parameter in the signature above.
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
            turbo_tasks,
        )
    }
3549
    /// Job type accepted by `run_backend_job` for this backend.
    type BackendJob = TurboTasksBackendJob;
3551
    /// Forwards `job` and `turbo_tasks` unchanged to `self.0.run_backend_job` and returns the
    /// boxed future it produces.
    ///
    /// The returned future is `Send` and may borrow both `self` and `turbo_tasks` for `'a`.
    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(job, turbo_tasks)
    }
3559
    /// Forwards `task_id`, `reader`, `options` and `turbo_tasks` unchanged to
    /// `self.0.try_read_task_output` and returns its nested result as-is.
    ///
    /// NOTE(review): the inner `Err(EventListener)` presumably means the output is not available
    /// yet and the caller should wait on the listener; confirm against the `Backend` trait docs.
    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, reader, options, turbo_tasks)
    }
3570
    /// Forwards a cell read to `self.0.try_read_task_cell` and returns its nested result as-is.
    ///
    /// NOTE(review): the inner `Err(EventListener)` presumably means the cell is not available
    /// yet and the caller should wait on the listener; confirm against the `Backend` trait docs.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        // The forwarded call takes `reader` before `cell`, unlike this method's parameter order.
        self.0
            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }
3582
    /// Forwards `task_id`, `cell` and `turbo_tasks` unchanged to
    /// `self.0.try_read_own_task_cell` and returns its result as-is.
    ///
    /// Unlike `try_read_task_cell`, this takes no `reader` or options and returns the content
    /// directly rather than a nested result with an [`EventListener`].
    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0.try_read_own_task_cell(task_id, cell, turbo_tasks)
    }
3591
    /// Forwards `task_id`, `collectible_type`, `reader` and `turbo_tasks` unchanged to
    /// `self.0.read_task_collectibles` and returns the resulting map as-is.
    ///
    /// The result maps each collectible [`RawVc`] to an `i32`.
    /// NOTE(review): the `i32` is presumably a (possibly negative) emit count; confirm.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }
3602
    /// Forwards `collectible_type`, `collectible`, `task_id` and `turbo_tasks` unchanged to
    /// `self.0.emit_collectible`.
    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }
3613
    /// Counterpart of `emit_collectible` that additionally carries a `count`.
    ///
    /// Forwards `collectible_type`, `collectible`, `count`, `task_id` and `turbo_tasks`
    /// unchanged to `self.0.unemit_collectible`.
    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }
3625
    /// Forwards a cell update to `self.0.update_task_cell`.
    ///
    /// All arguments (`task_id`, `cell`, `content`, `updated_key_hashes`, `content_hash`,
    /// `verification_mode`, `turbo_tasks`) are passed through unchanged and in the same order.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(
            task_id,
            cell,
            content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            turbo_tasks,
        );
    }
3646
    /// Forwards `task_id` and `turbo_tasks` unchanged to `self.0.mark_own_task_as_finished`.
    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }
3654
    /// Forwards `task`, the optional `parent_task` and `turbo_tasks` unchanged to
    /// `self.0.connect_task`.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }
3663
    /// Forwards `task_type` to `self.0.create_transient_task` and returns the resulting
    /// [`TaskId`].
    ///
    /// The `_turbo_tasks` handle is part of the signature but is not used by this
    /// implementation.
    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }
3671
    /// Forwards `task_id` and `turbo_tasks` unchanged to `self.0.dispose_root_task`.
    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }
3675
    /// Returns a shared reference to the `task_statistics` field of the wrapped value in
    /// `self.0`.
    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }
3679
    /// Returns the value of the `dependency_tracking` flag from the options stored on the
    /// wrapped value in `self.0`.
    fn is_tracking_dependencies(&self) -> bool {
        self.0.options.dependency_tracking
    }
3683
    /// Forwards `task` and `turbo_tasks` unchanged to `self.0.get_task_name` and returns the
    /// resulting name as an owned `String`.
    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
        self.0.get_task_name(task, turbo_tasks)
    }
3687}
3688
/// A tree describing a transient task and, where known, the tasks that caused it.
///
/// Rendered as an indented, multi-line trace by its `fmt_indented` method / `Display` impl.
enum DebugTraceTransientTask {
    /// A task with a known name, optionally followed by nested "cause" entries.
    Cached {
        /// Name printed as the headline of this entry.
        task_name: &'static str,
        /// When present, the entry is annotated with " (read cell of type …)" using the
        /// global name of this value type.
        cell_type_id: Option<ValueTypeId>,
        /// Nested entry printed under a "self:" heading, if any.
        cause_self: Option<Box<DebugTraceTransientTask>>,
        /// Nested entries printed under an "args:" heading (omitted when empty).
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// This representation is used when this task is a duplicate of one previously shown
    Collapsed {
        /// Name printed as the headline of this entry; followed by " (collapsed)".
        task_name: &'static str,
        /// Same annotation as on [`Self::Cached`].
        cell_type_id: Option<ValueTypeId>,
    },
    /// A task without a known name; printed as "unknown transient task".
    Uncached {
        /// Same annotation as on [`Self::Cached`].
        cell_type_id: Option<ValueTypeId>,
    },
}
3705
3706impl DebugTraceTransientTask {
3707    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3708        let indent = "    ".repeat(level);
3709        f.write_str(&indent)?;
3710
3711        fn fmt_cell_type_id(
3712            f: &mut fmt::Formatter<'_>,
3713            cell_type_id: Option<ValueTypeId>,
3714        ) -> fmt::Result {
3715            if let Some(ty) = cell_type_id {
3716                write!(
3717                    f,
3718                    " (read cell of type {})",
3719                    get_value_type(ty).ty.global_name
3720                )
3721            } else {
3722                Ok(())
3723            }
3724        }
3725
3726        // write the name and type
3727        match self {
3728            Self::Cached {
3729                task_name,
3730                cell_type_id,
3731                ..
3732            }
3733            | Self::Collapsed {
3734                task_name,
3735                cell_type_id,
3736                ..
3737            } => {
3738                f.write_str(task_name)?;
3739                fmt_cell_type_id(f, *cell_type_id)?;
3740                if matches!(self, Self::Collapsed { .. }) {
3741                    f.write_str(" (collapsed)")?;
3742                }
3743            }
3744            Self::Uncached { cell_type_id } => {
3745                f.write_str("unknown transient task")?;
3746                fmt_cell_type_id(f, *cell_type_id)?;
3747            }
3748        }
3749        f.write_char('\n')?;
3750
3751        // write any extra "cause" information we might have
3752        if let Self::Cached {
3753            cause_self,
3754            cause_args,
3755            ..
3756        } = self
3757        {
3758            if let Some(c) = cause_self {
3759                writeln!(f, "{indent}  self:")?;
3760                c.fmt_indented(f, level + 1)?;
3761            }
3762            if !cause_args.is_empty() {
3763                writeln!(f, "{indent}  args:")?;
3764                for c in cause_args {
3765                    c.fmt_indented(f, level + 1)?;
3766                }
3767            }
3768        }
3769        Ok(())
3770    }
3771}
3772
/// Renders the trace starting at indentation level 0; see `fmt_indented` for the layout.
impl fmt::Display for DebugTraceTransientTask {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The root entry is not indented; nested causes indent themselves recursively.
        self.fmt_indented(f, 0)
    }
}
3778
// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
/// Returns an [`Instant`] roughly 30 years after the current time, for use as a
/// "practically never" deadline.
fn far_future() -> Instant {
    // There is no API to obtain the maximum `Instant`, nor to convert a specific future date
    // into one, so a large offset from "now" is used instead. The offset is deliberately
    // modest: 1000 years overflows on macOS and 100 years overflows on FreeBSD.
    const THIRTY_YEARS: Duration = Duration::from_secs(86400 * 365 * 30);
    Instant::now() + THIRTY_YEARS
}
3787
3788/// Encodes task data, using the provided buffer as a scratch space.  Returns a new exactly sized
3789/// buffer.
3790/// This allows reusing the buffer across multiple encode calls to optimize allocations and
3791/// resulting buffer sizes.
3792///
3793/// TODO: The `Result` return type is an artifact of the bincode `Encode` trait requiring
3794/// fallible encoding. In practice, encoding to a `SmallVec` is infallible (no I/O), and the only
3795/// real failure mode — a `TypedSharedReference` whose value type has no bincode impl — is a
3796/// programmer error caught by the panic in the caller. Consider making the bincode encoding trait
3797/// infallible (i.e. returning `()` instead of `Result<(), EncodeError>`) to eliminate the
3798/// spurious `Result` threading throughout the encode path.
3799fn encode_task_data(
3800    task: TaskId,
3801    data: &TaskStorage,
3802    category: SpecificTaskDataCategory,
3803    scratch_buffer: &mut TurboBincodeBuffer,
3804) -> Result<TurboBincodeBuffer> {
3805    scratch_buffer.clear();
3806    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3807    data.encode(category, &mut encoder)?;
3808
3809    if cfg!(feature = "verify_serialization") {
3810        TaskStorage::new()
3811            .decode(
3812                category,
3813                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3814            )
3815            .with_context(|| {
3816                format!(
3817                    "expected to be able to decode serialized data for '{category:?}' information \
3818                     for {task}"
3819                )
3820            })?;
3821    }
3822    Ok(SmallVec::from_slice(scratch_buffer))
3823}