Skip to main content

turbo_tasks_backend/backend/
mod.rs

1mod cell_data;
2mod counter_map;
3mod operation;
4mod snapshot_coordinator;
5mod storage;
6pub mod storage_schema;
7
8use std::{
9    borrow::Cow,
10    fmt::{self, Write},
11    future::Future,
12    hash::BuildHasherDefault,
13    mem::take,
14    pin::Pin,
15    sync::{
16        Arc, LazyLock,
17        atomic::{AtomicBool, Ordering},
18    },
19    time::SystemTime,
20};
21
22use anyhow::{Context, Result, bail};
23use auto_hash_map::{AutoMap, AutoSet};
24use indexmap::IndexSet;
25use parking_lot::Mutex;
26use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
27use smallvec::{SmallVec, smallvec};
28use tokio::time::{Duration, Instant};
29use tracing::{Span, trace_span};
30use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
31use turbo_tasks::{
32    CellId, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency, ReadOutputOptions,
33    ReadTracking, SharedReference, StackDynTaskInputs, TRANSIENT_TASK_BIT, TaskExecutionReason,
34    TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic,
35    ValueTypeId,
36    backend::{
37        Backend, CachedTaskType, CachedTaskTypeArc, CellContent, CellHash, TaskExecutionSpec,
38        TransientTaskType, TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
39        TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
40        VerificationMode,
41    },
42    event::{Event, EventDescription, EventListener},
43    macro_helpers::NativeFunction,
44    message_queue::{TimingEvent, TraceEvent},
45    registry::get_value_type,
46    scope::scope_and_block,
47    task_statistics::TaskStatisticsApi,
48    trace::TraceRawVcs,
49    util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
50};
51
52pub use self::{
53    operation::AnyOperation,
54    storage::{EvictionCounts, SpecificTaskDataCategory, TaskDataCategory},
55};
56#[cfg(feature = "trace_task_dirty")]
57use crate::backend::operation::TaskDirtyCause;
58use crate::{
59    backend::{
60        operation::{
61            AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
62            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
63            LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType, TaskTypeRef,
64            connect_children, get_aggregation_number, get_uppers, make_task_dirty_internal,
65            prepare_new_children,
66        },
67        snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
68        storage::Storage,
69        storage_schema::{TaskStorage, TaskStorageAccessors},
70    },
71    backing_storage::{BackingStorage, SnapshotItem, SnapshotMeta, compute_task_type_hash},
72    data::{
73        ActivenessState, CellDependency, CellRef, CollectibleRef, CollectiblesRef, Dirtyness,
74        InProgressCellState, InProgressState, InProgressStateInner, OutputValue, TransientTask,
75    },
76    error::TaskError,
77    utils::{
78        dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
79        shard_amount::compute_shard_amount,
80    },
81};
82
/// Cut-over point for marking dependent tasks dirty in parallel.
///
/// Once the number of dependent tasks exceeds this value, the operation is parallelized.
const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10_000;
87
/// Idle timeout used for snapshot persistence.
///
/// Read lazily (on first access) from the `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS`
/// environment variable, interpreted as a number of milliseconds. Falls back to 2 seconds when
/// the variable is not set or does not parse as a `u64`.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    match std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS") {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(millis) => Duration::from_millis(millis),
            // Set, but not a valid unsigned integer.
            Err(_) => Duration::from_secs(2),
        },
        // Not set (or not valid unicode).
        Err(_) => Duration::from_secs(2),
    }
});
97
98/// Priority used to re-schedule a task that became stale during execution.
99///
100/// Stale tasks must run again, but at a priority that reflects why they're being re-run rather
101/// than the (likely higher) priority of the original schedule. We use invalidation priority
102/// based on the task's leaf distance, parented under either the task's current dirty priority
103/// or `leaf()` if it is no longer dirty.
104fn compute_stale_priority(task: &impl TaskGuard) -> TaskPriority {
105    TaskPriority::invalidation(
106        task.get_leaf_distance()
107            .copied()
108            .unwrap_or_default()
109            .distance,
110    )
111    .in_parent(task.is_dirty().unwrap_or(TaskPriority::leaf()))
112}
113
/// How the backend interacts with its backing storage.
///
/// Every mode looks up cache entries in the storage when they are not available locally; the
/// modes differ in whether and when local changes are written back.
pub enum StorageMode {
    /// Lookup only: missing cache entries are queried from the storage.
    ReadOnly,
    /// Missing cache entries are queried from the storage, and changes are pushed to the
    /// backing storage regularly.
    ReadWrite,
    /// Missing cache entries are queried from the storage, and all changes are pushed to the
    /// backing storage on shutdown.
    ReadWriteOnShutdown,
}
124
125pub struct BackendOptions {
126    /// Enables dependency tracking.
127    ///
128    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
129    /// forever.
130    pub dependency_tracking: bool,
131
132    /// Enables active tracking.
133    ///
134    /// Automatically disabled when `dependency_tracking` is disabled.
135    ///
136    /// When disabled: All tasks are considered as active.
137    pub active_tracking: bool,
138
139    /// Enables the backing storage.
140    pub storage_mode: Option<StorageMode>,
141
142    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
143    /// datastructures. If `None`, it will use the available parallelism.
144    pub num_workers: Option<usize>,
145
146    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
147    pub small_preallocation: bool,
148
149    /// When enabled, evict all evictable tasks from in-memory storage after every snapshot.
150    /// This reclaims memory by clearing persisted data that can be re-loaded from disk on demand.
151    /// This is an EXPERIMENTAL FEATURE under development
152    pub evict_after_snapshot: bool,
153}
154
155impl Default for BackendOptions {
156    fn default() -> Self {
157        Self {
158            dependency_tracking: true,
159            active_tracking: true,
160            storage_mode: Some(StorageMode::ReadWrite),
161            num_workers: None,
162            small_preallocation: false,
163            evict_after_snapshot: false,
164        }
165    }
166}
167
/// Jobs of the backend.
pub enum TurboTasksBackendJob {
    /// The snapshot job.
    Snapshot,
}
171
172pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
173
174struct TurboTasksBackendInner<B: BackingStorage> {
175    options: BackendOptions,
176
177    start_time: Instant,
178
179    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
180    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
181
182    storage: Storage,
183
184    /// Coordinates the operation/snapshot interleaving protocol. See
185    /// [`SnapshotCoordinator`] for details.
186    snapshot_coord: SnapshotCoordinator,
187    /// Serializes calls to `snapshot_and_persist`. The coordinator's
188    /// `begin_snapshot` asserts that snapshots don't overlap; this mutex
189    /// enforces that contract for our two callers (background loop and
190    /// `stop_and_wait`).
191    snapshot_in_progress: Mutex<()>,
192
193    stopping: AtomicBool,
194    stopping_event: Event,
195    idle_start_event: Event,
196    idle_end_event: Event,
197    #[cfg(feature = "verify_aggregation_graph")]
198    is_idle: AtomicBool,
199
200    task_statistics: TaskStatisticsApi,
201
202    backing_storage: B,
203
204    #[cfg(feature = "verify_aggregation_graph")]
205    root_tasks: Mutex<FxHashSet<TaskId>>,
206}
207
208impl<B: BackingStorage> TurboTasksBackend<B> {
209    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
210        Self(Arc::new(TurboTasksBackendInner::new(
211            options,
212            backing_storage,
213        )))
214    }
215
216    pub fn backing_storage(&self) -> &B {
217        &self.0.backing_storage
218    }
219
220    /// Perform a snapshot and then evict all evictable tasks from memory.
221    ///
222    /// This is exposed for integration tests that need to verify the
223    /// snapshot → evict → restore cycle works correctly.
224    ///
225    /// Returns `(snapshot_had_new_data, eviction_counts)`.
226    pub fn snapshot_and_evict_for_testing(
227        &self,
228        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
229    ) -> (bool, EvictionCounts) {
230        self.0.snapshot_and_evict_for_testing(turbo_tasks)
231    }
232}
233
234impl<B: BackingStorage> TurboTasksBackendInner<B> {
235    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
236        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
237        if !options.dependency_tracking {
238            options.active_tracking = false;
239        }
240        let small_preallocation = options.small_preallocation;
241        let next_task_id = backing_storage
242            .next_free_task_id()
243            .expect("Failed to get task id");
244        Self {
245            options,
246            start_time: Instant::now(),
247            persisted_task_id_factory: IdFactoryWithReuse::new(
248                next_task_id,
249                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
250            ),
251            transient_task_id_factory: IdFactoryWithReuse::new(
252                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
253                TaskId::MAX,
254            ),
255            storage: Storage::new(shard_amount, small_preallocation),
256            snapshot_coord: SnapshotCoordinator::new(),
257            snapshot_in_progress: Mutex::new(()),
258            stopping: AtomicBool::new(false),
259            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
260            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
261            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
262            #[cfg(feature = "verify_aggregation_graph")]
263            is_idle: AtomicBool::new(false),
264            task_statistics: TaskStatisticsApi::default(),
265            backing_storage,
266            #[cfg(feature = "verify_aggregation_graph")]
267            root_tasks: Default::default(),
268        }
269    }
270
271    fn execute_context<'a>(
272        &'a self,
273        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
274    ) -> impl ExecuteContext<'a> {
275        ExecuteContextImpl::new(self, turbo_tasks)
276    }
277
278    fn suspending_requested(&self) -> bool {
279        self.should_persist() && self.snapshot_coord.snapshot_pending()
280    }
281
282    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
283        if self.should_persist() {
284            self.snapshot_coord.suspend_point(suspend);
285        }
286    }
287
288    pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
289        if !self.should_persist() {
290            return OperationGuard::noop();
291        }
292        self.snapshot_coord.begin_operation()
293    }
294
295    fn should_persist(&self) -> bool {
296        matches!(
297            self.options.storage_mode,
298            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
299        )
300    }
301
302    fn should_evict(&self) -> bool {
303        self.options.evict_after_snapshot && self.should_persist()
304    }
305
306    /// Perform a snapshot and then evict all evictable tasks from memory.
307    ///
308    /// This is exposed for integration tests that need to verify the
309    /// snapshot → evict → restore cycle works correctly.
310    ///
311    /// Returns `(snapshot_had_new_data, eviction_counts)`.
312    #[doc(hidden)]
313    pub fn snapshot_and_evict_for_testing(
314        &self,
315        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
316    ) -> (bool, EvictionCounts) {
317        assert!(
318            self.should_persist(),
319            "snapshot_and_evict requires persistence"
320        );
321        let snapshot_result = self.snapshot_and_persist(None, "test", turbo_tasks);
322        let had_new_data = match snapshot_result {
323            Ok((_, new_data)) => new_data,
324            Err(_) => {
325                // Snapshot/persist failed — skip eviction since the data may not
326                // be on disk yet. Evicting now could lose in-memory state that
327                // can't be restored.
328                return (false, EvictionCounts::default());
329            }
330        };
331        let counts = self.storage.evict_after_snapshot(None);
332        (had_new_data, counts)
333    }
334
335    fn should_restore(&self) -> bool {
336        self.options.storage_mode.is_some()
337    }
338
339    fn should_track_dependencies(&self) -> bool {
340        self.options.dependency_tracking
341    }
342
343    fn should_track_activeness(&self) -> bool {
344        self.options.active_tracking
345    }
346
347    fn track_cache_hit_by_fn(&self, native_fn: &'static NativeFunction) {
348        self.task_statistics
349            .map(|stats| stats.increment_cache_hit(native_fn));
350    }
351
352    fn track_cache_miss_by_fn(&self, native_fn: &'static NativeFunction) {
353        self.task_statistics
354            .map(|stats| stats.increment_cache_miss(native_fn));
355    }
356
357    /// Reconstructs a full [`TurboTasksExecutionError`] from the compact [`TaskError`] storage
358    /// representation. For [`TaskError::TaskChain`], this looks up the source error from the last
359    /// task's output and rebuilds the nested `TaskContext` wrappers with `TurboTasksCallApi`
360    /// references for lazy name resolution.
361    fn task_error_to_turbo_tasks_execution_error(
362        &self,
363        error: &TaskError,
364        ctx: &mut impl ExecuteContext<'_>,
365    ) -> TurboTasksExecutionError {
366        match error {
367            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
368            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
369                message: item.message.clone(),
370                source: item
371                    .source
372                    .as_ref()
373                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
374            })),
375            TaskError::LocalTaskContext(local_task_context) => {
376                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
377                    name: local_task_context.name.clone(),
378                    source: local_task_context
379                        .source
380                        .as_ref()
381                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
382                }))
383            }
384            TaskError::TaskChain(chain) => {
385                let task_id = chain.last().unwrap();
386                let error = {
387                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
388                    if let Some(OutputValue::Error(error)) = task.get_output() {
389                        Some(error.clone())
390                    } else {
391                        None
392                    }
393                };
394                let error = error.map_or_else(
395                    || {
396                        // Eventual consistency will cause errors to no longer be available
397                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
398                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
399                                "Error no longer available",
400                            )),
401                            location: None,
402                        }))
403                    },
404                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
405                );
406                let mut current_error = error;
407                for &task_id in chain.iter().rev() {
408                    current_error =
409                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
410                            task_id,
411                            source: Some(current_error),
412                            turbo_tasks: ctx.turbo_tasks(),
413                        }));
414                }
415                current_error
416            }
417        }
418    }
419}
420
421/// Intermediate result of step 1 of task execution completion.
422struct TaskExecutionCompletePrepareResult {
423    pub new_children: FxHashSet<TaskId>,
424    pub is_now_immutable: bool,
425    #[cfg(feature = "verify_determinism")]
426    pub no_output_set: bool,
427    pub new_output: Option<OutputValue>,
428    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
429    pub is_recomputation: bool,
430    pub is_session_dependent: bool,
431}
432
433// Operations
434impl<B: BackingStorage> TurboTasksBackendInner<B> {
435    fn try_read_task_output(
436        self: &Arc<Self>,
437        task_id: TaskId,
438        reader: Option<TaskId>,
439        options: ReadOutputOptions,
440        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
441    ) -> Result<Result<RawVc, EventListener>> {
442        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
443
444        let mut ctx = self.execute_context(turbo_tasks);
445        let need_reader_task = if self.should_track_dependencies()
446            && !matches!(options.tracking, ReadTracking::Untracked)
447            && reader.is_some_and(|reader_id| reader_id != task_id)
448            && let Some(reader_id) = reader
449            && reader_id != task_id
450        {
451            Some(reader_id)
452        } else {
453            None
454        };
455        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
456            // Having a task_pair here is not optimal, but otherwise this would lead to a race
457            // condition. See below.
458            // TODO(sokra): solve that in a more performant way.
459            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
460            (task, Some(reader))
461        } else {
462            (ctx.task(task_id, TaskDataCategory::All), None)
463        };
464
465        fn listen_to_done_event(
466            reader_description: Option<EventDescription>,
467            tracking: ReadTracking,
468            done_event: &Event,
469        ) -> EventListener {
470            done_event.listen_with_note(move || {
471                move || {
472                    if let Some(reader_description) = reader_description.as_ref() {
473                        format!(
474                            "try_read_task_output from {} ({})",
475                            reader_description, tracking
476                        )
477                    } else {
478                        format!("try_read_task_output ({})", tracking)
479                    }
480                }
481            })
482        }
483
484        fn check_in_progress(
485            task: &impl TaskGuard,
486            reader_description: Option<EventDescription>,
487            tracking: ReadTracking,
488        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
489        {
490            match task.get_in_progress() {
491                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
492                    listen_to_done_event(reader_description, tracking, done_event),
493                ))),
494                Some(InProgressState::InProgress(box InProgressStateInner {
495                    done_event, ..
496                })) => Some(Ok(Err(listen_to_done_event(
497                    reader_description,
498                    tracking,
499                    done_event,
500                )))),
501                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
502                    "{} was canceled",
503                    task.get_task_description()
504                ))),
505                None => None,
506            }
507        }
508
509        if matches!(options.consistency, ReadConsistency::Strong) {
510            if task
511                .get_persistent_task_type()
512                .is_some_and(|t| !t.native_fn.is_root)
513            {
514                drop(task);
515                drop(reader_task);
516                panic!(
517                    "Strongly consistent read of non-root task {} (reader: {}). The `root` \
518                     attribute is missing on the task.",
519                    self.debug_get_task_description(task_id),
520                    reader.map_or_else(
521                        || "unknown".to_string(),
522                        |r| self.debug_get_task_description(r)
523                    )
524                );
525            }
526
527            let is_dirty = task.is_dirty();
528
529            // Check the dirty count of the root node
530            let has_dirty_containers = task.has_dirty_containers();
531            if has_dirty_containers || is_dirty.is_some() {
532                let activeness = task.get_activeness_mut();
533                let mut task_ids_to_schedule: Vec<_> = Vec::new();
534                // When there are dirty task, subscribe to the all_clean_event
535                let activeness = if let Some(activeness) = activeness {
536                    // This makes sure all tasks stay active and this task won't stale.
537                    // active_until_clean is automatically removed when this
538                    // task is clean.
539                    activeness.set_active_until_clean();
540                    activeness
541                } else {
542                    // If we don't have a root state, add one. This also makes sure all tasks stay
543                    // active and this task won't stale. active_until_clean
544                    // is automatically removed when this task is clean.
545                    if ctx.should_track_activeness() {
546                        // A newly added Activeness need to make sure to schedule the tasks
547                        task_ids_to_schedule = task.dirty_containers().collect();
548                        task_ids_to_schedule.push(task_id);
549                    }
550                    let activeness =
551                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
552                    activeness.set_active_until_clean();
553                    activeness
554                };
555                let listener = activeness.all_clean_event.listen_with_note(move || {
556                    let this = self.clone();
557                    let tt = turbo_tasks.pin();
558                    move || {
559                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
560                        let mut ctx = this.execute_context(tt);
561                        let mut visited = FxHashSet::default();
562                        fn indent(s: &str) -> String {
563                            s.split_inclusive('\n')
564                                .flat_map(|line: &str| ["  ", line].into_iter())
565                                .collect::<String>()
566                        }
567                        fn get_info(
568                            ctx: &mut impl ExecuteContext<'_>,
569                            task_id: TaskId,
570                            parent_and_count: Option<(TaskId, i32)>,
571                            visited: &mut FxHashSet<TaskId>,
572                        ) -> String {
573                            let task = ctx.task(task_id, TaskDataCategory::All);
574                            let is_dirty = task.is_dirty();
575                            let in_progress =
576                                task.get_in_progress()
577                                    .map_or("not in progress", |p| match p {
578                                        InProgressState::InProgress(_) => "in progress",
579                                        InProgressState::Scheduled { .. } => "scheduled",
580                                        InProgressState::Canceled => "canceled",
581                                    });
582                            let activeness = task.get_activeness().map_or_else(
583                                || "not active".to_string(),
584                                |activeness| format!("{activeness:?}"),
585                            );
586                            let aggregation_number = get_aggregation_number(&task);
587                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
588                            {
589                                let uppers = get_uppers(&task);
590                                !uppers.contains(&parent_task_id)
591                            } else {
592                                false
593                            };
594
595                            // Check the dirty count of the root node
596                            let has_dirty_containers = task.has_dirty_containers();
597
598                            let task_description = task.get_task_description();
599                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
600                                format!(", dirty({parent_priority})")
601                            } else {
602                                String::new()
603                            };
604                            let has_dirty_containers_label = if has_dirty_containers {
605                                ", dirty containers"
606                            } else {
607                                ""
608                            };
609                            let count = if let Some((_, count)) = parent_and_count {
610                                format!(" {count}")
611                            } else {
612                                String::new()
613                            };
614                            let mut info = format!(
615                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
616                                 {in_progress}, \
617                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
618                            );
619                            let children: Vec<_> = task.dirty_containers_with_count().collect();
620                            drop(task);
621
622                            if missing_upper {
623                                info.push_str("\n  ERROR: missing upper connection");
624                            }
625
626                            if has_dirty_containers || !children.is_empty() {
627                                writeln!(info, "\n  dirty tasks:").unwrap();
628
629                                for (child_task_id, count) in children {
630                                    let task_description = ctx
631                                        .task(child_task_id, TaskDataCategory::Data)
632                                        .get_task_description();
633                                    if visited.insert(child_task_id) {
634                                        let child_info = get_info(
635                                            ctx,
636                                            child_task_id,
637                                            Some((task_id, count)),
638                                            visited,
639                                        );
640                                        info.push_str(&indent(&child_info));
641                                        if !info.ends_with('\n') {
642                                            info.push('\n');
643                                        }
644                                    } else {
645                                        writeln!(
646                                            info,
647                                            "  {child_task_id} {task_description} {count} \
648                                             (already visited)"
649                                        )
650                                        .unwrap();
651                                    }
652                                }
653                            }
654                            info
655                        }
656                        let info = get_info(&mut ctx, task_id, None, &mut visited);
657                        format!(
658                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
659                        )
660                    }
661                });
662                drop(reader_task);
663                drop(task);
664                if !task_ids_to_schedule.is_empty() {
665                    let mut queue = AggregationUpdateQueue::new();
666                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
667                    queue.execute(&mut ctx);
668                }
669
670                return Ok(Err(listener));
671            }
672        }
673
674        let reader_description = reader_task
675            .as_ref()
676            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
677        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
678        {
679            return value;
680        }
681
682        if let Some(output) = task.get_output() {
683            let result = match output {
684                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
685                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
686                OutputValue::Error(error) => Err(error.clone()),
687            };
688            if let Some(mut reader_task) = reader_task.take()
689                && options.tracking.should_track(result.is_err())
690                && (!task.immutable() || cfg!(feature = "verify_immutable"))
691            {
692                #[cfg(feature = "trace_task_output_dependencies")]
693                let _span = tracing::trace_span!(
694                    "add output dependency",
695                    task = %task_id,
696                    dependent_task = ?reader
697                )
698                .entered();
699                let mut queue = LeafDistanceUpdateQueue::new();
700                let reader = reader.unwrap();
701                if task.add_output_dependent(reader) {
702                    // Ensure that dependent leaf distance is strictly monotonic increasing
703                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
704                    let reader_leaf_distance =
705                        reader_task.get_leaf_distance().copied().unwrap_or_default();
706                    if reader_leaf_distance.distance <= leaf_distance.distance {
707                        queue.push(
708                            reader,
709                            leaf_distance.distance,
710                            leaf_distance.max_distance_in_buffer,
711                        );
712                    }
713                }
714
715                drop(task);
716
717                // Note: We use `task_pair` earlier to lock the task and its reader at the same
718                // time. If we didn't and just locked the reader here, an invalidation could occur
719                // between grabbing the locks. If that happened, and if the task is "outdated" or
720                // doesn't have the dependency edge yet, the invalidation would be lost.
721
722                if !reader_task.remove_outdated_output_dependencies(&task_id) {
723                    let _ = reader_task.add_output_dependencies(task_id);
724                }
725                drop(reader_task);
726
727                queue.execute(&mut ctx);
728            } else {
729                drop(task);
730            }
731
732            return result.map_err(|error| {
733                self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
734                    .with_task_context(task_id, turbo_tasks.pin())
735                    .into()
736            });
737        }
738        drop(reader_task);
739
740        let note = EventDescription::new(|| {
741            move || {
742                if let Some(reader) = reader_description.as_ref() {
743                    format!("try_read_task_output (recompute) from {reader}",)
744                } else {
745                    "try_read_task_output (recompute, untracked)".to_string()
746                }
747            }
748        });
749
750        // Output doesn't exist. We need to schedule the task to compute it.
751        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
752            TaskExecutionReason::OutputNotAvailable,
753            EventDescription::new(|| task.get_task_desc_fn()),
754            note,
755        );
756
757        // It's not possible that the task is InProgress at this point. If it is InProgress {
758        // done: true } it must have Output and would early return.
759        let old = task.set_in_progress(in_progress_state);
760        debug_assert!(old.is_none(), "InProgress already exists");
761        ctx.schedule_task(task, TaskPriority::Recomputation);
762
763        Ok(Err(listener))
764    }
765
    /// Tries to read `cell` of task `task_id`, optionally on behalf of the task `reader`.
    ///
    /// Returns `Ok(Ok(content))` when the cell data is present, `Ok(Err(listener))` when the
    /// caller has to wait for the task to produce the cell, and `Err(..)` when the cell no longer
    /// exists in the task or the task was canceled.
    ///
    /// When dependency tracking applies (tracking is enabled, the read is not untracked and the
    /// reader is a different task), both tasks are locked together via `task_pair` so that the
    /// dependency edge can be recorded on the task and on the reader.
766    fn try_read_task_cell(
767        &self,
768        task_id: TaskId,
769        reader: Option<TaskId>,
770        cell: CellId,
771        options: ReadCellOptions,
772        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
773    ) -> Result<Result<TypedCellContent, EventListener>> {
774        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
775
        /// Records that `reader` depends on `cell` of `task_id`: the task gets a cell dependent
        /// entry and the reader gets the matching cell dependency entry.
        ///
        /// Does nothing when `reader_task` is `None`, or when the task is immutable and the
        /// `verify_immutable` feature is disabled. Both guards are consumed.
776        fn add_cell_dependency(
777            task_id: TaskId,
778            mut task: impl TaskGuard,
779            reader: Option<TaskId>,
780            reader_task: Option<impl TaskGuard>,
781            cell: CellId,
782            key: Option<u64>,
783        ) {
784            if let Some(mut reader_task) = reader_task
785                && (!task.immutable() || cfg!(feature = "verify_immutable"))
786            {
                // A reader guard is only created together with a reader id (see the `task_pair`
                // call in the caller), so the id is expected to be present here.
787                let reader = reader.unwrap();
788                let _ = task
789                    .add_cell_dependents(CellDependency::new(CellRef { task: reader, cell }, key));
790                drop(task);
791
792                // Note: We use `task_pair` earlier to lock the task and its reader at the same
793                // time. If we didn't and just locked the reader here, an invalidation could occur
794                // between grabbing the locks. If that happened, and if the task is "outdated" or
795                // doesn't have the dependency edge yet, the invalidation would be lost.
796
797                let target = CellRef {
798                    task: task_id,
799                    cell,
800                };
801                let dep = CellDependency::new(target, key);
                // Only add the dependency when it was not present as an outdated entry.
802                if !reader_task.remove_outdated_cell_dependencies(&dep) {
803                    let _ = reader_task.add_cell_dependencies(dep);
804                }
805                drop(reader_task);
806            }
807        }
808
809        let ReadCellOptions {
810            tracking,
811            final_read_hint,
812        } = options;
813
814        let mut ctx = self.execute_context(turbo_tasks);
        // `reader_task` is `Some` only when a dependency edge may have to be recorded.
815        let (mut task, reader_task) = if self.should_track_dependencies()
816            && !matches!(tracking, ReadCellTracking::Untracked)
817            && let Some(reader_id) = reader
818            && reader_id != task_id
819        {
820            // Having a task_pair here is not optimal, but otherwise this would lead to a race
821            // condition. See below.
822            // TODO(sokra): solve that in a more performant way.
823            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
824            (task, Some(reader))
825        } else {
826            (ctx.task(task_id, TaskDataCategory::All), None)
827        };
828
        // With `final_read_hint` the cell data is removed from the task instead of being cloned.
829        let content = if final_read_hint {
830            task.remove_cell_data(&cell)
831        } else {
832            task.get_cell_data(&cell).cloned()
833        };
834        if let Some(content) = content {
            // `should_track(false)`: this read does not result in an error.
835            if tracking.should_track(false) {
836                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
837            }
838            return Ok(Ok(TypedCellContent(
839                cell.type_id,
840                CellContent(Some(content)),
841            )));
842        }
843
        // The task is already scheduled or executing: hand out a listener for the cell. The
        // "new listener" flag returned by `listen_to_cell` is discarded on this path.
844        let in_progress = task.get_in_progress();
845        if matches!(
846            in_progress,
847            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
848        ) {
849            return Ok(Err(self
850                .listen_to_cell(&mut task, task_id, &reader_task, cell)
851                .0));
852        }
853        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
854
855        // Check cell index range (cell might not exist at all)
856        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
857        let Some(max_id) = max_id else {
            // The description is fetched first because `add_cell_dependency` consumes `task`.
858            let task_desc = task.get_task_description();
            // `should_track(true)`: this read results in an error.
859            if tracking.should_track(true) {
860                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
861            }
862            bail!(
863                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
864            );
865        };
866        if cell.index >= max_id {
            // The description is fetched first because `add_cell_dependency` consumes `task`.
867            let task_desc = task.get_task_description();
868            if tracking.should_track(true) {
869                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
870            }
871            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
872        }
873
874        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
875        // task to get the cell content.
876
877        // Bail early if the task was cancelled — no point in registering a listener
878        // on a task that won't execute again.
879        if is_cancelled {
880            bail!("{} was canceled", task.get_task_description());
881        }
882
883        // Listen to the cell and potentially schedule the task
884        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
885        drop(reader_task);
        // An in-progress entry for this cell already existed, so nothing is scheduled here.
886        if !new_listener {
887            return Ok(Err(listener));
888        }
889
890        let _span = tracing::trace_span!(
891            "recomputation",
892            cell_type = get_value_type(cell.type_id).ty.global_name,
893            cell_index = cell.index
894        )
895        .entered();
896
897        let _ = task.add_scheduled(
898            TaskExecutionReason::CellNotAvailable,
899            EventDescription::new(|| task.get_task_desc_fn()),
900        );
901        ctx.schedule_task(task, TaskPriority::Recomputation);
902
903        Ok(Err(listener))
904    }
905
906    fn listen_to_cell(
907        &self,
908        task: &mut impl TaskGuard,
909        task_id: TaskId,
910        reader_task: &Option<impl TaskGuard>,
911        cell: CellId,
912    ) -> (EventListener, bool) {
913        let note = || {
914            let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
915            move || {
916                if let Some(reader_desc) = reader_desc.as_ref() {
917                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
918                } else {
919                    "try_read_task_cell (in progress, untracked)".to_string()
920                }
921            }
922        };
923        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
924            // Someone else is already computing the cell
925            let listener = in_progress.event.listen_with_note(note);
926            return (listener, false);
927        }
928        let in_progress = InProgressCellState::new(task_id, cell);
929        let listener = in_progress.event.listen_with_note(note);
930        let old = task.insert_in_progress_cells(cell, in_progress);
931        debug_assert!(old.is_none(), "InProgressCell already exists");
932        (listener, true)
933    }
934
    /// Takes a snapshot of the modified tasks and writes it to the backing storage.
    ///
    /// `parent_span` becomes the parent of the "snapshot" and "persist" tracing spans, and
    /// `reason` is recorded on both spans and in the emitted trace event.
    ///
    /// Returns a timestamp together with a flag: `false` when nothing was persisted (no
    /// modifications, or the snapshot turned out to be empty) and `true` after `save_snapshot`
    /// succeeded. An error returned by `save_snapshot` is propagated to the caller.
935    fn snapshot_and_persist(
936        &self,
937        parent_span: Option<tracing::Id>,
938        reason: &str,
939        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
940    ) -> Result<(Instant, bool), anyhow::Error> {
941        let snapshot_span =
942            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
943                .entered();
944        // Serialize snapshots. The internal protocol (snapshot_mode, snapshot
945        // request bit, suspended_operations) assumes only one snapshot runs at
946        // a time. Held for the entire snapshot lifecycle.
947        let _snapshot_in_progress = self.snapshot_in_progress.lock();
948        let start = Instant::now();
949        // SystemTime for wall-clock timestamps in trace events (milliseconds
950        // since epoch). Instant is monotonic but has no defined epoch, so it
951        // can't be used for cross-process trace correlation.
952        let wall_start = SystemTime::now();
953        debug_assert!(self.should_persist());
954
955        let mut snapshot_phase = {
956            let _span = tracing::info_span!("blocking").entered();
957            self.snapshot_coord.begin_snapshot()
958        };
959        // Enter snapshot mode, which atomically reads and resets the modified count.
960        // Checking after start_snapshot ensures no concurrent increments can race.
961        let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
962
963        let suspended_operations = snapshot_phase.take_suspended_operations();
964
965        let snapshot_time = Instant::now();
966        drop(snapshot_phase);
967
968        if !has_modifications {
969            // No tasks modified since the last snapshot — drop the guard (which
970            // calls end_snapshot) and skip the expensive O(N) scan.
971            drop(snapshot_guard);
            // NOTE(review): this path returns `start`, while the other return paths return
            // `snapshot_time` — confirm that the difference is intended.
972            return Ok((start, false));
973        }
974
        /// Size and count totals accumulated per task name. Only compiled with the
        /// `print_cache_item_size` feature.
975        #[cfg(feature = "print_cache_item_size")]
976        #[derive(Default)]
977        struct TaskCacheStats {
978            data: usize,
979            #[cfg(feature = "print_cache_item_size_with_compressed")]
980            data_compressed: usize,
981            data_count: usize,
982            meta: usize,
983            #[cfg(feature = "print_cache_item_size_with_compressed")]
984            meta_compressed: usize,
985            meta_count: usize,
986            upper_count: usize,
987            collectibles_count: usize,
988            aggregated_collectibles_count: usize,
989            children_count: usize,
990            followers_count: usize,
991            collectibles_dependents_count: usize,
992            aggregated_dirty_containers_count: usize,
993            output_size: usize,
994        }
995        /// Formats a byte size, optionally including the compressed size when the
996        /// `print_cache_item_size_with_compressed` feature is enabled.
997        #[cfg(feature = "print_cache_item_size")]
998        struct FormatSizes {
999            size: usize,
1000            #[cfg(feature = "print_cache_item_size_with_compressed")]
1001            compressed_size: usize,
1002        }
1003        #[cfg(feature = "print_cache_item_size")]
1004        impl std::fmt::Display for FormatSizes {
1005            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1006                use turbo_tasks::util::FormatBytes;
1007                #[cfg(feature = "print_cache_item_size_with_compressed")]
1008                {
1009                    write!(
1010                        f,
1011                        "{} ({} compressed)",
1012                        FormatBytes(self.size),
1013                        FormatBytes(self.compressed_size)
1014                    )
1015                }
1016                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1017                {
1018                    write!(f, "{}", FormatBytes(self.size))
1019                }
1020            }
1021        }
1022        #[cfg(feature = "print_cache_item_size")]
1023        impl TaskCacheStats {
            /// Runs `data` through an LZ4 compressor and returns the value reported by
            /// `next_to_vec` (presumably the compressed length — TODO confirm).
1024            #[cfg(feature = "print_cache_item_size_with_compressed")]
1025            fn compressed_size(data: &[u8]) -> Result<usize> {
1026                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1027                    data,
1028                    &mut Vec::new(),
1029                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
1030                )?)
1031            }
1032
            /// Records one encoded data item; a failed compression counts as 0 compressed bytes.
1033            fn add_data(&mut self, data: &[u8]) {
1034                self.data += data.len();
1035                #[cfg(feature = "print_cache_item_size_with_compressed")]
1036                {
1037                    self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1038                }
1039                self.data_count += 1;
1040            }
1041
            /// Records one encoded meta item; a failed compression counts as 0 compressed bytes.
1042            fn add_meta(&mut self, data: &[u8]) {
1043                self.meta += data.len();
1044                #[cfg(feature = "print_cache_item_size_with_compressed")]
1045                {
1046                    self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1047                }
1048                self.meta_count += 1;
1049            }
1050
            /// Adds the task's meta counts and the encoded size of its output, if it has one
            /// (an output that fails to encode counts as 0 bytes).
1051            fn add_counts(&mut self, storage: &TaskStorage) {
1052                let counts = storage.meta_counts();
1053                self.upper_count += counts.upper;
1054                self.collectibles_count += counts.collectibles;
1055                self.aggregated_collectibles_count += counts.aggregated_collectibles;
1056                self.children_count += counts.children;
1057                self.followers_count += counts.followers;
1058                self.collectibles_dependents_count += counts.collectibles_dependents;
1059                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
1060                if let Some(output) = storage.get_output() {
1061                    use turbo_bincode::turbo_bincode_encode;
1062
1063                    self.output_size += turbo_bincode_encode(&output)
1064                        .map(|data| data.len())
1065                        .unwrap_or(0);
1066                }
1067            }
1068
1069            /// Returns the task name used as the stats grouping key.
1070            fn task_name(storage: &TaskStorage) -> String {
1071                storage
1072                    .get_persistent_task_type()
1073                    .map(|t| t.to_string())
1074                    .unwrap_or_else(|| "<unknown>".to_string())
1075            }
1076
1077            /// Returns the primary sort key: compressed total when
1078            /// `print_cache_item_size_with_compressed` is enabled, raw total otherwise.
1079            fn sort_key(&self) -> usize {
1080                #[cfg(feature = "print_cache_item_size_with_compressed")]
1081                {
1082                    self.data_compressed + self.meta_compressed
1083                }
1084                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1085                {
1086                    self.data + self.meta
1087                }
1088            }
1089
            /// Combined data + meta size.
1090            fn format_total(&self) -> FormatSizes {
1091                FormatSizes {
1092                    size: self.data + self.meta,
1093                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1094                    compressed_size: self.data_compressed + self.meta_compressed,
1095                }
1096            }
1097
            /// Total data size.
1098            fn format_data(&self) -> FormatSizes {
1099                FormatSizes {
1100                    size: self.data,
1101                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1102                    compressed_size: self.data_compressed,
1103                }
1104            }
1105
            /// Average data size per recorded data item (0 when no item was recorded).
1106            fn format_avg_data(&self) -> FormatSizes {
1107                FormatSizes {
1108                    size: self.data.checked_div(self.data_count).unwrap_or(0),
1109                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1110                    compressed_size: self
1111                        .data_compressed
1112                        .checked_div(self.data_count)
1113                        .unwrap_or(0),
1114                }
1115            }
1116
            /// Total meta size.
1117            fn format_meta(&self) -> FormatSizes {
1118                FormatSizes {
1119                    size: self.meta,
1120                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1121                    compressed_size: self.meta_compressed,
1122                }
1123            }
1124
            /// Average meta size per recorded meta item (0 when no item was recorded).
1125            fn format_avg_meta(&self) -> FormatSizes {
1126                FormatSizes {
1127                    size: self.meta.checked_div(self.meta_count).unwrap_or(0),
1128                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1129                    compressed_size: self
1130                        .meta_compressed
1131                        .checked_div(self.meta_count)
1132                        .unwrap_or(0),
1133                }
1134            }
1135        }
        // Stats keyed by task name; filled by `process` below and printed after persisting.
        // NOTE(review): presumably behind a `Mutex` because `process` may be invoked from
        // several threads by `take_snapshot` — confirm.
1136        #[cfg(feature = "print_cache_item_size")]
1137        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1138            Mutex::new(FxHashMap::default());
1139
1140        // Encode each task's modified categories. We only encode categories with `modified` set,
1141        // meaning the category was actually dirtied. Categories restored from disk but never
1142        // modified don't need re-persisting since the on-disk version is still valid.
1143        // For tasks accessed during snapshot mode, a frozen copy was made and its `modified`
1144        // flags were copied from the live task at snapshot creation time, reflecting which
1145        // categories were dirtied before the snapshot was taken.
1146        let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
            // Encodes one category of the task; an encoding error is treated as fatal (panic).
1147            let encode_category = |task_id: TaskId,
1148                                   data: &TaskStorage,
1149                                   category: SpecificTaskDataCategory,
1150                                   buffer: &mut TurboBincodeBuffer|
1151             -> Option<TurboBincodeBuffer> {
1152                match encode_task_data(task_id, data, category, buffer) {
1153                    Ok(encoded) => {
1154                        #[cfg(feature = "print_cache_item_size")]
1155                        {
1156                            let mut stats = task_cache_stats.lock();
1157                            let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
1158                            match category {
1159                                SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1160                                SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1161                            }
1162                        }
1163                        Some(encoded)
1164                    }
1165                    Err(err) => {
1166                        panic!(
1167                            "Serializing task {} failed ({:?}): {:?}",
1168                            self.debug_get_task_description(task_id),
1169                            category,
1170                            err
1171                        );
1172                    }
1173                }
1174            };
1175            if task_id.is_transient() {
1176                unreachable!("transient task_ids should never be enqueued to be persisted");
1177            }
1178
1179            let encode_meta = inner.flags.meta_modified();
1180            let encode_data = inner.flags.data_modified();
1181
1182            #[cfg(feature = "print_cache_item_size")]
1183            if encode_data || encode_meta {
1184                task_cache_stats
1185                    .lock()
1186                    .entry(TaskCacheStats::task_name(inner))
1187                    .or_default()
1188                    .add_counts(inner);
1189            }
1190
1191            let meta = if encode_meta {
1192                encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1193            } else {
1194                None
1195            };
1196
1197            let data = if encode_data {
1198                encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1199            } else {
1200                None
1201            };
            // The task type hash is only computed for tasks flagged as new.
1202            let task_type_hash = if inner.flags.new_task() {
1203                let task_type = inner.get_persistent_task_type().expect(
1204                    "It is not possible for a new_task to not have a persistent_task_type.  Task \
1205                     creation for persistent tasks uses a single ExecutionContextImpl for \
1206                     creating the task (which sets new_task) and connect_child (which sets \
1207                     persistent_task_type) and take_snapshot waits for all operations to complete \
1208                     or suspend before we start snapshotting.  So task creation will always set \
1209                     the task_type.",
1210                );
1211                Some(compute_task_type_hash(task_type))
1212            } else {
1213                None
1214            };
1215
1216            SnapshotItem {
1217                task_id,
1218                meta,
1219                data,
1220                task_type_hash,
1221            }
1222        };
1223
1224        let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);
1225
1226        drop(snapshot_span);
        // Measured from `start`, so this also covers the blocking `begin_snapshot` phase.
1227        let snapshot_duration = start.elapsed();
1228        let task_count = task_snapshots.len();
1229
1230        if task_snapshots.is_empty() {
1231            // This should be impossible — if we got here, modified_count was nonzero, and every
1232            // modification that increments the count also failed during encoding.
            // NOTE(review): encoding failures panic inside `process`, so they cannot be the
            // cause of an empty result — the wording above may be outdated.
1233            std::hint::cold_path();
1234            return Ok((snapshot_time, false));
1235        }
1236
1237        let persist_start = Instant::now();
        // The item counts are declared empty here and recorded once `save_snapshot` returned.
1238        let span = tracing::info_span!(
1239            parent: parent_span,
1240            "persist",
1241            reason = reason,
1242            data_items= tracing::field::Empty,
1243            meta_items= tracing::field::Empty,
1244            task_cache_items= tracing::field::Empty,
1245            next_task_id= tracing::field::Empty,)
1246        .entered();
1247        {
1248            // Tasks were already consumed by take_snapshot, so a future snapshot
1249            // would not re-persist them — returning an error signals to the caller
1250            // that further persist attempts would corrupt the task graph in storage.
1251            let SnapshotMeta {
1252                task_cache_items,
1253                data_items,
1254                meta_items,
1255                max_next_task_id,
1256            } = self
1257                .backing_storage
1258                .save_snapshot(suspended_operations, task_snapshots)?;
1259            span.record("data_items", data_items);
1260            span.record("meta_items", meta_items);
1261            span.record("task_cache_items", task_cache_items);
1262            span.record("next_task_id", max_next_task_id);
1263
1264            #[cfg(feature = "print_cache_item_size")]
1265            {
1266                let mut task_cache_stats = task_cache_stats
1267                    .into_inner()
1268                    .into_iter()
1269                    .collect::<Vec<_>>();
1270                if !task_cache_stats.is_empty() {
1271                    use turbo_tasks::util::FormatBytes;
1272
1273                    use crate::utils::markdown_table::print_markdown_table;
1274
                    // Descending by sort key; ties are broken by descending task name.
1275                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1276                        (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
1277                    });
1278
1279                    println!(
1280                        "Task cache stats: {}",
1281                        FormatSizes {
1282                            size: task_cache_stats
1283                                .iter()
1284                                .map(|(_, s)| s.data + s.meta)
1285                                .sum::<usize>(),
1286                            #[cfg(feature = "print_cache_item_size_with_compressed")]
1287                            compressed_size: task_cache_stats
1288                                .iter()
1289                                .map(|(_, s)| s.data_compressed + s.meta_compressed)
1290                                .sum::<usize>()
1291                        },
1292                    );
1293
                    // "Count x Avg" spans two columns each: the count ("N x") and the average.
1294                    print_markdown_table(
1295                        [
1296                            "Task",
1297                            " Total Size",
1298                            " Data Size",
1299                            " Data Count x Avg",
1300                            " Data Count x Avg",
1301                            " Meta Size",
1302                            " Meta Count x Avg",
1303                            " Meta Count x Avg",
1304                            " Uppers",
1305                            " Coll",
1306                            " Agg Coll",
1307                            " Children",
1308                            " Followers",
1309                            " Coll Deps",
1310                            " Agg Dirty",
1311                            " Output Size",
1312                        ],
1313                        task_cache_stats.iter(),
1314                        |(task_desc, stats)| {
1315                            [
1316                                task_desc.to_string(),
1317                                format!(" {}", stats.format_total()),
1318                                format!(" {}", stats.format_data()),
1319                                format!(" {} x", stats.data_count),
1320                                format!("{}", stats.format_avg_data()),
1321                                format!(" {}", stats.format_meta()),
1322                                format!(" {} x", stats.meta_count),
1323                                format!("{}", stats.format_avg_meta()),
1324                                format!(" {}", stats.upper_count),
1325                                format!(" {}", stats.collectibles_count),
1326                                format!(" {}", stats.aggregated_collectibles_count),
1327                                format!(" {}", stats.children_count),
1328                                format!(" {}", stats.followers_count),
1329                                format!(" {}", stats.collectibles_dependents_count),
1330                                format!(" {}", stats.aggregated_dirty_containers_count),
1331                                format!(" {}", FormatBytes(stats.output_size)),
1332                            ]
1333                        },
1334                    );
1335                }
1336            }
1337        }
1338
1339        let elapsed = start.elapsed();
1340        let persist_duration = persist_start.elapsed();
1341        // avoid spamming the event queue with information about fast operations
1342        if elapsed > Duration::from_secs(10) {
1343            turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1344                "Finished writing to filesystem cache".to_string(),
1345                elapsed,
1346            )));
1347        }
1348
1349        let wall_start_ms = wall_start
1350            .duration_since(SystemTime::UNIX_EPOCH)
1351            .unwrap_or_default()
1352            // as_millis_f64 is not stable yet
1353            .as_secs_f64()
1354            * 1000.0;
        // The end timestamp is derived from the monotonic `elapsed` duration rather than from a
        // second `SystemTime::now()` reading.
1355        let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1356        turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1357            "turbopack-persistence",
1358            wall_start_ms,
1359            wall_end_ms,
1360            vec![
1361                ("reason", serde_json::Value::from(reason)),
1362                (
1363                    "snapshot_duration_ms",
1364                    serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1365                ),
1366                (
1367                    "persist_duration_ms",
1368                    serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1369                ),
1370                ("task_count", serde_json::Value::from(task_count)),
1371            ],
1372        )));
1373
1374        Ok((snapshot_time, true))
1375    }
1376
1377    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1378        if self.should_restore() {
1379            // Continue all uncompleted operations
1380            // They can't be interrupted by a snapshot since the snapshotting job has not been
1381            // scheduled yet.
1382            let uncompleted_operations = self
1383                .backing_storage
1384                .uncompleted_operations()
1385                .expect("Failed to get uncompleted operations");
1386            if !uncompleted_operations.is_empty() {
1387                let mut ctx = self.execute_context(turbo_tasks);
1388                for op in uncompleted_operations {
1389                    op.execute(&mut ctx);
1390                }
1391            }
1392        }
1393
1394        // Only when it should write regularly to the storage, we schedule the initial snapshot
1395        // job.
1396        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1397            // Schedule the snapshot job
1398            let _span = trace_span!("persisting background job").entered();
1399            let _span = tracing::info_span!("thread").entered();
1400            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::Snapshot);
1401        }
1402    }
1403
1404    fn stopping(&self) {
1405        self.stopping.store(true, Ordering::Release);
1406        self.stopping_event.notify(usize::MAX);
1407    }
1408
1409    #[allow(unused_variables)]
1410    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1411        #[cfg(feature = "verify_aggregation_graph")]
1412        {
1413            self.is_idle.store(false, Ordering::Release);
1414            self.verify_aggregation_graph(turbo_tasks, false);
1415        }
1416        if self.should_persist()
1417            && let Err(err) = self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks)
1418        {
1419            eprintln!("Persisting failed during shutdown: {err:?}");
1420        }
1421        self.storage.drop_contents();
1422        if let Err(err) = self.backing_storage.shutdown() {
1423            println!("Shutting down failed: {err}");
1424        }
1425    }
1426
1427    #[allow(unused_variables)]
1428    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1429        self.idle_start_event.notify(usize::MAX);
1430
1431        #[cfg(feature = "verify_aggregation_graph")]
1432        {
1433            use tokio::select;
1434
1435            self.is_idle.store(true, Ordering::Release);
1436            let this = self.clone();
1437            let turbo_tasks = turbo_tasks.pin();
1438            tokio::task::spawn(async move {
1439                select! {
1440                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1441                        // do nothing
1442                    }
1443                    _ = this.idle_end_event.listen() => {
1444                        return;
1445                    }
1446                }
1447                if !this.is_idle.load(Ordering::Relaxed) {
1448                    return;
1449                }
1450                this.verify_aggregation_graph(&*turbo_tasks, true);
1451            });
1452        }
1453    }
1454
1455    fn idle_end(&self) {
1456        #[cfg(feature = "verify_aggregation_graph")]
1457        self.is_idle.store(false, Ordering::Release);
1458        self.idle_end_event.notify(usize::MAX);
1459    }
1460
    /// Returns the [`TaskId`] for the task identified by `(native_fn, this, arg)`, creating the
    /// task if it does not exist yet, and connects it as a child of `parent_task`.
    ///
    /// The lookup proceeds in this order:
    /// 1. Read-only lookup in the in-memory task cache.
    /// 2. For non-transient tasks: lookup in the backing storage via `ctx.task_by_type`; a hit
    ///    is inserted into the in-memory cache.
    /// 3. Otherwise an entry is taken in the cache shard; if it is still vacant a new task id is
    ///    allocated, its storage is initialized and it is inserted into the cache.
    ///
    /// `arg` is only boxed (`take_box`) when a new task is actually created, or when reporting
    /// a transient task requested by a non-transient parent.
    fn get_or_create_task(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        parent_task: Option<TaskId>,
        persistence: TaskPersistence,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let transient = matches!(persistence, TaskPersistence::Transient);

        // A transient task requested by a non-transient parent is reported through
        // `panic_persistent_calling_transient`.
        if transient
            && let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            let task_type = CachedTaskType {
                native_fn,
                this,
                arg: arg.take_box(),
            };
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                /* cell_id */ None,
            );
        }

        let is_root = native_fn.is_root;

        // Compute hash and shard index once from borrowed components (no heap allocation).
        let arg_ref = arg.as_ref();
        let hash = CachedTaskType::hash_from_components(
            self.storage.task_cache.hasher(),
            native_fn,
            this,
            arg_ref,
        );
        // Locate the shard once so that the read-only lookup and any
        // write-lock retry below share the same reference (saves a modulo +
        // memory lookup on the miss path).
        let shard = get_shard(&self.storage.task_cache, hash);

        let mut ctx = self.execute_context(turbo_tasks);
        // Step 1: Fast read-only cache lookup (read lock, no allocation).
        // Use a read lock rather than a write lock to avoid contention. connect_child
        // may re-enter task_cache with a write lock, so we must not hold a write lock here.
        if let Some(task_id) =
            raw_get_in_shard(shard, hash, |k| k.eq_components(native_fn, this, arg_ref))
        {
            self.track_cache_hit_by_fn(native_fn);
            operation::ConnectChildOperation::run(parent_task, task_id, ctx);
            return task_id;
        }

        // Step 2: Check backing storage using borrowed components (no box needed yet).
        // Transient tasks skip this lookup. If the task exists in backing storage, we only
        // need to insert it into the in-memory cache.
        let task_id = if !transient
            && let Some((task_id, stored_type)) = ctx.task_by_type(native_fn, this, arg_ref)
        {
            self.track_cache_hit_by_fn(native_fn);
            // Step 3a: Insert into in-memory cache using the pre-located shard.
            // Use the existing Arc from storage to avoid a duplicate allocation.
            match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
                k.eq_components(native_fn, this, arg_ref)
            }) {
                // Already present in the cache: nothing to insert.
                RawEntry::Occupied(_) => {}
                RawEntry::Vacant(e) => {
                    e.insert(stored_type, task_id);
                }
            };
            task_id
        } else {
            // Step 3b: Not found anywhere (or transient): take the shard entry and create the
            // task if the entry is still vacant.
            match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
                k.eq_components(native_fn, this, arg_ref)
            }) {
                RawEntry::Occupied(e) => {
                    // Another thread beat us to creating this task — use their task_id.
                    // They will handle logging the new task as modified.
                    let task_id = *e.get();
                    drop(e);
                    self.track_cache_hit_by_fn(native_fn);
                    task_id
                }
                RawEntry::Vacant(e) => {
                    // Only now do we force the allocation.
                    // NOTE: if our caller had to perform resolution, then this will have already
                    // been boxed and take_box just takes it.
                    let task_type = CachedTaskTypeArc::new(CachedTaskType {
                        native_fn,
                        this,
                        arg: arg.take_box(),
                    });
                    // Transient and persisted tasks draw their ids from separate factories.
                    let task_id = if transient {
                        self.transient_task_id_factory.get()
                    } else {
                        self.persisted_task_id_factory.get()
                    };
                    // Initialize storage BEFORE making task_id visible in the cache.
                    // This ensures any thread that reads task_id from the cache sees
                    // the storage entry already initialized (restored flags set).
                    self.storage
                        .initialize_new_task(task_id, Some(task_type.clone()));
                    // insert() consumes e, releasing the shard write lock.
                    e.insert(task_type, task_id);
                    self.track_cache_miss_by_fn(native_fn);
                    // Update the aggregation number before connecting the child
                    // We don't need this on any of the task recovery paths above because the
                    // aggregation number will already be set.
                    if is_root {
                        AggregationUpdateQueue::run(
                            AggregationUpdateJob::UpdateAggregationNumber {
                                task_id,
                                base_aggregation_number: u32::MAX,
                                distance: None,
                            },
                            &mut ctx,
                        );
                    } else if native_fn.is_session_dependent && self.should_track_dependencies() {
                        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
                        AggregationUpdateQueue::run(
                            AggregationUpdateJob::UpdateAggregationNumber {
                                task_id,
                                base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                                distance: None,
                            },
                            &mut ctx,
                        );
                    };

                    task_id
                }
            }
        };

        // Connect the (found or newly created) task as a child of the parent.
        operation::ConnectChildOperation::run(parent_task, task_id, ctx);

        task_id
    }
1601
1602    /// Generate an object that implements [`fmt::Display`] explaining why the given
1603    /// [`CachedTaskType`] is transient.
1604    fn debug_trace_transient_task(
1605        &self,
1606        task_type: &CachedTaskType,
1607        cell_id: Option<CellId>,
1608    ) -> DebugTraceTransientTask {
1609        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1610        // from tracing the same task many times, so use a visited_set
1611        fn inner_id(
1612            backend: &TurboTasksBackendInner<impl BackingStorage>,
1613            task_id: TaskId,
1614            cell_type_id: Option<ValueTypeId>,
1615            visited_set: &mut FxHashSet<TaskId>,
1616        ) -> DebugTraceTransientTask {
1617            if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1618                if visited_set.contains(&task_id) {
1619                    let task_name = task_type.get_name();
1620                    DebugTraceTransientTask::Collapsed {
1621                        task_name,
1622                        cell_type_id,
1623                    }
1624                } else {
1625                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1626                }
1627            } else {
1628                DebugTraceTransientTask::Uncached { cell_type_id }
1629            }
1630        }
1631        fn inner_cached(
1632            backend: &TurboTasksBackendInner<impl BackingStorage>,
1633            task_type: &CachedTaskType,
1634            cell_type_id: Option<ValueTypeId>,
1635            visited_set: &mut FxHashSet<TaskId>,
1636        ) -> DebugTraceTransientTask {
1637            let task_name = task_type.get_name();
1638
1639            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1640                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1641                    // `task_id` should never be `None` at this point, as that would imply a
1642                    // non-local task is returning a local `Vc`...
1643                    // Just ignore if it happens, as we're likely already panicking.
1644                    return None;
1645                };
1646                if task_id.is_transient() {
1647                    Some(Box::new(inner_id(
1648                        backend,
1649                        task_id,
1650                        cause_self_raw_vc.try_get_type_id(),
1651                        visited_set,
1652                    )))
1653                } else {
1654                    None
1655                }
1656            });
1657            let cause_args = task_type
1658                .arg
1659                .get_raw_vcs()
1660                .into_iter()
1661                .filter_map(|raw_vc| {
1662                    let Some(task_id) = raw_vc.try_get_task_id() else {
1663                        // `task_id` should never be `None` (see comment above)
1664                        return None;
1665                    };
1666                    if !task_id.is_transient() {
1667                        return None;
1668                    }
1669                    Some((task_id, raw_vc.try_get_type_id()))
1670                })
1671                .collect::<IndexSet<_>>() // dedupe
1672                .into_iter()
1673                .map(|(task_id, cell_type_id)| {
1674                    inner_id(backend, task_id, cell_type_id, visited_set)
1675                })
1676                .collect();
1677
1678            DebugTraceTransientTask::Cached {
1679                task_name,
1680                cell_type_id,
1681                cause_self,
1682                cause_args,
1683            }
1684        }
1685        inner_cached(
1686            self,
1687            task_type,
1688            cell_id.map(|c| c.type_id),
1689            &mut FxHashSet::default(),
1690        )
1691    }
1692
1693    fn invalidate_task(
1694        &self,
1695        task_id: TaskId,
1696        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1697    ) {
1698        if !self.should_track_dependencies() {
1699            panic!("Dependency tracking is disabled so invalidation is not allowed");
1700        }
1701        operation::InvalidateOperation::run(
1702            smallvec![task_id],
1703            #[cfg(feature = "trace_task_dirty")]
1704            TaskDirtyCause::Invalidator,
1705            self.execute_context(turbo_tasks),
1706        );
1707    }
1708
1709    fn invalidate_tasks(
1710        &self,
1711        tasks: &[TaskId],
1712        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1713    ) {
1714        if !self.should_track_dependencies() {
1715            panic!("Dependency tracking is disabled so invalidation is not allowed");
1716        }
1717        operation::InvalidateOperation::run(
1718            tasks.iter().copied().collect(),
1719            #[cfg(feature = "trace_task_dirty")]
1720            TaskDirtyCause::Unknown,
1721            self.execute_context(turbo_tasks),
1722        );
1723    }
1724
1725    fn invalidate_tasks_set(
1726        &self,
1727        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1728        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1729    ) {
1730        if !self.should_track_dependencies() {
1731            panic!("Dependency tracking is disabled so invalidation is not allowed");
1732        }
1733        operation::InvalidateOperation::run(
1734            tasks.iter().copied().collect(),
1735            #[cfg(feature = "trace_task_dirty")]
1736            TaskDirtyCause::Unknown,
1737            self.execute_context(turbo_tasks),
1738        );
1739    }
1740
1741    fn invalidate_serialization(
1742        &self,
1743        task_id: TaskId,
1744        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1745    ) {
1746        if task_id.is_transient() {
1747            return;
1748        }
1749        let mut ctx = self.execute_context(turbo_tasks);
1750        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1751        task.invalidate_serialization();
1752    }
1753
1754    fn debug_get_task_description(&self, task_id: TaskId) -> String {
1755        let task = self.storage.access_mut(task_id);
1756        if let Some(value) = task.get_persistent_task_type() {
1757            format!("{task_id:?} {}", value)
1758        } else if let Some(value) = task.get_transient_task_type() {
1759            format!("{task_id:?} {}", value)
1760        } else {
1761            format!("{task_id:?} unknown")
1762        }
1763    }
1764
1765    fn get_task_name(
1766        &self,
1767        task_id: TaskId,
1768        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1769    ) -> String {
1770        let mut ctx = self.execute_context(turbo_tasks);
1771        let task = ctx.task(task_id, TaskDataCategory::Data);
1772        if let Some(value) = task.get_persistent_task_type() {
1773            value.to_string()
1774        } else if let Some(value) = task.get_transient_task_type() {
1775            value.to_string()
1776        } else {
1777            "unknown".to_string()
1778        }
1779    }
1780
1781    fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<CachedTaskTypeArc> {
1782        let task = self.storage.access_mut(task_id);
1783        task.get_persistent_task_type().cloned()
1784    }
1785
    /// Handles the cancellation of a task's execution.
    ///
    /// Takes the task's in-progress state and notifies its `done_event`, notifies the events
    /// of all in-progress cells, optionally marks the task as session-dependent dirty, and
    /// finally sets the in-progress state to `Canceled`.
    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        // Wake everyone waiting for the task to finish.
        if let Some(in_progress) = task.take_in_progress() {
            match in_progress {
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        // Notify any readers waiting on in-progress cells so their listeners
        // resolve and foreground jobs can finish (prevents stop_and_wait hang).
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Mark the cancelled task as session-dependent dirty so it will be re-executed
        // in the next session. Without this, any reader that encounters the cancelled task
        // records an error in its output. That error is persisted and would poison
        // subsequent builds. By marking the task session-dependent dirty, the next build
        // re-executes it, which invalidates dependents and corrects the stale errors.
        let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
            task.update_dirty_state(Some(Dirtyness::SessionDependent))
        } else {
            None
        };

        // The in-progress state was taken above, so no previous state is expected here.
        let old = task.set_in_progress(InProgressState::Canceled);
        debug_assert!(old.is_none(), "InProgress already exists");
        // Release the task before running the aggregation update.
        drop(task);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, &mut ctx);
        }

        // The taken cell states are only dropped once all other work is done.
        drop(in_progress_cells);
    }
1835
    /// Tries to move a scheduled task into the in-progress state and builds what is needed to
    /// execute it.
    ///
    /// Returns `None` when the task has no in-progress state, when that state is not
    /// `Scheduled` (the state is put back unchanged), or when a `Once` transient task's future
    /// has already been taken. Otherwise returns the span and future to run.
    ///
    /// As part of starting, all current collectibles are marked outdated and, when dependency
    /// tracking is enabled, all current cell and output dependencies are marked outdated.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            // If there are tasks to prefetch, release this task while preparing them and
            // re-acquire it afterwards.
            if let Some(tasks) = task.prefetch() {
                drop(task);
                ctx.prepare_tasks(tasks, "prefetch");
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            let in_progress = task.take_in_progress()?;
            // Only a `Scheduled` task can be started; any other state is restored as-is.
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Make all current collectibles outdated (remove left-over outdated collectibles)
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            // Collected into a `Vec` first, since the task is modified in the loop below.
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        // Drop outdated entries that have no matching current collectible.
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Make all dependencies outdated
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        // Build the span and the future outside of the scope that holds the task.
        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason, priority),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // A `Once` future can only be taken a single time; `None` afterwards.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1934
    /// Processes the completion of a task execution.
    ///
    /// Returns `Some(priority)` if the task became stale during execution and needs to be
    /// re-scheduled at the given priority. The caller (turbo-tasks manager) hands it to the
    /// priority runner so re-execution doesn't inherit the original schedule priority.
    /// Returns `None` when the completion was fully processed.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskPriority> {
        // Task completion is a 4 step process:
        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
        //    aggregation number of the task and the new children.
        // 2. Connect the new children to the task (and do the relevant aggregation updates).
        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
        // 4. Shrink the task memory to reduce footprint of the task.

        // Due to persistence it is possible that the process is cancelled after any step. This is
        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
        // in-memory representation.

        // The task might be invalidated during this process, so we need to check the stale flag
        // at the start of every step.

        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            aggregation_number = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        // Remember the outcome before `result` is moved into the prepare step.
        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        let TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
            is_recomputation,
            is_session_dependent,
        } = match self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        ) {
            Ok(r) => r,
            Err(stale_priority) => {
                // Task was stale and has been rescheduled
                #[cfg(feature = "trace_task_details")]
                span.record("stale", "prepare");
                return Some(stale_priority);
            }
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // When restoring from filesystem cache the following might not be executed (since we can
        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
        // would be executed again.

        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        // Connect the new children; this step can detect that the task became stale.
        if has_new_children
            && let Some(stale_priority) =
                self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return Some(stale_priority);
        }

        let (stale_priority, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
            is_session_dependent,
        );
        if let Some(stale_priority) = stale_priority {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return Some(stale_priority);
        }

        let removed_data = self.task_execution_completed_cleanup(
            &mut ctx,
            task_id,
            cell_counters,
            is_error,
            is_recomputation,
        );

        // Drop data outside of critical sections
        drop(removed_data);
        drop(in_progress_cells);

        None
    }
2081
2082    fn task_execution_completed_prepare(
2083        &self,
2084        ctx: &mut impl ExecuteContext<'_>,
2085        #[cfg(feature = "trace_task_details")] span: &Span,
2086        task_id: TaskId,
2087        result: Result<RawVc, TurboTasksExecutionError>,
2088        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2089        #[cfg(feature = "verify_determinism")] stateful: bool,
2090        has_invalidator: bool,
2091    ) -> Result<TaskExecutionCompletePrepareResult, TaskPriority> {
2092        let mut task = ctx.task(task_id, TaskDataCategory::All);
2093        let is_recomputation = task.is_dirty().is_none();
2094        // Without dependency tracking, the SessionDependent dirty state is never read (no session
2095        // restore), so skip the work
2096        let is_session_dependent = self.should_track_dependencies()
2097            && matches!(task.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent);
2098        let Some(in_progress) = task.get_in_progress_mut() else {
2099            panic!("Task execution completed, but task is not in progress: {task:#?}");
2100        };
2101        if matches!(in_progress, InProgressState::Canceled) {
2102            return Ok(TaskExecutionCompletePrepareResult {
2103                new_children: Default::default(),
2104                is_now_immutable: false,
2105                #[cfg(feature = "verify_determinism")]
2106                no_output_set: false,
2107                new_output: None,
2108                output_dependent_tasks: Default::default(),
2109                is_recomputation,
2110                is_session_dependent,
2111            });
2112        }
2113        let &mut InProgressState::InProgress(box InProgressStateInner {
2114            stale,
2115            ref mut new_children,
2116            once_task: is_once_task,
2117            ..
2118        }) = in_progress
2119        else {
2120            panic!("Task execution completed, but task is not in progress: {task:#?}");
2121        };
2122
2123        // If the task is stale, reschedule it
2124        #[cfg(not(feature = "no_fast_stale"))]
2125        if stale && !is_once_task {
2126            let stale_priority = compute_stale_priority(&task);
2127            let Some(InProgressState::InProgress(box InProgressStateInner {
2128                done_event,
2129                mut new_children,
2130                ..
2131            })) = task.take_in_progress()
2132            else {
2133                unreachable!();
2134            };
2135            let old = task.set_in_progress(InProgressState::Scheduled {
2136                done_event,
2137                reason: TaskExecutionReason::Stale,
2138            });
2139            debug_assert!(old.is_none(), "InProgress already exists");
2140            // Remove old children from new_children to leave only the children that had their
2141            // active count increased
2142            for task in task.iter_children() {
2143                new_children.remove(&task);
2144            }
2145            drop(task);
2146
2147            // We need to undo the active count increase for the children since we throw away the
2148            // new_children list now.
2149            AggregationUpdateQueue::run(
2150                AggregationUpdateJob::DecreaseActiveCounts {
2151                    task_ids: new_children.into_iter().collect(),
2152                },
2153                ctx,
2154            );
2155            return Err(stale_priority);
2156        }
2157
2158        // take the children from the task to process them
2159        let mut new_children = take(new_children);
2160
2161        // handle stateful (only tracked when verify_determinism is enabled)
2162        #[cfg(feature = "verify_determinism")]
2163        if stateful {
2164            task.set_stateful(true);
2165        }
2166
2167        // handle has_invalidator
2168        if has_invalidator {
2169            task.set_invalidator(true);
2170        }
2171
2172        // handle cell counters: update max index and remove cells that are no longer used
2173        // On error, skip this update: the task may have failed before creating all cells it
2174        // normally creates, so cell_counters is incomplete. Clearing cell_type_max_index entries
2175        // based on partial counters would cause "cell no longer exists" errors for tasks that
2176        // still hold dependencies on those cells. The old cell data is preserved on error
2177        // (see task_execution_completed_cleanup), so keeping cell_type_max_index consistent with
2178        // that data is correct.
2179        // NOTE: This must stay in sync with task_execution_completed_cleanup, which similarly
2180        // skips cell data removal on error.
2181        if result.is_ok() || is_recomputation {
2182            let old_counters: FxHashMap<_, _> = task
2183                .iter_cell_type_max_index()
2184                .map(|(&k, &v)| (k, v))
2185                .collect();
2186            let mut counters_to_remove = old_counters.clone();
2187
2188            for (&cell_type, &max_index) in cell_counters.iter() {
2189                if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2190                    if old_max_index != max_index {
2191                        task.insert_cell_type_max_index(cell_type, max_index);
2192                    }
2193                } else {
2194                    task.insert_cell_type_max_index(cell_type, max_index);
2195                }
2196            }
2197            for (cell_type, _) in counters_to_remove {
2198                task.remove_cell_type_max_index(&cell_type);
2199            }
2200        }
2201
2202        let mut queue = AggregationUpdateQueue::new();
2203
2204        let mut old_edges = Vec::new();
2205
2206        let has_children = !new_children.is_empty();
2207        let is_immutable = task.immutable();
2208        let task_dependencies_for_immutable =
2209            // Task was not previously marked as immutable
2210            if !is_immutable
2211            // Task is not session dependent (session dependent tasks can change between sessions)
2212            && !is_session_dependent
2213            // Task has no invalidator
2214            && !task.invalidator()
2215            // Task has no dependencies on collectibles
2216            && task.is_collectibles_dependencies_empty()
2217        {
2218            Some(
2219                // Collect all dependencies on tasks to check if all dependencies are immutable
2220                task.iter_output_dependencies()
2221                    .chain(task.iter_cell_dependencies().map(|dep| dep.cell_ref().task))
2222                    .collect::<FxHashSet<_>>(),
2223            )
2224        } else {
2225            None
2226        };
2227
2228        if has_children {
2229            // Prepare all new children
2230            let _aggregation_number =
2231                prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2232
2233            #[cfg(feature = "trace_task_details")]
2234            span.record("aggregation_number", _aggregation_number);
2235
2236            // Filter actual new children
2237            old_edges.extend(
2238                task.iter_children()
2239                    .filter(|task| !new_children.remove(task))
2240                    .map(OutdatedEdge::Child),
2241            );
2242        } else {
2243            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2244        }
2245
2246        old_edges.extend(
2247            task.iter_outdated_collectibles()
2248                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2249        );
2250
2251        if self.should_track_dependencies() {
2252            // IMPORTANT: Use iter_outdated_* here, NOT iter_* (active dependencies).
2253            // At execution start, active deps are copied to outdated as a "before" snapshot.
2254            // During execution, new deps are added to active.
2255            // Here at completion, we clean up only the OUTDATED deps (the "before" snapshot).
2256            // Using iter_* (active) instead would incorrectly clean up deps that are still valid,
2257            // breaking dependency tracking.
2258            old_edges.extend(
2259                task.iter_outdated_cell_dependencies()
2260                    .map(OutdatedEdge::CellDependency),
2261            );
2262            old_edges.extend(
2263                task.iter_outdated_output_dependencies()
2264                    .map(OutdatedEdge::OutputDependency),
2265            );
2266        }
2267
2268        // Check if output needs to be updated
2269        let current_output = task.get_output();
2270        #[cfg(feature = "verify_determinism")]
2271        let no_output_set = current_output.is_none();
2272        let new_output = match result {
2273            Ok(RawVc::TaskOutput(output_task_id)) => {
2274                if let Some(OutputValue::Output(current_task_id)) = current_output
2275                    && *current_task_id == output_task_id
2276                {
2277                    None
2278                } else {
2279                    Some(OutputValue::Output(output_task_id))
2280                }
2281            }
2282            Ok(RawVc::TaskCell(output_task_id, cell)) => {
2283                if let Some(OutputValue::Cell(CellRef {
2284                    task: current_task_id,
2285                    cell: current_cell,
2286                })) = current_output
2287                    && *current_task_id == output_task_id
2288                    && *current_cell == cell
2289                {
2290                    None
2291                } else {
2292                    Some(OutputValue::Cell(CellRef {
2293                        task: output_task_id,
2294                        cell,
2295                    }))
2296                }
2297            }
2298            Ok(RawVc::LocalOutput(..)) => {
2299                panic!("Non-local tasks must not return a local Vc");
2300            }
2301            Err(err) => {
2302                if let Some(OutputValue::Error(old_error)) = current_output
2303                    && **old_error == err
2304                {
2305                    None
2306                } else {
2307                    Some(OutputValue::Error(Arc::new((&err).into())))
2308                }
2309            }
2310        };
2311        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2312        // When output has changed, grab the dependent tasks
2313        if new_output.is_some() && ctx.should_track_dependencies() {
2314            output_dependent_tasks = task.iter_output_dependent().collect();
2315        }
2316
2317        drop(task);
2318
2319        // Check if the task can be marked as immutable
2320        let mut is_now_immutable = false;
2321        if let Some(dependencies) = task_dependencies_for_immutable
2322            && dependencies
2323                .iter()
2324                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2325        {
2326            is_now_immutable = true;
2327        }
2328        #[cfg(feature = "trace_task_details")]
2329        span.record("immutable", is_immutable || is_now_immutable);
2330
2331        if !queue.is_empty() || !old_edges.is_empty() {
2332            #[cfg(any(
2333                feature = "trace_task_completion",
2334                feature = "trace_aggregation_update_stats"
2335            ))]
2336            let _span =
2337                tracing::trace_span!("remove old edges and prepare new children", stats = Empty)
2338                    .entered();
2339            // Remove outdated edges first, before removing in_progress+dirty flag.
2340            // We need to make sure all outdated edges are removed before the task can potentially
2341            // be scheduled and executed again
2342            #[cfg(feature = "trace_aggregation_update_stats")]
2343            {
2344                let stats = CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2345                _span.record("stats", tracing::field::debug(stats));
2346            }
2347            #[cfg(not(feature = "trace_aggregation_update_stats"))]
2348            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2349        }
2350
2351        Ok(TaskExecutionCompletePrepareResult {
2352            new_children,
2353            is_now_immutable,
2354            #[cfg(feature = "verify_determinism")]
2355            no_output_set,
2356            new_output,
2357            output_dependent_tasks,
2358            is_recomputation,
2359            is_session_dependent,
2360        })
2361    }
2362
2363    fn task_execution_completed_invalidate_output_dependent(
2364        &self,
2365        ctx: &mut impl ExecuteContext<'_>,
2366        task_id: TaskId,
2367        output_dependent_tasks: SmallVec<[TaskId; 4]>,
2368    ) {
2369        debug_assert!(!output_dependent_tasks.is_empty());
2370
2371        #[cfg(feature = "trace_task_dirty")]
2372        let task_description = self.debug_get_task_description(task_id);
2373
2374        if output_dependent_tasks.len() > 1 {
2375            ctx.prepare_tasks(
2376                output_dependent_tasks
2377                    .iter()
2378                    .map(|&id| (id, TaskDataCategory::All)),
2379                "invalidate output dependents",
2380            );
2381        }
2382
2383        fn process_output_dependents(
2384            ctx: &mut impl ExecuteContext<'_>,
2385            task_id: TaskId,
2386            #[cfg(feature = "trace_task_dirty")] task_description: &str,
2387            dependent_task_id: TaskId,
2388            queue: &mut AggregationUpdateQueue,
2389        ) {
2390            #[cfg(feature = "trace_task_output_dependencies")]
2391            let span = tracing::trace_span!(
2392                "invalidate output dependency",
2393                task = %task_id,
2394                dependent_task = %dependent_task_id,
2395                result = tracing::field::Empty,
2396            )
2397            .entered();
2398            let mut make_stale = true;
2399            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2400            let transient_task_type = dependent.get_transient_task_type();
2401            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
2402                // once tasks are never invalidated
2403                #[cfg(feature = "trace_task_output_dependencies")]
2404                span.record("result", "once task");
2405                return;
2406            }
2407            if dependent.outdated_output_dependencies_contains(&task_id) {
2408                #[cfg(feature = "trace_task_output_dependencies")]
2409                span.record("result", "outdated dependency");
2410                // output dependency is outdated, so it hasn't read the output yet
2411                // and doesn't need to be invalidated
2412                // But importantly we still need to make the task dirty as it should no longer
2413                // be considered as "recomputation".
2414                make_stale = false;
2415            } else if !dependent.output_dependencies_contains(&task_id) {
2416                // output dependency has been removed, so the task doesn't depend on the
2417                // output anymore and doesn't need to be invalidated
2418                #[cfg(feature = "trace_task_output_dependencies")]
2419                span.record("result", "no backward dependency");
2420                return;
2421            }
2422            make_task_dirty_internal(
2423                dependent,
2424                dependent_task_id,
2425                make_stale,
2426                #[cfg(feature = "trace_task_dirty")]
2427                TaskDirtyCause::OutputChange {
2428                    task_description: task_description.to_string(),
2429                },
2430                queue,
2431                ctx,
2432            );
2433            #[cfg(feature = "trace_task_output_dependencies")]
2434            span.record("result", "marked dirty");
2435        }
2436
2437        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
2438            let chunk_size = good_chunk_size(output_dependent_tasks.len());
2439            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2440            let _ = scope_and_block(chunks.len(), |scope| {
2441                for chunk in chunks {
2442                    let child_ctx = ctx.child_context();
2443                    #[cfg(feature = "trace_task_dirty")]
2444                    let task_description = &task_description;
2445                    scope.spawn(move || {
2446                        let mut ctx = child_ctx.create();
2447                        let mut queue = AggregationUpdateQueue::new();
2448                        for dependent_task_id in chunk {
2449                            process_output_dependents(
2450                                &mut ctx,
2451                                task_id,
2452                                #[cfg(feature = "trace_task_dirty")]
2453                                task_description,
2454                                dependent_task_id,
2455                                &mut queue,
2456                            )
2457                        }
2458                        queue.execute(&mut ctx);
2459                    });
2460                }
2461            });
2462        } else {
2463            let mut queue = AggregationUpdateQueue::new();
2464            for dependent_task_id in output_dependent_tasks {
2465                process_output_dependents(
2466                    ctx,
2467                    task_id,
2468                    #[cfg(feature = "trace_task_dirty")]
2469                    &task_description,
2470                    dependent_task_id,
2471                    &mut queue,
2472                );
2473            }
2474            queue.execute(ctx);
2475        }
2476    }
2477
2478    fn task_execution_completed_unfinished_children_dirty(
2479        &self,
2480        ctx: &mut impl ExecuteContext<'_>,
2481        new_children: &FxHashSet<TaskId>,
2482    ) {
2483        debug_assert!(!new_children.is_empty());
2484
2485        let mut queue = AggregationUpdateQueue::new();
2486        ctx.for_each_task_all(
2487            new_children.iter().copied(),
2488            "unfinished children dirty",
2489            |child_task, ctx| {
2490                if !child_task.has_output() {
2491                    let child_id = child_task.id();
2492                    make_task_dirty_internal(
2493                        child_task,
2494                        child_id,
2495                        false,
2496                        #[cfg(feature = "trace_task_dirty")]
2497                        TaskDirtyCause::InitialDirty,
2498                        &mut queue,
2499                        ctx,
2500                    );
2501                }
2502            },
2503        );
2504
2505        queue.execute(ctx);
2506    }
2507
2508    fn task_execution_completed_connect(
2509        &self,
2510        ctx: &mut impl ExecuteContext<'_>,
2511        task_id: TaskId,
2512        new_children: FxHashSet<TaskId>,
2513    ) -> Option<TaskPriority> {
2514        debug_assert!(!new_children.is_empty());
2515
2516        let mut task = ctx.task(task_id, TaskDataCategory::All);
2517        let Some(in_progress) = task.get_in_progress() else {
2518            panic!("Task execution completed, but task is not in progress: {task:#?}");
2519        };
2520        if matches!(in_progress, InProgressState::Canceled) {
2521            // Task was canceled in the meantime, so we don't connect the children
2522            return None;
2523        }
2524        let InProgressState::InProgress(box InProgressStateInner {
2525            #[cfg(not(feature = "no_fast_stale"))]
2526            stale,
2527            once_task: is_once_task,
2528            ..
2529        }) = in_progress
2530        else {
2531            panic!("Task execution completed, but task is not in progress: {task:#?}");
2532        };
2533
2534        // If the task is stale, reschedule it
2535        #[cfg(not(feature = "no_fast_stale"))]
2536        if *stale && !is_once_task {
2537            let stale_priority = compute_stale_priority(&task);
2538            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2539                task.take_in_progress()
2540            else {
2541                unreachable!();
2542            };
2543            let old = task.set_in_progress(InProgressState::Scheduled {
2544                done_event,
2545                reason: TaskExecutionReason::Stale,
2546            });
2547            debug_assert!(old.is_none(), "InProgress already exists");
2548            drop(task);
2549
2550            // All `new_children` are currently hold active with an active count and we need to undo
2551            // that. (We already filtered out the old children from that list)
2552            AggregationUpdateQueue::run(
2553                AggregationUpdateJob::DecreaseActiveCounts {
2554                    task_ids: new_children.into_iter().collect(),
2555                },
2556                ctx,
2557            );
2558            return Some(stale_priority);
2559        }
2560
2561        let has_active_count = ctx.should_track_activeness()
2562            && task
2563                .get_activeness()
2564                .is_some_and(|activeness| activeness.active_counter > 0);
2565        connect_children(
2566            ctx,
2567            task_id,
2568            task,
2569            new_children,
2570            has_active_count,
2571            ctx.should_track_activeness(),
2572        );
2573
2574        None
2575    }
2576
2577    #[allow(clippy::type_complexity)]
2578    fn task_execution_completed_finish(
2579        &self,
2580        ctx: &mut impl ExecuteContext<'_>,
2581        task_id: TaskId,
2582        #[cfg(feature = "verify_determinism")] no_output_set: bool,
2583        new_output: Option<OutputValue>,
2584        is_now_immutable: bool,
2585        is_session_dependent: bool,
2586    ) -> (
2587        Option<TaskPriority>,
2588        Option<
2589            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2590        >,
2591    ) {
2592        let mut task = ctx.task(task_id, TaskDataCategory::All);
2593        let Some(in_progress) = task.take_in_progress() else {
2594            panic!("Task execution completed, but task is not in progress: {task:#?}");
2595        };
2596        if matches!(in_progress, InProgressState::Canceled) {
2597            // Task was canceled in the meantime, so we don't finish it
2598            return (None, None);
2599        }
2600        let InProgressState::InProgress(box InProgressStateInner {
2601            done_event,
2602            once_task: is_once_task,
2603            stale,
2604            marked_as_completed: _,
2605            new_children,
2606        }) = in_progress
2607        else {
2608            panic!("Task execution completed, but task is not in progress: {task:#?}");
2609        };
2610        debug_assert!(new_children.is_empty());
2611
2612        // If the task is stale, reschedule it
2613        if stale && !is_once_task {
2614            let stale_priority = compute_stale_priority(&task);
2615            let old = task.set_in_progress(InProgressState::Scheduled {
2616                done_event,
2617                reason: TaskExecutionReason::Stale,
2618            });
2619            debug_assert!(old.is_none(), "InProgress already exists");
2620            return (Some(stale_priority), None);
2621        }
2622
2623        // Set the output if it has changed
2624        let mut old_content = None;
2625        if let Some(value) = new_output {
2626            old_content = task.set_output(value);
2627        }
2628
2629        // If the task has no invalidator and has no mutable dependencies, it does not have a way
2630        // to be invalidated and we can mark it as immutable.
2631        if is_now_immutable {
2632            task.set_immutable(true);
2633        }
2634
2635        // Notify in progress cells and remove all of them
2636        let in_progress_cells = task.take_in_progress_cells();
2637        if let Some(ref cells) = in_progress_cells {
2638            for state in cells.values() {
2639                state.event.notify(usize::MAX);
2640            }
2641        }
2642
2643        // Compute and apply the new dirty state, propagating to aggregating ancestors
2644        let new_dirtyness = if is_session_dependent {
2645            Some(Dirtyness::SessionDependent)
2646        } else {
2647            None
2648        };
2649        #[cfg(feature = "verify_determinism")]
2650        let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
2651        let data_update = task.update_dirty_state(new_dirtyness);
2652
2653        // Under verify_determinism we re-run a task whose dirty state or output unexpectedly
2654        // changed during execution to confirm determinism. The leaf priority keeps these
2655        // verification reruns at the highest priority.
2656        #[cfg(feature = "verify_determinism")]
2657        let stale_priority: Option<TaskPriority> =
2658            ((dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task)
2659                .then(TaskPriority::leaf);
2660        #[cfg(not(feature = "verify_determinism"))]
2661        let stale_priority: Option<TaskPriority> = None;
2662        if stale_priority.is_some() {
2663            let old = task.set_in_progress(InProgressState::Scheduled {
2664                done_event,
2665                reason: TaskExecutionReason::Stale,
2666            });
2667            debug_assert!(old.is_none(), "InProgress already exists");
2668            drop(task);
2669        } else {
2670            drop(task);
2671
2672            // Notify dependent tasks that are waiting for this task to finish
2673            done_event.notify(usize::MAX);
2674        }
2675
2676        drop(old_content);
2677
2678        if let Some(data_update) = data_update {
2679            AggregationUpdateQueue::run(data_update, ctx);
2680        }
2681
2682        // We return so the data can be dropped outside of critical sections
2683        (stale_priority, in_progress_cells)
2684    }
2685
2686    fn task_execution_completed_cleanup(
2687        &self,
2688        ctx: &mut impl ExecuteContext<'_>,
2689        task_id: TaskId,
2690        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2691        is_error: bool,
2692        is_recomputation: bool,
2693    ) -> Vec<SharedReference> {
2694        let mut task = ctx.task(task_id, TaskDataCategory::All);
2695        let mut removed_cell_data = Vec::new();
2696        // An error is potentially caused by a eventual consistency, so we avoid updating cells
2697        // after an error as it is likely transient and we want to keep the dependent tasks
2698        // clean to avoid re-executions.
2699        // NOTE: This must stay in sync with task_execution_completed_prepare, which similarly
2700        // skips cell_type_max_index updates on error.
2701        if !is_error || is_recomputation {
2702            // Remove no longer existing cells and
2703            // find all outdated data items (removed cells, outdated edges)
2704            // Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
2705            // anyway and we want to avoid needless re-executions. When the cells become
2706            // used again, they are invalidated from the update cell operation.
2707            let to_remove: Vec<_> = task
2708                .iter_cell_data()
2709                .filter_map(|(cell, _)| {
2710                    cell_counters
2711                        .get(&cell.type_id)
2712                        .is_none_or(|start_index| cell.index >= *start_index)
2713                        .then_some(*cell)
2714                })
2715                .collect();
2716            removed_cell_data.reserve_exact(to_remove.len());
2717            for cell in to_remove {
2718                if let Some(data) = task.remove_cell_data(&cell) {
2719                    removed_cell_data.push(data);
2720                }
2721            }
2722            // Remove cell_data_hash entries for cells that no longer exist
2723            let to_remove_hash: Vec<_> = task
2724                .iter_cell_data_hash()
2725                .filter_map(|(cell, _)| {
2726                    cell_counters
2727                        .get(&cell.type_id)
2728                        .is_none_or(|start_index| cell.index >= *start_index)
2729                        .then_some(*cell)
2730                })
2731                .collect();
2732            for cell in to_remove_hash {
2733                task.remove_cell_data_hash(&cell);
2734            }
2735        }
2736
2737        // Clean up task storage after execution:
2738        // - Shrink collections marked with shrink_on_completion
2739        // - Drop dependency fields for immutable tasks (they'll never re-execute)
2740        task.cleanup_after_execution();
2741
2742        drop(task);
2743
2744        // Return so we can drop outside of critical sections
2745        removed_cell_data
2746    }
2747
/// Writes the standard notice to stderr that the background persisting process stops because
/// of an unrecoverable write error. This only prints; returning from the background job is
/// left to the caller.
fn log_unrecoverable_persist_error() {
    const NOTICE: &str = "Persisting is disabled for this session due to an unrecoverable \
                          error. Stopping the background persisting process.";
    eprintln!("{NOTICE}");
}
2756
2757    fn run_backend_job<'a>(
2758        self: &'a Arc<Self>,
2759        job: TurboTasksBackendJob,
2760        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2761    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2762        Box::pin(async move {
2763            match job {
2764                TurboTasksBackendJob::Snapshot => {
2765                    debug_assert!(self.should_persist());
2766
2767                    let mut last_snapshot = self.start_time;
2768                    let mut idle_start_listener = self.idle_start_event.listen();
2769                    let mut idle_end_listener = self.idle_end_event.listen();
2770                    // Whether to immediately set an idle timeout if possible.
2771                    // Set to false if we don't persist anything in a cycle.
2772                    let mut fresh_idle = true;
2773                    let mut evicted = false;
2774                    let mut is_first = true;
2775                    'outer: loop {
2776                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2777                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2778                        let idle_timeout = *IDLE_TIMEOUT;
2779                        let (time, mut reason) = if is_first {
2780                            (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2781                        } else {
2782                            (SNAPSHOT_INTERVAL, "regular snapshot interval")
2783                        };
2784
2785                        let until = last_snapshot + time;
2786                        if until > Instant::now() {
2787                            let mut stop_listener = self.stopping_event.listen();
2788                            if self.stopping.load(Ordering::Acquire) {
2789                                return;
2790                            }
2791                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2792                                Instant::now() + idle_timeout
2793                            } else {
2794                                far_future()
2795                            };
2796                            loop {
2797                                tokio::select! {
2798                                    _ = &mut stop_listener => {
2799                                        return;
2800                                    },
2801                                    _ = &mut idle_start_listener => {
2802                                        idle_time = Instant::now() + idle_timeout;
2803                                        idle_start_listener = self.idle_start_event.listen()
2804                                    },
2805                                    _ = &mut idle_end_listener => {
2806                                        idle_time = far_future();
2807                                        idle_end_listener = self.idle_end_event.listen()
2808                                    },
2809                                    _ = tokio::time::sleep_until(until) => {
2810                                        break;
2811                                    },
2812                                    _ = tokio::time::sleep_until(idle_time) => {
2813                                        if turbo_tasks.is_idle() {
2814                                            reason = "idle timeout";
2815                                            break;
2816                                        }
2817                                    },
2818                                }
2819                            }
2820                        }
2821
2822                        let this = self.clone();
2823                        // Create a root span shared by both the snapshot/persist
2824                        // work and the subsequent compaction so they appear
2825                        // grouped together in trace viewers.
2826                        let background_span =
2827                            tracing::info_span!(parent: None, "background snapshot");
2828                        match this.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
2829                            Err(err) => {
2830                                // save_snapshot consumed persisted_task_cache_log entries;
2831                                // further snapshots would corrupt the task graph.
2832                                eprintln!("Persisting failed: {err:?}");
2833                                Self::log_unrecoverable_persist_error();
2834                                return;
2835                            }
2836                            Ok((snapshot_start, new_data)) => {
2837                                // if we see 'new_data' then the next idle transition is 'fresh'
2838                                fresh_idle = new_data;
2839                                is_first = false;
2840                                last_snapshot = snapshot_start;
2841
2842                                // Polls the idle-end event without blocking. Returns
2843                                // `true` and refreshes the listener if idle has ended,
2844                                // `false` if we are still idle.
2845                                macro_rules! check_idle_ended {
2846                                    () => {{
2847                                        tokio::select! {
2848                                            biased;
2849                                            _ = &mut idle_end_listener => {
2850                                                idle_end_listener = self.idle_end_event.listen();
2851                                                true
2852                                            },
2853                                            _ = std::future::ready(()) => false,
2854                                        }
2855                                    }};
2856                                }
2857                                // Evict persisted tasks from memory to reclaim space.
2858                                // Like compaction, this runs after snapshot_and_persist
2859                                // as a separate concern.
2860                                //
2861                                // TODO: improve eviction policy — current approach is a full sweep
2862                                // after every snapshot. Better strategies to consider:
2863                                //   - Memory pressure signals: only evict when RSS exceeds a
2864                                //     threshold rather than unconditionally.
2865                                //   - Recency data: track last-access time per task and evict
2866                                //     least-recently-used entries first rather than all at once.
2867                                //   - Eviction intensity: partial sweeps (evict a fraction of
2868                                //     eligible tasks per cycle) to reduce latency spikes.
2869                                // Evict when there is new data to persist (the common
2870                                // case) or on the very first snapshot after startup
2871                                // (data was already on disk from a prior run, so
2872                                // new_data may be false but in-memory state can still
2873                                // be evicted).
2874                                let mut ran_eviction = false;
2875                                if this.should_evict() && (new_data || !evicted) {
2876                                    if check_idle_ended!() {
2877                                        // need to start all the way over so we catch the next
2878                                        // signal
2879                                        continue 'outer;
2880                                    }
2881                                    evicted = true;
2882                                    ran_eviction = true;
2883                                    this.storage.evict_after_snapshot(background_span.id());
2884                                }
2885
2886                                // Compact while idle (up to limit), regardless of
2887                                // whether the snapshot had new data.
2888                                // `background_span` is not entered here because
2889                                // `EnteredSpan` is `!Send` and would prevent the
2890                                // future from being sent across threads when it
2891                                // suspends at the `select!` await below.
2892                                let mut ran_compaction = false;
2893                                const MAX_IDLE_COMPACTION_PASSES: usize = 10;
2894                                for _ in 0..MAX_IDLE_COMPACTION_PASSES {
2895                                    if check_idle_ended!() {
2896                                        continue 'outer;
2897                                    }
2898                                    // Enter the span only around the synchronous
2899                                    // compact() call so we never hold an
2900                                    // `EnteredSpan` across an await point.
2901                                    let _compact_span = tracing::info_span!(
2902                                        parent: background_span.id(),
2903                                        "compact database"
2904                                    )
2905                                    .entered();
2906                                    match self.backing_storage.compact() {
2907                                        Ok(true) => {
2908                                            ran_compaction = true;
2909                                        }
2910                                        Ok(false) => break,
2911                                        Err(err) => {
2912                                            eprintln!("Compaction failed: {err:?}");
2913                                            if self.backing_storage.has_unrecoverable_write_error()
2914                                            {
2915                                                Self::log_unrecoverable_persist_error();
2916                                                return;
2917                                            }
2918                                            break;
2919                                        }
2920                                    }
2921                                }
2922                                if check_idle_ended!() {
2923                                    continue 'outer;
2924                                }
2925                                // After running snapshotting/eviction/compaction we have churned a
2926                                // _lot_ of memory if we are still
2927                                // idle tell `mimalloc` that now would be a good time to release
2928                                // memory back to the OS
2929                                if new_data || ran_compaction || ran_eviction {
2930                                    turbo_tasks_malloc::TurboMalloc::collect(true);
2931                                }
2932                            }
2933                        }
2934                    }
2935                }
2936            }
2937        })
2938    }
2939
2940    fn try_read_own_task_cell(
2941        &self,
2942        task_id: TaskId,
2943        cell: CellId,
2944        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2945    ) -> Result<TypedCellContent> {
2946        let mut ctx = self.execute_context(turbo_tasks);
2947        let task = ctx.task(task_id, TaskDataCategory::Data);
2948        if let Some(content) = task.get_cell_data(&cell).cloned() {
2949            Ok(CellContent(Some(content)).into_typed(cell.type_id))
2950        } else {
2951            Ok(CellContent(None).into_typed(cell.type_id))
2952        }
2953    }
2954
2955    fn read_task_collectibles(
2956        &self,
2957        task_id: TaskId,
2958        collectible_type: TraitTypeId,
2959        reader_id: Option<TaskId>,
2960        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2961    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2962        let mut ctx = self.execute_context(turbo_tasks);
2963        let mut collectibles = AutoMap::default();
2964        {
2965            let mut task = ctx.task(task_id, TaskDataCategory::All);
2966            if task
2967                .get_persistent_task_type()
2968                .is_some_and(|t| !t.native_fn.is_root)
2969            {
2970                drop(task);
2971                panic!(
2972                    "Reading collectibles of non-root task {} (reader: {}). The `root` attribute \
2973                     is missing on the task.",
2974                    self.debug_get_task_description(task_id),
2975                    reader_id.map_or_else(
2976                        || "unknown".to_string(),
2977                        |r| self.debug_get_task_description(r)
2978                    )
2979                );
2980            }
2981            for (collectible, count) in task.iter_aggregated_collectibles() {
2982                if *count > 0 && collectible.collectible_type == collectible_type {
2983                    *collectibles
2984                        .entry(RawVc::TaskCell(
2985                            collectible.cell.task,
2986                            collectible.cell.cell,
2987                        ))
2988                        .or_insert(0) += 1;
2989                }
2990            }
2991            for (&collectible, &count) in task.iter_collectibles() {
2992                if collectible.collectible_type == collectible_type {
2993                    *collectibles
2994                        .entry(RawVc::TaskCell(
2995                            collectible.cell.task,
2996                            collectible.cell.cell,
2997                        ))
2998                        .or_insert(0) += count;
2999                }
3000            }
3001            if let Some(reader_id) = reader_id {
3002                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
3003            }
3004        }
3005        if let Some(reader_id) = reader_id {
3006            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
3007            let target = CollectiblesRef {
3008                task: task_id,
3009                collectible_type,
3010            };
3011            if !reader.remove_outdated_collectibles_dependencies(&target) {
3012                let _ = reader.add_collectibles_dependencies(target);
3013            }
3014        }
3015        collectibles
3016    }
3017
3018    fn emit_collectible(
3019        &self,
3020        collectible_type: TraitTypeId,
3021        collectible: RawVc,
3022        task_id: TaskId,
3023        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3024    ) {
3025        self.assert_valid_collectible(task_id, collectible);
3026
3027        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3028            panic!("Collectibles need to be resolved");
3029        };
3030        let cell = CellRef {
3031            task: collectible_task,
3032            cell,
3033        };
3034        operation::UpdateCollectibleOperation::run(
3035            task_id,
3036            CollectibleRef {
3037                collectible_type,
3038                cell,
3039            },
3040            1,
3041            self.execute_context(turbo_tasks),
3042        );
3043    }
3044
3045    fn unemit_collectible(
3046        &self,
3047        collectible_type: TraitTypeId,
3048        collectible: RawVc,
3049        count: u32,
3050        task_id: TaskId,
3051        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3052    ) {
3053        self.assert_valid_collectible(task_id, collectible);
3054
3055        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3056            panic!("Collectibles need to be resolved");
3057        };
3058        let cell = CellRef {
3059            task: collectible_task,
3060            cell,
3061        };
3062        operation::UpdateCollectibleOperation::run(
3063            task_id,
3064            CollectibleRef {
3065                collectible_type,
3066                cell,
3067            },
3068            -(i32::try_from(count).unwrap()),
3069            self.execute_context(turbo_tasks),
3070        );
3071    }
3072
3073    fn update_task_cell(
3074        &self,
3075        task_id: TaskId,
3076        cell: CellId,
3077        content: CellContent,
3078        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3079        content_hash: Option<CellHash>,
3080        verification_mode: VerificationMode,
3081        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3082    ) {
3083        operation::UpdateCellOperation::run(
3084            task_id,
3085            cell,
3086            content,
3087            updated_key_hashes,
3088            content_hash,
3089            verification_mode,
3090            self.execute_context(turbo_tasks),
3091        );
3092    }
3093
3094    fn mark_own_task_as_finished(
3095        &self,
3096        task: TaskId,
3097        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3098    ) {
3099        let mut ctx = self.execute_context(turbo_tasks);
3100        let mut task = ctx.task(task, TaskDataCategory::Data);
3101        if let Some(InProgressState::InProgress(box InProgressStateInner {
3102            marked_as_completed,
3103            ..
3104        })) = task.get_in_progress_mut()
3105        {
3106            *marked_as_completed = true;
3107            // TODO this should remove the dirty state (also check session_dependent)
3108            // but this would break some assumptions for strongly consistent reads.
3109            // Client tasks are not connected yet, so we wouldn't wait for them.
3110            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
3111        }
3112    }
3113
3114    fn connect_task(
3115        &self,
3116        task: TaskId,
3117        parent_task: Option<TaskId>,
3118        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3119    ) {
3120        self.assert_not_persistent_calling_transient(parent_task, task, None);
3121        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
3122    }
3123
3124    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3125        let task_id = self.transient_task_id_factory.get();
3126        {
3127            let mut task = self.storage.access_mut(task_id);
3128            task.init_transient_task(task_id, task_type, self.should_track_activeness());
3129        }
3130        #[cfg(feature = "verify_aggregation_graph")]
3131        self.root_tasks.lock().insert(task_id);
3132        task_id
3133    }
3134
3135    fn dispose_root_task(
3136        &self,
3137        task_id: TaskId,
3138        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3139    ) {
3140        #[cfg(feature = "verify_aggregation_graph")]
3141        self.root_tasks.lock().remove(&task_id);
3142
3143        let mut ctx = self.execute_context(turbo_tasks);
3144        let mut task = ctx.task(task_id, TaskDataCategory::All);
3145        let is_dirty = task.is_dirty();
3146        let has_dirty_containers = task.has_dirty_containers();
3147        if is_dirty.is_some() || has_dirty_containers {
3148            if let Some(activeness_state) = task.get_activeness_mut() {
3149                // We will finish the task, but it would be removed after the task is done
3150                activeness_state.unset_root_type();
3151                activeness_state.set_active_until_clean();
3152            };
3153        } else if let Some(activeness_state) = task.take_activeness() {
3154            // Technically nobody should be listening to this event, but just in case
3155            // we notify it anyway
3156            activeness_state.all_clean_event.notify(usize::MAX);
3157        }
3158    }
3159
3160    #[cfg(feature = "verify_aggregation_graph")]
3161    fn verify_aggregation_graph(
3162        &self,
3163        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3164        idle: bool,
3165    ) {
3166        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3167            return;
3168        }
3169        use std::{collections::VecDeque, env, io::stdout};
3170
3171        use crate::backend::operation::{get_uppers, is_aggregating_node};
3172
3173        let mut ctx = self.execute_context(turbo_tasks);
3174        let root_tasks = self.root_tasks.lock().clone();
3175
3176        for task_id in root_tasks.into_iter() {
3177            let mut queue = VecDeque::new();
3178            let mut visited = FxHashSet::default();
3179            let mut aggregated_nodes = FxHashSet::default();
3180            let mut collectibles = FxHashMap::default();
3181            let root_task_id = task_id;
3182            visited.insert(task_id);
3183            aggregated_nodes.insert(task_id);
3184            queue.push_back(task_id);
3185            let mut counter = 0;
3186            while let Some(task_id) = queue.pop_front() {
3187                counter += 1;
3188                if counter % 100000 == 0 {
3189                    println!(
3190                        "queue={}, visited={}, aggregated_nodes={}",
3191                        queue.len(),
3192                        visited.len(),
3193                        aggregated_nodes.len()
3194                    );
3195                }
3196                let task = ctx.task(task_id, TaskDataCategory::All);
3197                if idle && !self.is_idle.load(Ordering::Relaxed) {
3198                    return;
3199                }
3200
3201                let uppers = get_uppers(&task);
3202                if task_id != root_task_id
3203                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3204                {
3205                    panic!(
3206                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3207                         {:?})",
3208                        task_id,
3209                        task.get_task_description(),
3210                        uppers
3211                    );
3212                }
3213
3214                for (collectible, _) in task.iter_aggregated_collectibles() {
3215                    collectibles
3216                        .entry(*collectible)
3217                        .or_insert_with(|| (false, Vec::new()))
3218                        .1
3219                        .push(task_id);
3220                }
3221
3222                for (&collectible, &value) in task.iter_collectibles() {
3223                    if value > 0 {
3224                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3225                            *flag = true
3226                        } else {
3227                            panic!(
3228                                "Task {} has a collectible {:?} that is not in any upper task",
3229                                task_id, collectible
3230                            );
3231                        }
3232                    }
3233                }
3234
3235                let is_dirty = task.has_dirty();
3236                let has_dirty_container = task.has_dirty_containers();
3237                let should_be_in_upper = is_dirty || has_dirty_container;
3238
3239                let aggregation_number = get_aggregation_number(&task);
3240                if is_aggregating_node(aggregation_number) {
3241                    aggregated_nodes.insert(task_id);
3242                }
3243                // println!(
3244                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
3245                //     ctx.get_task_description(task_id),
3246                //     uppers
3247                // );
3248
3249                for child_id in task.iter_children() {
3250                    // println!("{task_id}: child -> {child_id}");
3251                    if visited.insert(child_id) {
3252                        queue.push_back(child_id);
3253                    }
3254                }
3255                drop(task);
3256
3257                if should_be_in_upper {
3258                    for upper_id in uppers {
3259                        let upper = ctx.task(upper_id, TaskDataCategory::All);
3260                        let in_upper = upper
3261                            .get_aggregated_dirty_containers(&task_id)
3262                            .is_some_and(|&dirty| dirty > 0);
3263                        if !in_upper {
3264                            let containers: Vec<_> = upper
3265                                .iter_aggregated_dirty_containers()
3266                                .map(|(&k, &v)| (k, v))
3267                                .collect();
3268                            let upper_task_desc = upper.get_task_description();
3269                            drop(upper);
3270                            panic!(
3271                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
3272                                 ({})\nThese dirty containers are present:\n{:#?}",
3273                                task_id,
3274                                ctx.task(task_id, TaskDataCategory::Data)
3275                                    .get_task_description(),
3276                                upper_id,
3277                                upper_task_desc,
3278                                containers,
3279                            );
3280                        }
3281                    }
3282                }
3283            }
3284
3285            for (collectible, (flag, task_ids)) in collectibles {
3286                if !flag {
3287                    use std::io::Write;
3288                    let mut stdout = stdout().lock();
3289                    writeln!(
3290                        stdout,
3291                        "{:?} that is not emitted in any child task but in these aggregated \
3292                         tasks: {:#?}",
3293                        collectible,
3294                        task_ids
3295                            .iter()
3296                            .map(|t| format!(
3297                                "{t} {}",
3298                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
3299                            ))
3300                            .collect::<Vec<_>>()
3301                    )
3302                    .unwrap();
3303
3304                    let task_id = collectible.cell.task;
3305                    let mut queue = {
3306                        let task = ctx.task(task_id, TaskDataCategory::All);
3307                        get_uppers(&task)
3308                    };
3309                    let mut visited = FxHashSet::default();
3310                    for &upper_id in queue.iter() {
3311                        visited.insert(upper_id);
3312                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3313                    }
3314                    while let Some(task_id) = queue.pop() {
3315                        let task = ctx.task(task_id, TaskDataCategory::All);
3316                        let desc = task.get_task_description();
3317                        let aggregated_collectible = task
3318                            .get_aggregated_collectibles(&collectible)
3319                            .copied()
3320                            .unwrap_or_default();
3321                        let uppers = get_uppers(&task);
3322                        drop(task);
3323                        writeln!(
3324                            stdout,
3325                            "upper {task_id} {desc} collectible={aggregated_collectible}"
3326                        )
3327                        .unwrap();
3328                        if task_ids.contains(&task_id) {
3329                            writeln!(
3330                                stdout,
3331                                "Task has an upper connection to an aggregated task that doesn't \
3332                                 reference it. Upper connection is invalid!"
3333                            )
3334                            .unwrap();
3335                        }
3336                        for upper_id in uppers {
3337                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3338                            if !visited.contains(&upper_id) {
3339                                queue.push(upper_id);
3340                            }
3341                        }
3342                    }
3343                    panic!("See stdout for more details");
3344                }
3345            }
3346        }
3347    }
3348
3349    fn assert_not_persistent_calling_transient(
3350        &self,
3351        parent_id: Option<TaskId>,
3352        child_id: TaskId,
3353        cell_id: Option<CellId>,
3354    ) {
3355        if let Some(parent_id) = parent_id
3356            && !parent_id.is_transient()
3357            && child_id.is_transient()
3358        {
3359            self.panic_persistent_calling_transient(
3360                self.debug_get_task_description(parent_id),
3361                self.debug_get_cached_task_type(child_id).as_deref(),
3362                cell_id,
3363            );
3364        }
3365    }
3366
3367    fn panic_persistent_calling_transient(
3368        &self,
3369        parent: String,
3370        child: Option<&CachedTaskType>,
3371        cell_id: Option<CellId>,
3372    ) -> ! {
3373        let transient_reason = if let Some(child) = child {
3374            Cow::Owned(format!(
3375                " The callee is transient because it depends on:\n{}",
3376                self.debug_trace_transient_task(child, cell_id),
3377            ))
3378        } else {
3379            Cow::Borrowed("")
3380        };
3381        panic!(
3382            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3383            parent,
3384            child.map_or("unknown", |t| t.get_name()),
3385            transient_reason,
3386        );
3387    }
3388
3389    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3390        // these checks occur in a potentially hot codepath, but they're cheap
3391        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3392            // This should never happen: The collectible APIs use ResolvedVc
3393            let task_info = if let Some(col_task_ty) = collectible
3394                .try_get_task_id()
3395                .map(|t| self.debug_get_task_description(t))
3396            {
3397                Cow::Owned(format!(" (return type of {col_task_ty})"))
3398            } else {
3399                Cow::Borrowed("")
3400            };
3401            panic!("Collectible{task_info} must be a ResolvedVc")
3402        };
3403        if col_task_id.is_transient() && !task_id.is_transient() {
3404            let transient_reason =
3405                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3406                    Cow::Owned(format!(
3407                        ". The collectible is transient because it depends on:\n{}",
3408                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3409                    ))
3410                } else {
3411                    Cow::Borrowed("")
3412                };
3413            // this should never happen: How would a persistent function get a transient Vc?
3414            panic!(
3415                "Collectible is transient, transient collectibles cannot be emitted from \
3416                 persistent tasks{transient_reason}",
3417            )
3418        }
3419    }
3420}
3421
3422impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
3423    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3424        self.0.startup(turbo_tasks);
3425    }
3426
3427    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3428        self.0.stopping();
3429    }
3430
3431    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3432        self.0.stop(turbo_tasks);
3433    }
3434
3435    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3436        self.0.idle_start(turbo_tasks);
3437    }
3438
3439    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3440        self.0.idle_end();
3441    }
3442
3443    fn get_or_create_task(
3444        &self,
3445        native_fn: &'static NativeFunction,
3446        this: Option<RawVc>,
3447        arg: &mut dyn StackDynTaskInputs,
3448        parent_task: Option<TaskId>,
3449        persistence: TaskPersistence,
3450        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3451    ) -> TaskId {
3452        self.0
3453            .get_or_create_task(native_fn, this, arg, parent_task, persistence, turbo_tasks)
3454    }
3455
3456    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3457        self.0.invalidate_task(task_id, turbo_tasks);
3458    }
3459
3460    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3461        self.0.invalidate_tasks(tasks, turbo_tasks);
3462    }
3463
3464    fn invalidate_tasks_set(
3465        &self,
3466        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3467        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3468    ) {
3469        self.0.invalidate_tasks_set(tasks, turbo_tasks);
3470    }
3471
3472    fn invalidate_serialization(
3473        &self,
3474        task_id: TaskId,
3475        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3476    ) {
3477        self.0.invalidate_serialization(task_id, turbo_tasks);
3478    }
3479
3480    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3481        self.0.task_execution_canceled(task, turbo_tasks)
3482    }
3483
3484    fn try_start_task_execution(
3485        &self,
3486        task_id: TaskId,
3487        priority: TaskPriority,
3488        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3489    ) -> Option<TaskExecutionSpec<'_>> {
3490        self.0
3491            .try_start_task_execution(task_id, priority, turbo_tasks)
3492    }
3493
3494    fn task_execution_completed(
3495        &self,
3496        task_id: TaskId,
3497        result: Result<RawVc, TurboTasksExecutionError>,
3498        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3499        #[cfg(feature = "verify_determinism")] stateful: bool,
3500        has_invalidator: bool,
3501        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3502    ) -> Option<TaskPriority> {
3503        self.0.task_execution_completed(
3504            task_id,
3505            result,
3506            cell_counters,
3507            #[cfg(feature = "verify_determinism")]
3508            stateful,
3509            has_invalidator,
3510            turbo_tasks,
3511        )
3512    }
3513
3514    type BackendJob = TurboTasksBackendJob;
3515
3516    fn run_backend_job<'a>(
3517        &'a self,
3518        job: Self::BackendJob,
3519        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3520    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3521        self.0.run_backend_job(job, turbo_tasks)
3522    }
3523
3524    fn try_read_task_output(
3525        &self,
3526        task_id: TaskId,
3527        reader: Option<TaskId>,
3528        options: ReadOutputOptions,
3529        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3530    ) -> Result<Result<RawVc, EventListener>> {
3531        self.0
3532            .try_read_task_output(task_id, reader, options, turbo_tasks)
3533    }
3534
3535    fn try_read_task_cell(
3536        &self,
3537        task_id: TaskId,
3538        cell: CellId,
3539        reader: Option<TaskId>,
3540        options: ReadCellOptions,
3541        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3542    ) -> Result<Result<TypedCellContent, EventListener>> {
3543        self.0
3544            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3545    }
3546
3547    fn try_read_own_task_cell(
3548        &self,
3549        task_id: TaskId,
3550        cell: CellId,
3551        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3552    ) -> Result<TypedCellContent> {
3553        self.0.try_read_own_task_cell(task_id, cell, turbo_tasks)
3554    }
3555
3556    fn read_task_collectibles(
3557        &self,
3558        task_id: TaskId,
3559        collectible_type: TraitTypeId,
3560        reader: Option<TaskId>,
3561        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3562    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3563        self.0
3564            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3565    }
3566
3567    fn emit_collectible(
3568        &self,
3569        collectible_type: TraitTypeId,
3570        collectible: RawVc,
3571        task_id: TaskId,
3572        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3573    ) {
3574        self.0
3575            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3576    }
3577
3578    fn unemit_collectible(
3579        &self,
3580        collectible_type: TraitTypeId,
3581        collectible: RawVc,
3582        count: u32,
3583        task_id: TaskId,
3584        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3585    ) {
3586        self.0
3587            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3588    }
3589
3590    fn update_task_cell(
3591        &self,
3592        task_id: TaskId,
3593        cell: CellId,
3594        content: CellContent,
3595        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3596        content_hash: Option<CellHash>,
3597        verification_mode: VerificationMode,
3598        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3599    ) {
3600        self.0.update_task_cell(
3601            task_id,
3602            cell,
3603            content,
3604            updated_key_hashes,
3605            content_hash,
3606            verification_mode,
3607            turbo_tasks,
3608        );
3609    }
3610
3611    fn mark_own_task_as_finished(
3612        &self,
3613        task_id: TaskId,
3614        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3615    ) {
3616        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3617    }
3618
3619    fn connect_task(
3620        &self,
3621        task: TaskId,
3622        parent_task: Option<TaskId>,
3623        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3624    ) {
3625        self.0.connect_task(task, parent_task, turbo_tasks);
3626    }
3627
3628    fn create_transient_task(
3629        &self,
3630        task_type: TransientTaskType,
3631        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3632    ) -> TaskId {
3633        self.0.create_transient_task(task_type)
3634    }
3635
3636    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3637        self.0.dispose_root_task(task_id, turbo_tasks);
3638    }
3639
3640    fn task_statistics(&self) -> &TaskStatisticsApi {
3641        &self.0.task_statistics
3642    }
3643
3644    fn is_tracking_dependencies(&self) -> bool {
3645        self.0.options.dependency_tracking
3646    }
3647
3648    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
3649        self.0.get_task_name(task, turbo_tasks)
3650    }
3651}
3652
3653enum DebugTraceTransientTask {
3654    Cached {
3655        task_name: &'static str,
3656        cell_type_id: Option<ValueTypeId>,
3657        cause_self: Option<Box<DebugTraceTransientTask>>,
3658        cause_args: Vec<DebugTraceTransientTask>,
3659    },
3660    /// This representation is used when this task is a duplicate of one previously shown
3661    Collapsed {
3662        task_name: &'static str,
3663        cell_type_id: Option<ValueTypeId>,
3664    },
3665    Uncached {
3666        cell_type_id: Option<ValueTypeId>,
3667    },
3668}
3669
3670impl DebugTraceTransientTask {
3671    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3672        let indent = "    ".repeat(level);
3673        f.write_str(&indent)?;
3674
3675        fn fmt_cell_type_id(
3676            f: &mut fmt::Formatter<'_>,
3677            cell_type_id: Option<ValueTypeId>,
3678        ) -> fmt::Result {
3679            if let Some(ty) = cell_type_id {
3680                write!(
3681                    f,
3682                    " (read cell of type {})",
3683                    get_value_type(ty).ty.global_name
3684                )
3685            } else {
3686                Ok(())
3687            }
3688        }
3689
3690        // write the name and type
3691        match self {
3692            Self::Cached {
3693                task_name,
3694                cell_type_id,
3695                ..
3696            }
3697            | Self::Collapsed {
3698                task_name,
3699                cell_type_id,
3700                ..
3701            } => {
3702                f.write_str(task_name)?;
3703                fmt_cell_type_id(f, *cell_type_id)?;
3704                if matches!(self, Self::Collapsed { .. }) {
3705                    f.write_str(" (collapsed)")?;
3706                }
3707            }
3708            Self::Uncached { cell_type_id } => {
3709                f.write_str("unknown transient task")?;
3710                fmt_cell_type_id(f, *cell_type_id)?;
3711            }
3712        }
3713        f.write_char('\n')?;
3714
3715        // write any extra "cause" information we might have
3716        if let Self::Cached {
3717            cause_self,
3718            cause_args,
3719            ..
3720        } = self
3721        {
3722            if let Some(c) = cause_self {
3723                writeln!(f, "{indent}  self:")?;
3724                c.fmt_indented(f, level + 1)?;
3725            }
3726            if !cause_args.is_empty() {
3727                writeln!(f, "{indent}  args:")?;
3728                for c in cause_args {
3729                    c.fmt_indented(f, level + 1)?;
3730                }
3731            }
3732        }
3733        Ok(())
3734    }
3735}
3736
3737impl fmt::Display for DebugTraceTransientTask {
3738    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3739        self.fmt_indented(f, 0)
3740    }
3741}
3742
// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
/// Returns an `Instant` roughly 30 years after the moment of the call, for use as a stand-in
/// for "never".
fn far_future() -> Instant {
    // There is no API to obtain a maximum `Instant`, nor to convert a specific future date into
    // one, so a large offset from "now" is used instead. The offset is deliberately modest:
    // 1000 years overflows on macOS, 100 years overflows on FreeBSD.
    const SECONDS_PER_DAY: u64 = 86400;
    const DAYS_PER_YEAR: u64 = 365;
    const YEARS_AHEAD: u64 = 30;

    let offset = Duration::from_secs(SECONDS_PER_DAY * DAYS_PER_YEAR * YEARS_AHEAD);
    Instant::now() + offset
}
3751
3752/// Encodes task data, using the provided buffer as a scratch space.  Returns a new exactly sized
3753/// buffer.
3754/// This allows reusing the buffer across multiple encode calls to optimize allocations and
3755/// resulting buffer sizes.
3756///
3757/// TODO: The `Result` return type is an artifact of the bincode `Encode` trait requiring
3758/// fallible encoding. In practice, encoding to a `SmallVec` is infallible (no I/O), and the only
3759/// real failure mode — a `TypedSharedReference` whose value type has no bincode impl — is a
3760/// programmer error caught by the panic in the caller. Consider making the bincode encoding trait
3761/// infallible (i.e. returning `()` instead of `Result<(), EncodeError>`) to eliminate the
3762/// spurious `Result` threading throughout the encode path.
3763fn encode_task_data(
3764    task: TaskId,
3765    data: &TaskStorage,
3766    category: SpecificTaskDataCategory,
3767    scratch_buffer: &mut TurboBincodeBuffer,
3768) -> Result<TurboBincodeBuffer> {
3769    scratch_buffer.clear();
3770    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3771    data.encode(category, &mut encoder)?;
3772
3773    if cfg!(feature = "verify_serialization") {
3774        TaskStorage::new()
3775            .decode(
3776                category,
3777                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3778            )
3779            .with_context(|| {
3780                format!(
3781                    "expected to be able to decode serialized data for '{category:?}' information \
3782                     for {task}"
3783                )
3784            })?;
3785    }
3786    Ok(SmallVec::from_slice(scratch_buffer))
3787}