Skip to main content

turbo_tasks_backend/backend/
mod.rs

1mod cell_data;
2mod counter_map;
3mod operation;
4mod snapshot_coordinator;
5mod storage;
6pub mod storage_schema;
7
8use std::{
9    borrow::Cow,
10    fmt::{self, Write},
11    future::Future,
12    hash::BuildHasherDefault,
13    mem::take,
14    pin::Pin,
15    sync::{
16        Arc, LazyLock,
17        atomic::{AtomicBool, Ordering},
18    },
19    time::SystemTime,
20};
21
22use anyhow::{Context, Result, bail};
23use auto_hash_map::{AutoMap, AutoSet};
24use indexmap::IndexSet;
25use parking_lot::Mutex;
26use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
27use smallvec::{SmallVec, smallvec};
28use tokio::time::{Duration, Instant};
29use tracing::{Span, trace_span};
30use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
31use turbo_tasks::{
32    CellId, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency, ReadOutputOptions,
33    ReadTracking, SharedReference, StackDynTaskInputs, TRANSIENT_TASK_BIT, TaskExecutionReason,
34    TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic,
35    ValueTypeId,
36    backend::{
37        Backend, CachedTaskType, CellContent, CellHash, TaskExecutionSpec, TransientTaskType,
38        TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
39        TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
40        VerificationMode,
41    },
42    event::{Event, EventDescription, EventListener},
43    macro_helpers::NativeFunction,
44    message_queue::{TimingEvent, TraceEvent},
45    registry::get_value_type,
46    scope::scope_and_block,
47    task_statistics::TaskStatisticsApi,
48    trace::TraceRawVcs,
49    util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
50};
51
52pub use self::{
53    operation::AnyOperation,
54    storage::{EvictionCounts, SpecificTaskDataCategory, TaskDataCategory},
55};
56#[cfg(feature = "trace_task_dirty")]
57use crate::backend::operation::TaskDirtyCause;
58use crate::{
59    backend::{
60        operation::{
61            AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
62            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
63            LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType, TaskTypeRef,
64            connect_children, get_aggregation_number, get_uppers, make_task_dirty_internal,
65            prepare_new_children,
66        },
67        snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
68        storage::Storage,
69        storage_schema::{TaskStorage, TaskStorageAccessors},
70    },
71    backing_storage::{BackingStorage, SnapshotItem, SnapshotMeta, compute_task_type_hash},
72    data::{
73        ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
74        InProgressState, InProgressStateInner, OutputValue, TransientTask,
75    },
76    error::TaskError,
77    utils::{
78        dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
79        shard_amount::compute_shard_amount,
80    },
81};
82
/// Cut-over point for making dependent tasks dirty in parallel.
///
/// When more dependent tasks than this need to be made dirty, the operation is parallelized.
const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10_000;
87
/// Idle timeout for snapshot persistence, read once on first access.
///
/// The value is taken from the `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS` environment variable
/// (whole milliseconds). If the variable is unset, or does not parse as a `u64`, the timeout
/// falls back to 2 seconds.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    // An unreadable variable and an unparsable value are treated the same: no override.
    let configured_millis = match std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS") {
        Ok(raw) => raw.parse::<u64>().ok(),
        Err(_) => None,
    };
    match configured_millis {
        Some(millis) => Duration::from_millis(millis),
        None => Duration::from_secs(2),
    }
});
97
98/// Priority used to re-schedule a task that became stale during execution.
99///
100/// Stale tasks must run again, but at a priority that reflects why they're being re-run rather
101/// than the (likely higher) priority of the original schedule. We use invalidation priority
102/// based on the task's leaf distance, parented under either the task's current dirty priority
103/// or `leaf()` if it is no longer dirty.
104fn compute_stale_priority(task: &impl TaskGuard) -> TaskPriority {
105    TaskPriority::invalidation(
106        task.get_leaf_distance()
107            .copied()
108            .unwrap_or_default()
109            .distance,
110    )
111    .in_parent(task.is_dirty().unwrap_or(TaskPriority::leaf()))
112}
113
/// Selects how the backend uses its backing storage.
///
/// Every mode reads missing cache entries from the storage; only [`StorageMode::ReadWrite`]
/// and [`StorageMode::ReadWriteOnShutdown`] are treated as persisting modes.
pub enum StorageMode {
    /// Queries the storage for cache entries that don't exist locally.
    /// This mode is not treated as persisting.
    ReadOnly,
    /// Queries the storage for cache entries that don't exist locally.
    /// Regularly pushes changes to the backing storage.
    ReadWrite,
    /// Queries the storage for cache entries that don't exist locally.
    /// On shutdown, pushes all changes to the backing storage.
    ReadWriteOnShutdown,
}
124
/// Configuration for [`TurboTasksBackend`].
///
/// The [`Default`] implementation enables dependency and active tracking and uses
/// [`StorageMode::ReadWrite`].
pub struct BackendOptions {
    /// Enables dependency tracking.
    ///
    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
    /// forever.
    pub dependency_tracking: bool,

    /// Enables active tracking.
    ///
    /// Automatically disabled when `dependency_tracking` is disabled.
    ///
    /// When disabled: All tasks are considered as active.
    pub active_tracking: bool,

    /// Enables the backing storage.
    ///
    /// `None` disables both restoring from and persisting to the backing storage.
    pub storage_mode: Option<StorageMode>,

    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
    /// data structures. If `None`, it will use the available parallelism.
    pub num_workers: Option<usize>,

    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
    pub small_preallocation: bool,

    /// When enabled, evict all evictable tasks from in-memory storage after every snapshot.
    /// This reclaims memory by clearing persisted data that can be re-loaded from disk on demand.
    /// Only takes effect when the storage mode persists data (`ReadWrite` or
    /// `ReadWriteOnShutdown`).
    ///
    /// This is an EXPERIMENTAL FEATURE under development.
    pub evict_after_snapshot: bool,
}
154
155impl Default for BackendOptions {
156    fn default() -> Self {
157        Self {
158            dependency_tracking: true,
159            active_tracking: true,
160            storage_mode: Some(StorageMode::ReadWrite),
161            num_workers: None,
162            small_preallocation: false,
163            evict_after_snapshot: false,
164        }
165    }
166}
167
/// Kinds of jobs the backend defines for itself.
///
/// NOTE(review): the code that schedules and runs these jobs is outside this section —
/// presumably this is the backend's job type for the `Backend` trait; confirm there.
pub enum TurboTasksBackendJob {
    /// Presumably triggers the snapshot/persist cycle — confirm against the job runner.
    Snapshot,
}
171
/// Public handle to the backend: a thin wrapper around a shared (`Arc`)
/// `TurboTasksBackendInner`, which holds the actual state.
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
173
/// Shared state behind [`TurboTasksBackend`].
struct TurboTasksBackendInner<B: BackingStorage> {
    /// Backend configuration. `active_tracking` is forced to `false` in `new` when
    /// `dependency_tracking` is disabled.
    options: BackendOptions,

    /// Instant captured in `new`, i.e. when this backend was constructed.
    start_time: Instant,

    /// Id allocator for persisted tasks: starts at the backing storage's next free task id
    /// and ends at `TRANSIENT_TASK_BIT - 1`.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id allocator for transient tasks: covers `TRANSIENT_TASK_BIT` up to `TaskId::MAX`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    /// In-memory task storage, created with the shard amount computed from
    /// `options.num_workers` and `options.small_preallocation`.
    storage: Storage,

    /// Coordinates the operation/snapshot interleaving protocol. See
    /// [`SnapshotCoordinator`] for details.
    snapshot_coord: SnapshotCoordinator,
    /// Serializes calls to `snapshot_and_persist`. The coordinator's
    /// `begin_snapshot` asserts that snapshots don't overlap; this mutex
    /// enforces that contract for our two callers (background loop and
    /// `stop_and_wait`).
    snapshot_in_progress: Mutex<()>,

    // NOTE(review): the flag and the three events below start out unset/fresh in `new`; the
    // code that sets and listens to them is outside this section. Presumably they signal
    // shutdown and idle transitions — confirm against their users.
    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    /// Cache hit/miss statistics, updated by `track_cache_hit_by_fn` and
    /// `track_cache_miss_by_fn`.
    task_statistics: TaskStatisticsApi,

    /// The backing storage implementation; exposed via `TurboTasksBackend::backing_storage`.
    backing_storage: B,

    // NOTE(review): only present with `verify_aggregation_graph`; presumably the set of root
    // tasks used by that verification — confirm against its users.
    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
207
208impl<B: BackingStorage> TurboTasksBackend<B> {
209    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
210        Self(Arc::new(TurboTasksBackendInner::new(
211            options,
212            backing_storage,
213        )))
214    }
215
216    pub fn backing_storage(&self) -> &B {
217        &self.0.backing_storage
218    }
219
220    /// Perform a snapshot and then evict all evictable tasks from memory.
221    ///
222    /// This is exposed for integration tests that need to verify the
223    /// snapshot → evict → restore cycle works correctly.
224    ///
225    /// Returns `(snapshot_had_new_data, eviction_counts)`.
226    pub fn snapshot_and_evict_for_testing(
227        &self,
228        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
229    ) -> (bool, EvictionCounts) {
230        self.0.snapshot_and_evict_for_testing(turbo_tasks)
231    }
232}
233
234impl<B: BackingStorage> TurboTasksBackendInner<B> {
235    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
236        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
237        if !options.dependency_tracking {
238            options.active_tracking = false;
239        }
240        let small_preallocation = options.small_preallocation;
241        let next_task_id = backing_storage
242            .next_free_task_id()
243            .expect("Failed to get task id");
244        Self {
245            options,
246            start_time: Instant::now(),
247            persisted_task_id_factory: IdFactoryWithReuse::new(
248                next_task_id,
249                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
250            ),
251            transient_task_id_factory: IdFactoryWithReuse::new(
252                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
253                TaskId::MAX,
254            ),
255            storage: Storage::new(shard_amount, small_preallocation),
256            snapshot_coord: SnapshotCoordinator::new(),
257            snapshot_in_progress: Mutex::new(()),
258            stopping: AtomicBool::new(false),
259            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
260            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
261            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
262            #[cfg(feature = "verify_aggregation_graph")]
263            is_idle: AtomicBool::new(false),
264            task_statistics: TaskStatisticsApi::default(),
265            backing_storage,
266            #[cfg(feature = "verify_aggregation_graph")]
267            root_tasks: Default::default(),
268        }
269    }
270
271    fn execute_context<'a>(
272        &'a self,
273        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
274    ) -> impl ExecuteContext<'a> {
275        ExecuteContextImpl::new(self, turbo_tasks)
276    }
277
278    fn suspending_requested(&self) -> bool {
279        self.should_persist() && self.snapshot_coord.snapshot_pending()
280    }
281
282    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
283        if self.should_persist() {
284            self.snapshot_coord.suspend_point(suspend);
285        }
286    }
287
288    pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
289        if !self.should_persist() {
290            return OperationGuard::noop();
291        }
292        self.snapshot_coord.begin_operation()
293    }
294
295    fn should_persist(&self) -> bool {
296        matches!(
297            self.options.storage_mode,
298            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
299        )
300    }
301
302    fn should_evict(&self) -> bool {
303        self.options.evict_after_snapshot && self.should_persist()
304    }
305
306    /// Perform a snapshot and then evict all evictable tasks from memory.
307    ///
308    /// This is exposed for integration tests that need to verify the
309    /// snapshot → evict → restore cycle works correctly.
310    ///
311    /// Returns `(snapshot_had_new_data, eviction_counts)`.
312    #[doc(hidden)]
313    pub fn snapshot_and_evict_for_testing(
314        &self,
315        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
316    ) -> (bool, EvictionCounts) {
317        assert!(
318            self.should_persist(),
319            "snapshot_and_evict requires persistence"
320        );
321        let snapshot_result = self.snapshot_and_persist(None, "test", turbo_tasks);
322        let had_new_data = match snapshot_result {
323            Ok((_, new_data)) => new_data,
324            Err(_) => {
325                // Snapshot/persist failed — skip eviction since the data may not
326                // be on disk yet. Evicting now could lose in-memory state that
327                // can't be restored.
328                return (false, EvictionCounts::default());
329            }
330        };
331        let counts = self.storage.evict_after_snapshot(None);
332        (had_new_data, counts)
333    }
334
335    fn should_restore(&self) -> bool {
336        self.options.storage_mode.is_some()
337    }
338
339    fn should_track_dependencies(&self) -> bool {
340        self.options.dependency_tracking
341    }
342
343    fn should_track_activeness(&self) -> bool {
344        self.options.active_tracking
345    }
346
347    fn track_cache_hit_by_fn(&self, native_fn: &'static NativeFunction) {
348        self.task_statistics
349            .map(|stats| stats.increment_cache_hit(native_fn));
350    }
351
352    fn track_cache_miss_by_fn(&self, native_fn: &'static NativeFunction) {
353        self.task_statistics
354            .map(|stats| stats.increment_cache_miss(native_fn));
355    }
356
357    /// Reconstructs a full [`TurboTasksExecutionError`] from the compact [`TaskError`] storage
358    /// representation. For [`TaskError::TaskChain`], this looks up the source error from the last
359    /// task's output and rebuilds the nested `TaskContext` wrappers with `TurboTasksCallApi`
360    /// references for lazy name resolution.
361    fn task_error_to_turbo_tasks_execution_error(
362        &self,
363        error: &TaskError,
364        ctx: &mut impl ExecuteContext<'_>,
365    ) -> TurboTasksExecutionError {
366        match error {
367            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
368            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
369                message: item.message.clone(),
370                source: item
371                    .source
372                    .as_ref()
373                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
374            })),
375            TaskError::LocalTaskContext(local_task_context) => {
376                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
377                    name: local_task_context.name.clone(),
378                    source: local_task_context
379                        .source
380                        .as_ref()
381                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
382                }))
383            }
384            TaskError::TaskChain(chain) => {
385                let task_id = chain.last().unwrap();
386                let error = {
387                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
388                    if let Some(OutputValue::Error(error)) = task.get_output() {
389                        Some(error.clone())
390                    } else {
391                        None
392                    }
393                };
394                let error = error.map_or_else(
395                    || {
396                        // Eventual consistency will cause errors to no longer be available
397                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
398                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
399                                "Error no longer available",
400                            )),
401                            location: None,
402                        }))
403                    },
404                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
405                );
406                let mut current_error = error;
407                for &task_id in chain.iter().rev() {
408                    current_error =
409                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
410                            task_id,
411                            source: Some(current_error),
412                            turbo_tasks: ctx.turbo_tasks(),
413                        }));
414                }
415                current_error
416            }
417        }
418    }
419}
420
/// Intermediate result of step 1 of task execution completion.
///
/// NOTE(review): the code that fills and consumes this struct is outside this section. The
/// field notes below are inferred from names and types only — confirm against the producer.
struct TaskExecutionCompletePrepareResult {
    /// Presumably the child tasks that still need to be connected after this execution.
    pub new_children: FxHashSet<TaskId>,
    /// Presumably whether the task became immutable with this execution.
    pub is_now_immutable: bool,
    /// Only present with the `verify_determinism` feature; presumably set when the execution
    /// produced no output to compare.
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    /// Presumably the output to store, `None` when the output is unchanged.
    pub new_output: Option<OutputValue>,
    /// Presumably the tasks depending on this task's output.
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
    /// Presumably whether this execution was a recomputation of an existing result.
    pub is_recomputation: bool,
    /// Presumably whether the result depends on the current session.
    pub is_session_dependent: bool,
}
432
433// Operations
434impl<B: BackingStorage> TurboTasksBackendInner<B> {
435    fn try_read_task_output(
436        self: &Arc<Self>,
437        task_id: TaskId,
438        reader: Option<TaskId>,
439        options: ReadOutputOptions,
440        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
441    ) -> Result<Result<RawVc, EventListener>> {
442        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
443
444        let mut ctx = self.execute_context(turbo_tasks);
445        let need_reader_task = if self.should_track_dependencies()
446            && !matches!(options.tracking, ReadTracking::Untracked)
447            && reader.is_some_and(|reader_id| reader_id != task_id)
448            && let Some(reader_id) = reader
449            && reader_id != task_id
450        {
451            Some(reader_id)
452        } else {
453            None
454        };
455        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
456            // Having a task_pair here is not optimal, but otherwise this would lead to a race
457            // condition. See below.
458            // TODO(sokra): solve that in a more performant way.
459            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
460            (task, Some(reader))
461        } else {
462            (ctx.task(task_id, TaskDataCategory::All), None)
463        };
464
465        fn listen_to_done_event(
466            reader_description: Option<EventDescription>,
467            tracking: ReadTracking,
468            done_event: &Event,
469        ) -> EventListener {
470            done_event.listen_with_note(move || {
471                move || {
472                    if let Some(reader_description) = reader_description.as_ref() {
473                        format!(
474                            "try_read_task_output from {} ({})",
475                            reader_description, tracking
476                        )
477                    } else {
478                        format!("try_read_task_output ({})", tracking)
479                    }
480                }
481            })
482        }
483
484        fn check_in_progress(
485            task: &impl TaskGuard,
486            reader_description: Option<EventDescription>,
487            tracking: ReadTracking,
488        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
489        {
490            match task.get_in_progress() {
491                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
492                    listen_to_done_event(reader_description, tracking, done_event),
493                ))),
494                Some(InProgressState::InProgress(box InProgressStateInner {
495                    done_event, ..
496                })) => Some(Ok(Err(listen_to_done_event(
497                    reader_description,
498                    tracking,
499                    done_event,
500                )))),
501                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
502                    "{} was canceled",
503                    task.get_task_description()
504                ))),
505                None => None,
506            }
507        }
508
509        if matches!(options.consistency, ReadConsistency::Strong) {
510            if task
511                .get_persistent_task_type()
512                .is_some_and(|t| !t.native_fn.is_root)
513            {
514                drop(task);
515                drop(reader_task);
516                panic!(
517                    "Strongly consistent read of non-root task {} (reader: {}). The `root` \
518                     attribute is missing on the task.",
519                    self.debug_get_task_description(task_id),
520                    reader.map_or_else(
521                        || "unknown".to_string(),
522                        |r| self.debug_get_task_description(r)
523                    )
524                );
525            }
526
527            let is_dirty = task.is_dirty();
528
529            // Check the dirty count of the root node
530            let has_dirty_containers = task.has_dirty_containers();
531            if has_dirty_containers || is_dirty.is_some() {
532                let activeness = task.get_activeness_mut();
533                let mut task_ids_to_schedule: Vec<_> = Vec::new();
534                // When there are dirty task, subscribe to the all_clean_event
535                let activeness = if let Some(activeness) = activeness {
536                    // This makes sure all tasks stay active and this task won't stale.
537                    // active_until_clean is automatically removed when this
538                    // task is clean.
539                    activeness.set_active_until_clean();
540                    activeness
541                } else {
542                    // If we don't have a root state, add one. This also makes sure all tasks stay
543                    // active and this task won't stale. active_until_clean
544                    // is automatically removed when this task is clean.
545                    if ctx.should_track_activeness() {
546                        // A newly added Activeness need to make sure to schedule the tasks
547                        task_ids_to_schedule = task.dirty_containers().collect();
548                        task_ids_to_schedule.push(task_id);
549                    }
550                    let activeness =
551                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
552                    activeness.set_active_until_clean();
553                    activeness
554                };
555                let listener = activeness.all_clean_event.listen_with_note(move || {
556                    let this = self.clone();
557                    let tt = turbo_tasks.pin();
558                    move || {
559                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
560                        let mut ctx = this.execute_context(tt);
561                        let mut visited = FxHashSet::default();
562                        fn indent(s: &str) -> String {
563                            s.split_inclusive('\n')
564                                .flat_map(|line: &str| ["  ", line].into_iter())
565                                .collect::<String>()
566                        }
567                        fn get_info(
568                            ctx: &mut impl ExecuteContext<'_>,
569                            task_id: TaskId,
570                            parent_and_count: Option<(TaskId, i32)>,
571                            visited: &mut FxHashSet<TaskId>,
572                        ) -> String {
573                            let task = ctx.task(task_id, TaskDataCategory::All);
574                            let is_dirty = task.is_dirty();
575                            let in_progress =
576                                task.get_in_progress()
577                                    .map_or("not in progress", |p| match p {
578                                        InProgressState::InProgress(_) => "in progress",
579                                        InProgressState::Scheduled { .. } => "scheduled",
580                                        InProgressState::Canceled => "canceled",
581                                    });
582                            let activeness = task.get_activeness().map_or_else(
583                                || "not active".to_string(),
584                                |activeness| format!("{activeness:?}"),
585                            );
586                            let aggregation_number = get_aggregation_number(&task);
587                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
588                            {
589                                let uppers = get_uppers(&task);
590                                !uppers.contains(&parent_task_id)
591                            } else {
592                                false
593                            };
594
595                            // Check the dirty count of the root node
596                            let has_dirty_containers = task.has_dirty_containers();
597
598                            let task_description = task.get_task_description();
599                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
600                                format!(", dirty({parent_priority})")
601                            } else {
602                                String::new()
603                            };
604                            let has_dirty_containers_label = if has_dirty_containers {
605                                ", dirty containers"
606                            } else {
607                                ""
608                            };
609                            let count = if let Some((_, count)) = parent_and_count {
610                                format!(" {count}")
611                            } else {
612                                String::new()
613                            };
614                            let mut info = format!(
615                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
616                                 {in_progress}, \
617                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
618                            );
619                            let children: Vec<_> = task.dirty_containers_with_count().collect();
620                            drop(task);
621
622                            if missing_upper {
623                                info.push_str("\n  ERROR: missing upper connection");
624                            }
625
626                            if has_dirty_containers || !children.is_empty() {
627                                writeln!(info, "\n  dirty tasks:").unwrap();
628
629                                for (child_task_id, count) in children {
630                                    let task_description = ctx
631                                        .task(child_task_id, TaskDataCategory::Data)
632                                        .get_task_description();
633                                    if visited.insert(child_task_id) {
634                                        let child_info = get_info(
635                                            ctx,
636                                            child_task_id,
637                                            Some((task_id, count)),
638                                            visited,
639                                        );
640                                        info.push_str(&indent(&child_info));
641                                        if !info.ends_with('\n') {
642                                            info.push('\n');
643                                        }
644                                    } else {
645                                        writeln!(
646                                            info,
647                                            "  {child_task_id} {task_description} {count} \
648                                             (already visited)"
649                                        )
650                                        .unwrap();
651                                    }
652                                }
653                            }
654                            info
655                        }
656                        let info = get_info(&mut ctx, task_id, None, &mut visited);
657                        format!(
658                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
659                        )
660                    }
661                });
662                drop(reader_task);
663                drop(task);
664                if !task_ids_to_schedule.is_empty() {
665                    let mut queue = AggregationUpdateQueue::new();
666                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
667                    queue.execute(&mut ctx);
668                }
669
670                return Ok(Err(listener));
671            }
672        }
673
674        let reader_description = reader_task
675            .as_ref()
676            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
677        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
678        {
679            return value;
680        }
681
682        if let Some(output) = task.get_output() {
683            let result = match output {
684                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
685                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
686                OutputValue::Error(error) => Err(error.clone()),
687            };
688            if let Some(mut reader_task) = reader_task.take()
689                && options.tracking.should_track(result.is_err())
690                && (!task.immutable() || cfg!(feature = "verify_immutable"))
691            {
692                #[cfg(feature = "trace_task_output_dependencies")]
693                let _span = tracing::trace_span!(
694                    "add output dependency",
695                    task = %task_id,
696                    dependent_task = ?reader
697                )
698                .entered();
699                let mut queue = LeafDistanceUpdateQueue::new();
700                let reader = reader.unwrap();
701                if task.add_output_dependent(reader) {
702                    // Ensure that dependent leaf distance is strictly monotonic increasing
703                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
704                    let reader_leaf_distance =
705                        reader_task.get_leaf_distance().copied().unwrap_or_default();
706                    if reader_leaf_distance.distance <= leaf_distance.distance {
707                        queue.push(
708                            reader,
709                            leaf_distance.distance,
710                            leaf_distance.max_distance_in_buffer,
711                        );
712                    }
713                }
714
715                drop(task);
716
717                // Note: We use `task_pair` earlier to lock the task and its reader at the same
718                // time. If we didn't and just locked the reader here, an invalidation could occur
719                // between grabbing the locks. If that happened, and if the task is "outdated" or
720                // doesn't have the dependency edge yet, the invalidation would be lost.
721
722                if !reader_task.remove_outdated_output_dependencies(&task_id) {
723                    let _ = reader_task.add_output_dependencies(task_id);
724                }
725                drop(reader_task);
726
727                queue.execute(&mut ctx);
728            } else {
729                drop(task);
730            }
731
732            return result.map_err(|error| {
733                self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
734                    .with_task_context(task_id, turbo_tasks.pin())
735                    .into()
736            });
737        }
738        drop(reader_task);
739
740        let note = EventDescription::new(|| {
741            move || {
742                if let Some(reader) = reader_description.as_ref() {
743                    format!("try_read_task_output (recompute) from {reader}",)
744                } else {
745                    "try_read_task_output (recompute, untracked)".to_string()
746                }
747            }
748        });
749
750        // Output doesn't exist. We need to schedule the task to compute it.
751        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
752            TaskExecutionReason::OutputNotAvailable,
753            EventDescription::new(|| task.get_task_desc_fn()),
754            note,
755        );
756
757        // It's not possible that the task is InProgress at this point. If it is InProgress {
758        // done: true } it must have Output and would early return.
759        let old = task.set_in_progress(in_progress_state);
760        debug_assert!(old.is_none(), "InProgress already exists");
761        ctx.schedule_task(task, TaskPriority::Recomputation);
762
763        Ok(Err(listener))
764    }
765
766    fn try_read_task_cell(
767        &self,
768        task_id: TaskId,
769        reader: Option<TaskId>,
770        cell: CellId,
771        options: ReadCellOptions,
772        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
773    ) -> Result<Result<TypedCellContent, EventListener>> {
           // Reads the content of `cell` from task `task_id`, optionally on behalf of `reader`.
           //
           // Result shape:
           // - `Ok(Ok(content))`: the cell data is present on the task.
           // - `Ok(Err(listener))`: the caller has to wait, either because the task is in
           //   progress / scheduled, or because a recomputation was scheduled by this call.
           // - `Err(..)`: the cell no longer exists on the task, or the task was canceled.
774        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
775
           // Records the dependency edge on both sides: `reader` is added to the task's cell
           // dependents, and the cell is added to the reader's cell dependencies unless
           // `remove_outdated_cell_dependencies` returns true for it.
           // Does nothing when no reader guard is passed or when the task is immutable
           // (unless the `verify_immutable` feature is enabled).
776        fn add_cell_dependency(
777            task_id: TaskId,
778            mut task: impl TaskGuard,
779            reader: Option<TaskId>,
780            reader_task: Option<impl TaskGuard>,
781            cell: CellId,
782            key: Option<u64>,
783        ) {
784            if let Some(mut reader_task) = reader_task
785                && (!task.immutable() || cfg!(feature = "verify_immutable"))
786            {
                   // A reader guard is only created when a reader id exists (see the
                   // `task_pair` branch in the caller), so this unwrap is expected to hold.
787                let reader = reader.unwrap();
788                let _ = task.add_cell_dependents((cell, key, reader));
789                drop(task);
790
791                // Note: We use `task_pair` earlier to lock the task and its reader at the same
792                // time. If we didn't and just locked the reader here, an invalidation could occur
793                // between grabbing the locks. If that happened, and if the task is "outdated" or
794                // doesn't have the dependency edge yet, the invalidation would be lost.
795
796                let target = CellRef {
797                    task: task_id,
798                    cell,
799                };
800                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
801                    let _ = reader_task.add_cell_dependencies((target, key));
802                }
803                drop(reader_task);
804            }
805        }
806
807        let ReadCellOptions {
808            tracking,
809            final_read_hint,
810        } = options;
811
812        let mut ctx = self.execute_context(turbo_tasks);
           // The reader is only locked (together with the task) for tracked reads: dependency
           // tracking must be enabled, the read must not be `Untracked`, and there must be a
           // reader that differs from the task being read.
813        let (mut task, reader_task) = if self.should_track_dependencies()
814            && !matches!(tracking, ReadCellTracking::Untracked)
815            && let Some(reader_id) = reader
816            && reader_id != task_id
817        {
818            // Having a task_pair here is not optimal, but otherwise this would lead to a race
819            // condition. See below.
820            // TODO(sokra): solve that in a more performant way.
821            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
822            (task, Some(reader))
823        } else {
824            (ctx.task(task_id, TaskDataCategory::All), None)
825        };
826
           // With `final_read_hint` the cell data is removed from the task instead of cloned.
827        let content = if final_read_hint {
828            task.remove_cell_data(&cell)
829        } else {
830            task.get_cell_data(&cell).cloned()
831        };
832        if let Some(content) = content {
               // Success path: `should_track` is asked with `false` here and with `true` on the
               // error paths below.
833            if tracking.should_track(false) {
834                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
835            }
836            return Ok(Ok(TypedCellContent(
837                cell.type_id,
838                CellContent(Some(content)),
839            )));
840        }
841
           // No data available. If the task is executing or already scheduled, only subscribe
           // to the cell; nothing is scheduled in that case.
842        let in_progress = task.get_in_progress();
843        if matches!(
844            in_progress,
845            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
846        ) {
847            return Ok(Err(self
848                .listen_to_cell(&mut task, task_id, &reader_task, cell)
849                .0));
850        }
           // Remembered now, evaluated only after the existence checks below.
851        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
852
853        // Check cell index range (cell might not exist at all)
854        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
855        let Some(max_id) = max_id else {
               // The description is fetched before `task` is moved into `add_cell_dependency`.
856            let task_desc = task.get_task_description();
857            if tracking.should_track(true) {
858                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
859            }
860            bail!(
861                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
862            );
863        };
864        if cell.index >= max_id {
865            let task_desc = task.get_task_description();
866            if tracking.should_track(true) {
867                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
868            }
869            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
870        }
871
872        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
873        // task to get the cell content.
874
875        // Bail early if the task was cancelled — no point in registering a listener
876        // on a task that won't execute again.
877        if is_cancelled {
878            bail!("{} was canceled", task.get_task_description());
879        }
880
881        // Listen to the cell and potentially schedule the task
882        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
883        drop(reader_task);
           // `new_listener == false` means an in-progress entry for this cell already existed;
           // in that case this call does not schedule the task again.
884        if !new_listener {
885            return Ok(Err(listener));
886        }
887
888        let _span = tracing::trace_span!(
889            "recomputation",
890            cell_type = get_value_type(cell.type_id).ty.global_name,
891            cell_index = cell.index
892        )
893        .entered();
894
895        let _ = task.add_scheduled(
896            TaskExecutionReason::CellNotAvailable,
897            EventDescription::new(|| task.get_task_desc_fn()),
898        );
899        ctx.schedule_task(task, TaskPriority::Recomputation);
900
901        Ok(Err(listener))
902    }
903
904    fn listen_to_cell(
905        &self,
906        task: &mut impl TaskGuard,
907        task_id: TaskId,
908        reader_task: &Option<impl TaskGuard>,
909        cell: CellId,
910    ) -> (EventListener, bool) {
911        let note = || {
912            let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
913            move || {
914                if let Some(reader_desc) = reader_desc.as_ref() {
915                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
916                } else {
917                    "try_read_task_cell (in progress, untracked)".to_string()
918                }
919            }
920        };
921        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
922            // Someone else is already computing the cell
923            let listener = in_progress.event.listen_with_note(note);
924            return (listener, false);
925        }
926        let in_progress = InProgressCellState::new(task_id, cell);
927        let listener = in_progress.event.listen_with_note(note);
928        let old = task.insert_in_progress_cells(cell, in_progress);
929        debug_assert!(old.is_none(), "InProgressCell already exists");
930        (listener, true)
931    }
932
933    fn snapshot_and_persist(
934        &self,
935        parent_span: Option<tracing::Id>,
936        reason: &str,
937        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
938    ) -> Result<(Instant, bool), anyhow::Error> {
           // Snapshots all modified tasks and writes the snapshot to the backing storage.
           //
           // - `parent_span`: parent of the "snapshot" and "persist" tracing spans.
           // - `reason`: recorded on both spans and in the emitted trace event.
           //
           // Returns `(instant, persisted)`. `persisted` is `false` when nothing was modified or
           // the snapshot turned out empty, and `true` once `save_snapshot` succeeded. An error
           // from `save_snapshot` is propagated to the caller.
939        let snapshot_span =
940            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
941                .entered();
942        // Serialize snapshots. The internal protocol (snapshot_mode, snapshot
943        // request bit, suspended_operations) assumes only one snapshot runs at
944        // a time. Held for the entire snapshot lifecycle.
945        let _snapshot_in_progress = self.snapshot_in_progress.lock();
           // `start` is taken after the snapshot lock was acquired, so time spent waiting for
           // another snapshot is not part of the durations measured below.
946        let start = Instant::now();
947        // SystemTime for wall-clock timestamps in trace events (milliseconds
948        // since epoch). Instant is monotonic but has no defined epoch, so it
949        // can't be used for cross-process trace correlation.
950        let wall_start = SystemTime::now();
951        debug_assert!(self.should_persist());
952
953        let mut snapshot_phase = {
954            let _span = tracing::info_span!("blocking").entered();
955            self.snapshot_coord.begin_snapshot()
956        };
957        // Enter snapshot mode, which atomically reads and resets the modified count.
958        // Checking after start_snapshot ensures no concurrent increments can race.
959        let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
960
961        let suspended_operations = snapshot_phase.take_suspended_operations();
962
963        let snapshot_time = Instant::now();
964        drop(snapshot_phase);
965
966        if !has_modifications {
967            // No tasks modified since the last snapshot — drop the guard (which
968            // calls end_snapshot) and skip the expensive O(N) scan.
969            drop(snapshot_guard);
               // NOTE(review): this path returns `start`, while the two other return paths of
               // this function return `snapshot_time` — confirm which instant callers expect.
970            return Ok((start, false));
971        }
972
           // Accumulated size/count statistics for one task name; only compiled with the
           // `print_cache_item_size` feature and printed as a table after persisting.
973        #[cfg(feature = "print_cache_item_size")]
974        #[derive(Default)]
975        struct TaskCacheStats {
976            data: usize,
977            #[cfg(feature = "print_cache_item_size_with_compressed")]
978            data_compressed: usize,
979            data_count: usize,
980            meta: usize,
981            #[cfg(feature = "print_cache_item_size_with_compressed")]
982            meta_compressed: usize,
983            meta_count: usize,
984            upper_count: usize,
985            collectibles_count: usize,
986            aggregated_collectibles_count: usize,
987            children_count: usize,
988            followers_count: usize,
989            collectibles_dependents_count: usize,
990            aggregated_dirty_containers_count: usize,
991            output_size: usize,
992        }
993        /// Formats a byte size, optionally including the compressed size when the
994        /// `print_cache_item_size_with_compressed` feature is enabled.
995        #[cfg(feature = "print_cache_item_size")]
996        struct FormatSizes {
997            size: usize,
998            #[cfg(feature = "print_cache_item_size_with_compressed")]
999            compressed_size: usize,
1000        }
1001        #[cfg(feature = "print_cache_item_size")]
1002        impl std::fmt::Display for FormatSizes {
1003            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1004                use turbo_tasks::util::FormatBytes;
1005                #[cfg(feature = "print_cache_item_size_with_compressed")]
1006                {
1007                    write!(
1008                        f,
1009                        "{} ({} compressed)",
1010                        FormatBytes(self.size),
1011                        FormatBytes(self.compressed_size)
1012                    )
1013                }
1014                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1015                {
1016                    write!(f, "{}", FormatBytes(self.size))
1017                }
1018            }
1019        }
1020        #[cfg(feature = "print_cache_item_size")]
1021        impl TaskCacheStats {
                // Runs `data` through the lzzzz LZ4 compressor (into a throwaway buffer) and
                // returns the `usize` reported by `next_to_vec`, which is used as the
                // compressed size.
1022            #[cfg(feature = "print_cache_item_size_with_compressed")]
1023            fn compressed_size(data: &[u8]) -> Result<usize> {
1024                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1025                    data,
1026                    &mut Vec::new(),
1027                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
1028                )?)
1029            }
1030
                // Accounts for one encoded `Data` category; a compression failure counts as 0
                // compressed bytes.
1031            fn add_data(&mut self, data: &[u8]) {
1032                self.data += data.len();
1033                #[cfg(feature = "print_cache_item_size_with_compressed")]
1034                {
1035                    self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1036                }
1037                self.data_count += 1;
1038            }
1039
                // Accounts for one encoded `Meta` category; a compression failure counts as 0
                // compressed bytes.
1040            fn add_meta(&mut self, data: &[u8]) {
1041                self.meta += data.len();
1042                #[cfg(feature = "print_cache_item_size_with_compressed")]
1043                {
1044                    self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1045                }
1046                self.meta_count += 1;
1047            }
1048
                // Adds the per-task collection counts from `meta_counts()` and the encoded size
                // of the task output (0 if encoding the output fails).
1049            fn add_counts(&mut self, storage: &TaskStorage) {
1050                let counts = storage.meta_counts();
1051                self.upper_count += counts.upper;
1052                self.collectibles_count += counts.collectibles;
1053                self.aggregated_collectibles_count += counts.aggregated_collectibles;
1054                self.children_count += counts.children;
1055                self.followers_count += counts.followers;
1056                self.collectibles_dependents_count += counts.collectibles_dependents;
1057                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
1058                if let Some(output) = storage.get_output() {
1059                    use turbo_bincode::turbo_bincode_encode;
1060
1061                    self.output_size += turbo_bincode_encode(&output)
1062                        .map(|data| data.len())
1063                        .unwrap_or(0);
1064                }
1065            }
1066
1067            /// Returns the task name used as the stats grouping key.
1068            fn task_name(storage: &TaskStorage) -> String {
1069                storage
1070                    .get_persistent_task_type()
1071                    .map(|t| t.to_string())
1072                    .unwrap_or_else(|| "<unknown>".to_string())
1073            }
1074
1075            /// Returns the primary sort key: compressed total when
1076            /// `print_cache_item_size_with_compressed` is enabled, raw total otherwise.
1077            fn sort_key(&self) -> usize {
1078                #[cfg(feature = "print_cache_item_size_with_compressed")]
1079                {
1080                    self.data_compressed + self.meta_compressed
1081                }
1082                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1083                {
1084                    self.data + self.meta
1085                }
1086            }
1087
                // The `format_*` helpers below wrap the corresponding totals / averages in
                // `FormatSizes`; averages fall back to 0 when the count is 0.
1088            fn format_total(&self) -> FormatSizes {
1089                FormatSizes {
1090                    size: self.data + self.meta,
1091                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1092                    compressed_size: self.data_compressed + self.meta_compressed,
1093                }
1094            }
1095
1096            fn format_data(&self) -> FormatSizes {
1097                FormatSizes {
1098                    size: self.data,
1099                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1100                    compressed_size: self.data_compressed,
1101                }
1102            }
1103
1104            fn format_avg_data(&self) -> FormatSizes {
1105                FormatSizes {
1106                    size: self.data.checked_div(self.data_count).unwrap_or(0),
1107                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1108                    compressed_size: self
1109                        .data_compressed
1110                        .checked_div(self.data_count)
1111                        .unwrap_or(0),
1112                }
1113            }
1114
1115            fn format_meta(&self) -> FormatSizes {
1116                FormatSizes {
1117                    size: self.meta,
1118                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1119                    compressed_size: self.meta_compressed,
1120                }
1121            }
1122
1123            fn format_avg_meta(&self) -> FormatSizes {
1124                FormatSizes {
1125                    size: self.meta.checked_div(self.meta_count).unwrap_or(0),
1126                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1127                    compressed_size: self
1128                        .meta_compressed
1129                        .checked_div(self.meta_count)
1130                        .unwrap_or(0),
1131                }
1132            }
1133        }
            // Shared between invocations of `process`, hence the mutex; keyed by task name.
1134        #[cfg(feature = "print_cache_item_size")]
1135        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1136            Mutex::new(FxHashMap::default());
1137
1138        // Encode each task's modified categories. We only encode categories with `modified` set,
1139        // meaning the category was actually dirtied. Categories restored from disk but never
1140        // modified don't need re-persisting since the on-disk version is still valid.
1141        // For tasks accessed during snapshot mode, a frozen copy was made and its `modified`
1142        // flags were copied from the live task at snapshot creation time, reflecting which
1143        // categories were dirtied before the snapshot was taken.
1144        let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
                // Encodes one category of the task. A serialization failure panics; on success
                // the encoded buffer is returned (and counted in the stats when enabled).
1145            let encode_category = |task_id: TaskId,
1146                                   data: &TaskStorage,
1147                                   category: SpecificTaskDataCategory,
1148                                   buffer: &mut TurboBincodeBuffer|
1149             -> Option<TurboBincodeBuffer> {
1150                match encode_task_data(task_id, data, category, buffer) {
1151                    Ok(encoded) => {
1152                        #[cfg(feature = "print_cache_item_size")]
1153                        {
1154                            let mut stats = task_cache_stats.lock();
1155                            let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
1156                            match category {
1157                                SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1158                                SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1159                            }
1160                        }
1161                        Some(encoded)
1162                    }
1163                    Err(err) => {
1164                        panic!(
1165                            "Serializing task {} failed ({:?}): {:?}",
1166                            self.debug_get_task_description(task_id),
1167                            category,
1168                            err
1169                        );
1170                    }
1171                }
1172            };
1173            if task_id.is_transient() {
1174                unreachable!("transient task_ids should never be enqueued to be persisted");
1175            }
1176
1177            let encode_meta = inner.flags.meta_modified();
1178            let encode_data = inner.flags.data_modified();
1179
1180            #[cfg(feature = "print_cache_item_size")]
1181            if encode_data || encode_meta {
1182                task_cache_stats
1183                    .lock()
1184                    .entry(TaskCacheStats::task_name(inner))
1185                    .or_default()
1186                    .add_counts(inner);
1187            }
1188
1189            let meta = if encode_meta {
1190                encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1191            } else {
1192                None
1193            };
1194
1195            let data = if encode_data {
1196                encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1197            } else {
1198                None
1199            };
                // Only tasks flagged `new_task` get a task type hash in their snapshot item.
1200            let task_type_hash = if inner.flags.new_task() {
1201                let task_type = inner.get_persistent_task_type().expect(
1202                    "It is not possible for a new_task to not have a persistent_task_type.  Task \
1203                     creation for persistent tasks uses a single ExecutionContextImpl for \
1204                     creating the task (which sets new_task) and connect_child (which sets \
1205                     persistent_task_type) and take_snapshot waits for all operations to complete \
1206                     or suspend before we start snapshotting.  So task creation will always set \
1207                     the task_type.",
1208                );
1209                Some(compute_task_type_hash(task_type))
1210            } else {
1211                None
1212            };
1213
1214            SnapshotItem {
1215                task_id,
1216                meta,
1217                data,
1218                task_type_hash,
1219            }
1220        };
1221
1222        let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);
1223
            // The snapshot phase ends here; its duration is measured from `start`.
1224        drop(snapshot_span);
1225        let snapshot_duration = start.elapsed();
1226        let task_count = task_snapshots.len();
1227
1228        if task_snapshots.is_empty() {
1229            // This should be impossible — if we got here, modified_count was nonzero, and every
1230            // modification that increments the count also failed during encoding.
1231            std::hint::cold_path();
1232            return Ok((snapshot_time, false));
1233        }
1234
1235        let persist_start = Instant::now();
            // The counters are declared empty and recorded once `save_snapshot` returned them.
1236        let span = tracing::info_span!(
1237            parent: parent_span,
1238            "persist",
1239            reason = reason,
1240            data_items= tracing::field::Empty,
1241            meta_items= tracing::field::Empty,
1242            task_cache_items= tracing::field::Empty,
1243            next_task_id= tracing::field::Empty,)
1244        .entered();
1245        {
1246            // Tasks were already consumed by take_snapshot, so a future snapshot
1247            // would not re-persist them — returning an error signals to the caller
1248            // that further persist attempts would corrupt the task graph in storage.
1249            let SnapshotMeta {
1250                task_cache_items,
1251                data_items,
1252                meta_items,
1253                max_next_task_id,
1254            } = self
1255                .backing_storage
1256                .save_snapshot(suspended_operations, task_snapshots)?;
1257            span.record("data_items", data_items);
1258            span.record("meta_items", meta_items);
1259            span.record("task_cache_items", task_cache_items);
1260            span.record("next_task_id", max_next_task_id);
1261
1262            #[cfg(feature = "print_cache_item_size")]
1263            {
1264                let mut task_cache_stats = task_cache_stats
1265                    .into_inner()
1266                    .into_iter()
1267                    .collect::<Vec<_>>();
1268                if !task_cache_stats.is_empty() {
1269                    use turbo_tasks::util::FormatBytes;
1270
1271                    use crate::utils::markdown_table::print_markdown_table;
1272
                        // Largest `sort_key` first; ties are ordered by task name, descending.
1273                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1274                        (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
1275                    });
1276
1277                    println!(
1278                        "Task cache stats: {}",
1279                        FormatSizes {
1280                            size: task_cache_stats
1281                                .iter()
1282                                .map(|(_, s)| s.data + s.meta)
1283                                .sum::<usize>(),
1284                            #[cfg(feature = "print_cache_item_size_with_compressed")]
1285                            compressed_size: task_cache_stats
1286                                .iter()
1287                                .map(|(_, s)| s.data_compressed + s.meta_compressed)
1288                                .sum::<usize>()
1289                        },
1290                    );
1291
                        // Count and average are rendered as two adjacent columns that share the
                        // same header text ("... Count x Avg"), for both data and meta.
1292                    print_markdown_table(
1293                        [
1294                            "Task",
1295                            " Total Size",
1296                            " Data Size",
1297                            " Data Count x Avg",
1298                            " Data Count x Avg",
1299                            " Meta Size",
1300                            " Meta Count x Avg",
1301                            " Meta Count x Avg",
1302                            " Uppers",
1303                            " Coll",
1304                            " Agg Coll",
1305                            " Children",
1306                            " Followers",
1307                            " Coll Deps",
1308                            " Agg Dirty",
1309                            " Output Size",
1310                        ],
1311                        task_cache_stats.iter(),
1312                        |(task_desc, stats)| {
1313                            [
1314                                task_desc.to_string(),
1315                                format!(" {}", stats.format_total()),
1316                                format!(" {}", stats.format_data()),
1317                                format!(" {} x", stats.data_count),
1318                                format!("{}", stats.format_avg_data()),
1319                                format!(" {}", stats.format_meta()),
1320                                format!(" {} x", stats.meta_count),
1321                                format!("{}", stats.format_avg_meta()),
1322                                format!(" {}", stats.upper_count),
1323                                format!(" {}", stats.collectibles_count),
1324                                format!(" {}", stats.aggregated_collectibles_count),
1325                                format!(" {}", stats.children_count),
1326                                format!(" {}", stats.followers_count),
1327                                format!(" {}", stats.collectibles_dependents_count),
1328                                format!(" {}", stats.aggregated_dirty_containers_count),
1329                                format!(" {}", FormatBytes(stats.output_size)),
1330                            ]
1331                        },
1332                    );
1333                }
1334            }
1335        }
1336
            // `elapsed` covers snapshot + persist (from `start`); `persist_duration` only the
            // part since `persist_start`.
1337        let elapsed = start.elapsed();
1338        let persist_duration = persist_start.elapsed();
1339        // avoid spamming the event queue with information about fast operations
1340        if elapsed > Duration::from_secs(10) {
1341            turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1342                "Finished writing to filesystem cache".to_string(),
1343                elapsed,
1344            )));
1345        }
1346
            // A `wall_start` before the Unix epoch falls back to a zero duration.
1347        let wall_start_ms = wall_start
1348            .duration_since(SystemTime::UNIX_EPOCH)
1349            .unwrap_or_default()
1350            // as_millis_f64 is not stable yet
1351            .as_secs_f64()
1352            * 1000.0;
            // The end timestamp is derived from the monotonic `elapsed`, not from a second
            // wall-clock reading.
1353        let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1354        turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1355            "turbopack-persistence",
1356            wall_start_ms,
1357            wall_end_ms,
1358            vec![
1359                ("reason", serde_json::Value::from(reason)),
1360                (
1361                    "snapshot_duration_ms",
1362                    serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1363                ),
1364                (
1365                    "persist_duration_ms",
1366                    serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1367                ),
1368                ("task_count", serde_json::Value::from(task_count)),
1369            ],
1370        )));
1371
1372        Ok((snapshot_time, true))
1373    }
1374
1375    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1376        if self.should_restore() {
1377            // Continue all uncompleted operations
1378            // They can't be interrupted by a snapshot since the snapshotting job has not been
1379            // scheduled yet.
1380            let uncompleted_operations = self
1381                .backing_storage
1382                .uncompleted_operations()
1383                .expect("Failed to get uncompleted operations");
1384            if !uncompleted_operations.is_empty() {
1385                let mut ctx = self.execute_context(turbo_tasks);
1386                for op in uncompleted_operations {
1387                    op.execute(&mut ctx);
1388                }
1389            }
1390        }
1391
1392        // Only when it should write regularly to the storage, we schedule the initial snapshot
1393        // job.
1394        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1395            // Schedule the snapshot job
1396            let _span = trace_span!("persisting background job").entered();
1397            let _span = tracing::info_span!("thread").entered();
1398            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::Snapshot);
1399        }
1400    }
1401
1402    fn stopping(&self) {
1403        self.stopping.store(true, Ordering::Release);
1404        self.stopping_event.notify(usize::MAX);
1405    }
1406
1407    #[allow(unused_variables)]
1408    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1409        #[cfg(feature = "verify_aggregation_graph")]
1410        {
1411            self.is_idle.store(false, Ordering::Release);
1412            self.verify_aggregation_graph(turbo_tasks, false);
1413        }
1414        if self.should_persist()
1415            && let Err(err) = self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks)
1416        {
1417            eprintln!("Persisting failed during shutdown: {err:?}");
1418        }
1419        self.storage.drop_contents();
1420        if let Err(err) = self.backing_storage.shutdown() {
1421            println!("Shutting down failed: {err}");
1422        }
1423    }
1424
1425    #[allow(unused_variables)]
1426    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1427        self.idle_start_event.notify(usize::MAX);
1428
1429        #[cfg(feature = "verify_aggregation_graph")]
1430        {
1431            use tokio::select;
1432
1433            self.is_idle.store(true, Ordering::Release);
1434            let this = self.clone();
1435            let turbo_tasks = turbo_tasks.pin();
1436            tokio::task::spawn(async move {
1437                select! {
1438                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1439                        // do nothing
1440                    }
1441                    _ = this.idle_end_event.listen() => {
1442                        return;
1443                    }
1444                }
1445                if !this.is_idle.load(Ordering::Relaxed) {
1446                    return;
1447                }
1448                this.verify_aggregation_graph(&*turbo_tasks, true);
1449            });
1450        }
1451    }
1452
1453    fn idle_end(&self) {
1454        #[cfg(feature = "verify_aggregation_graph")]
1455        self.is_idle.store(false, Ordering::Release);
1456        self.idle_end_event.notify(usize::MAX);
1457    }
1458
1459    fn get_or_create_task(
1460        &self,
1461        native_fn: &'static NativeFunction,
1462        this: Option<RawVc>,
1463        arg: &mut dyn StackDynTaskInputs,
1464        parent_task: Option<TaskId>,
1465        persistence: TaskPersistence,
1466        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1467    ) -> TaskId {
1468        let transient = matches!(persistence, TaskPersistence::Transient);
1469
1470        if transient
1471            && let Some(parent_task) = parent_task
1472            && !parent_task.is_transient()
1473        {
1474            let task_type = CachedTaskType {
1475                native_fn,
1476                this,
1477                arg: arg.take_box(),
1478            };
1479            self.panic_persistent_calling_transient(
1480                self.debug_get_task_description(parent_task),
1481                Some(&task_type),
1482                /* cell_id */ None,
1483            );
1484        }
1485
1486        let is_root = native_fn.is_root;
1487
1488        // Compute hash and shard index once from borrowed components (no heap allocation).
1489        let arg_ref = arg.as_ref();
1490        let hash = CachedTaskType::hash_from_components(
1491            self.storage.task_cache.hasher(),
1492            native_fn,
1493            this,
1494            arg_ref,
1495        );
1496        // Locate the shard once so that the read-only lookup and any
1497        // write-lock retry below share the same reference (saves a modulo +
1498        // memory lookup on the miss path).
1499        let shard = get_shard(&self.storage.task_cache, hash);
1500
1501        let mut ctx = self.execute_context(turbo_tasks);
1502        // Step 1: Fast read-only cache lookup (read lock, no allocation).
1503        // Use a read lock rather than a write lock to avoid contention. connect_child
1504        // may re-enter task_cache with a write lock, so we must not hold a write lock here.
1505        if let Some(task_id) =
1506            raw_get_in_shard(shard, hash, |k| k.eq_components(native_fn, this, arg_ref))
1507        {
1508            self.track_cache_hit_by_fn(native_fn);
1509            operation::ConnectChildOperation::run(parent_task, task_id, ctx);
1510            return task_id;
1511        }
1512
1513        // Step 2: Check backing storage using borrowed components (no box needed yet).
1514
1515        // Task exists in backing storage.
1516        // We only need to insert it into the in-memory cache.
1517        let task_id = if !transient
1518            && let Some((task_id, stored_type)) = ctx.task_by_type(native_fn, this, arg_ref)
1519        {
1520            self.track_cache_hit_by_fn(native_fn);
1521            // Step 3a: Insert into in-memory cache using the pre-located shard.
1522            // Use the existing Arc from storage to avoid a duplicate allocation.
1523            match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
1524                k.eq_components(native_fn, this, arg_ref)
1525            }) {
1526                RawEntry::Occupied(_) => {}
1527                RawEntry::Vacant(e) => {
1528                    e.insert(stored_type, task_id);
1529                }
1530            };
1531            task_id
1532        } else {
1533            match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
1534                k.eq_components(native_fn, this, arg_ref)
1535            }) {
1536                RawEntry::Occupied(e) => {
1537                    // Another thread beat us to creating this task — use their task_id.
1538                    // They will handle logging the new task as modified.
1539                    let task_id = *e.get();
1540                    drop(e);
1541                    self.track_cache_hit_by_fn(native_fn);
1542                    task_id
1543                }
1544                RawEntry::Vacant(e) => {
1545                    // Only now do we force the allocation.
1546                    // NOTE: if our caller had to perform resolution, then this will have already
1547                    // been boxed and take_box just takes it.
1548                    let task_type = Arc::new(CachedTaskType {
1549                        native_fn,
1550                        this,
1551                        arg: arg.take_box(),
1552                    });
1553                    let task_id = if transient {
1554                        self.transient_task_id_factory.get()
1555                    } else {
1556                        self.persisted_task_id_factory.get()
1557                    };
1558                    // Initialize storage BEFORE making task_id visible in the cache.
1559                    // This ensures any thread that reads task_id from the cache sees
1560                    // the storage entry already initialized (restored flags set).
1561                    self.storage
1562                        .initialize_new_task(task_id, Some(task_type.clone()));
1563                    // insert() consumes e, releasing the shard write lock.
1564                    e.insert(task_type, task_id);
1565                    self.track_cache_miss_by_fn(native_fn);
1566                    // Update the aggregation number before connecting the child
1567                    // We don't need this on any of the task recovery paths above because the
1568                    // aggregation number will already be set.
1569                    if is_root {
1570                        AggregationUpdateQueue::run(
1571                            AggregationUpdateJob::UpdateAggregationNumber {
1572                                task_id,
1573                                base_aggregation_number: u32::MAX,
1574                                distance: None,
1575                            },
1576                            &mut ctx,
1577                        );
1578                    } else if native_fn.is_session_dependent && self.should_track_dependencies() {
1579                        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
1580                        AggregationUpdateQueue::run(
1581                            AggregationUpdateJob::UpdateAggregationNumber {
1582                                task_id,
1583                                base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
1584                                distance: None,
1585                            },
1586                            &mut ctx,
1587                        );
1588                    };
1589
1590                    task_id
1591                }
1592            }
1593        };
1594
1595        operation::ConnectChildOperation::run(parent_task, task_id, ctx);
1596
1597        task_id
1598    }
1599
1600    /// Generate an object that implements [`fmt::Display`] explaining why the given
1601    /// [`CachedTaskType`] is transient.
1602    fn debug_trace_transient_task(
1603        &self,
1604        task_type: &CachedTaskType,
1605        cell_id: Option<CellId>,
1606    ) -> DebugTraceTransientTask {
1607        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1608        // from tracing the same task many times, so use a visited_set
1609        fn inner_id(
1610            backend: &TurboTasksBackendInner<impl BackingStorage>,
1611            task_id: TaskId,
1612            cell_type_id: Option<ValueTypeId>,
1613            visited_set: &mut FxHashSet<TaskId>,
1614        ) -> DebugTraceTransientTask {
1615            if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1616                if visited_set.contains(&task_id) {
1617                    let task_name = task_type.get_name();
1618                    DebugTraceTransientTask::Collapsed {
1619                        task_name,
1620                        cell_type_id,
1621                    }
1622                } else {
1623                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1624                }
1625            } else {
1626                DebugTraceTransientTask::Uncached { cell_type_id }
1627            }
1628        }
1629        fn inner_cached(
1630            backend: &TurboTasksBackendInner<impl BackingStorage>,
1631            task_type: &CachedTaskType,
1632            cell_type_id: Option<ValueTypeId>,
1633            visited_set: &mut FxHashSet<TaskId>,
1634        ) -> DebugTraceTransientTask {
1635            let task_name = task_type.get_name();
1636
1637            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1638                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1639                    // `task_id` should never be `None` at this point, as that would imply a
1640                    // non-local task is returning a local `Vc`...
1641                    // Just ignore if it happens, as we're likely already panicking.
1642                    return None;
1643                };
1644                if task_id.is_transient() {
1645                    Some(Box::new(inner_id(
1646                        backend,
1647                        task_id,
1648                        cause_self_raw_vc.try_get_type_id(),
1649                        visited_set,
1650                    )))
1651                } else {
1652                    None
1653                }
1654            });
1655            let cause_args = task_type
1656                .arg
1657                .get_raw_vcs()
1658                .into_iter()
1659                .filter_map(|raw_vc| {
1660                    let Some(task_id) = raw_vc.try_get_task_id() else {
1661                        // `task_id` should never be `None` (see comment above)
1662                        return None;
1663                    };
1664                    if !task_id.is_transient() {
1665                        return None;
1666                    }
1667                    Some((task_id, raw_vc.try_get_type_id()))
1668                })
1669                .collect::<IndexSet<_>>() // dedupe
1670                .into_iter()
1671                .map(|(task_id, cell_type_id)| {
1672                    inner_id(backend, task_id, cell_type_id, visited_set)
1673                })
1674                .collect();
1675
1676            DebugTraceTransientTask::Cached {
1677                task_name,
1678                cell_type_id,
1679                cause_self,
1680                cause_args,
1681            }
1682        }
1683        inner_cached(
1684            self,
1685            task_type,
1686            cell_id.map(|c| c.type_id),
1687            &mut FxHashSet::default(),
1688        )
1689    }
1690
1691    fn invalidate_task(
1692        &self,
1693        task_id: TaskId,
1694        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1695    ) {
1696        if !self.should_track_dependencies() {
1697            panic!("Dependency tracking is disabled so invalidation is not allowed");
1698        }
1699        operation::InvalidateOperation::run(
1700            smallvec![task_id],
1701            #[cfg(feature = "trace_task_dirty")]
1702            TaskDirtyCause::Invalidator,
1703            self.execute_context(turbo_tasks),
1704        );
1705    }
1706
1707    fn invalidate_tasks(
1708        &self,
1709        tasks: &[TaskId],
1710        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1711    ) {
1712        if !self.should_track_dependencies() {
1713            panic!("Dependency tracking is disabled so invalidation is not allowed");
1714        }
1715        operation::InvalidateOperation::run(
1716            tasks.iter().copied().collect(),
1717            #[cfg(feature = "trace_task_dirty")]
1718            TaskDirtyCause::Unknown,
1719            self.execute_context(turbo_tasks),
1720        );
1721    }
1722
1723    fn invalidate_tasks_set(
1724        &self,
1725        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1726        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1727    ) {
1728        if !self.should_track_dependencies() {
1729            panic!("Dependency tracking is disabled so invalidation is not allowed");
1730        }
1731        operation::InvalidateOperation::run(
1732            tasks.iter().copied().collect(),
1733            #[cfg(feature = "trace_task_dirty")]
1734            TaskDirtyCause::Unknown,
1735            self.execute_context(turbo_tasks),
1736        );
1737    }
1738
1739    fn invalidate_serialization(
1740        &self,
1741        task_id: TaskId,
1742        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1743    ) {
1744        if task_id.is_transient() {
1745            return;
1746        }
1747        let mut ctx = self.execute_context(turbo_tasks);
1748        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1749        task.invalidate_serialization();
1750    }
1751
1752    fn debug_get_task_description(&self, task_id: TaskId) -> String {
1753        let task = self.storage.access_mut(task_id);
1754        if let Some(value) = task.get_persistent_task_type() {
1755            format!("{task_id:?} {}", value)
1756        } else if let Some(value) = task.get_transient_task_type() {
1757            format!("{task_id:?} {}", value)
1758        } else {
1759            format!("{task_id:?} unknown")
1760        }
1761    }
1762
1763    fn get_task_name(
1764        &self,
1765        task_id: TaskId,
1766        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1767    ) -> String {
1768        let mut ctx = self.execute_context(turbo_tasks);
1769        let task = ctx.task(task_id, TaskDataCategory::Data);
1770        if let Some(value) = task.get_persistent_task_type() {
1771            value.to_string()
1772        } else if let Some(value) = task.get_transient_task_type() {
1773            value.to_string()
1774        } else {
1775            "unknown".to_string()
1776        }
1777    }
1778
1779    fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1780        let task = self.storage.access_mut(task_id);
1781        task.get_persistent_task_type().cloned()
1782    }
1783
1784    fn task_execution_canceled(
1785        &self,
1786        task_id: TaskId,
1787        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1788    ) {
1789        let mut ctx = self.execute_context(turbo_tasks);
1790        let mut task = ctx.task(task_id, TaskDataCategory::All);
1791        if let Some(in_progress) = task.take_in_progress() {
1792            match in_progress {
1793                InProgressState::Scheduled {
1794                    done_event,
1795                    reason: _,
1796                } => done_event.notify(usize::MAX),
1797                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1798                    done_event.notify(usize::MAX)
1799                }
1800                InProgressState::Canceled => {}
1801            }
1802        }
1803        // Notify any readers waiting on in-progress cells so their listeners
1804        // resolve and foreground jobs can finish (prevents stop_and_wait hang).
1805        let in_progress_cells = task.take_in_progress_cells();
1806        if let Some(ref cells) = in_progress_cells {
1807            for state in cells.values() {
1808                state.event.notify(usize::MAX);
1809            }
1810        }
1811
1812        // Mark the cancelled task as session-dependent dirty so it will be re-executed
1813        // in the next session. Without this, any reader that encounters the cancelled task
1814        // records an error in its output. That error is persisted and would poison
1815        // subsequent builds. By marking the task session-dependent dirty, the next build
1816        // re-executes it, which invalidates dependents and corrects the stale errors.
1817        let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
1818            task.update_dirty_state(Some(Dirtyness::SessionDependent))
1819        } else {
1820            None
1821        };
1822
1823        let old = task.set_in_progress(InProgressState::Canceled);
1824        debug_assert!(old.is_none(), "InProgress already exists");
1825        drop(task);
1826
1827        if let Some(data_update) = data_update {
1828            AggregationUpdateQueue::run(data_update, &mut ctx);
1829        }
1830
1831        drop(in_progress_cells);
1832    }
1833
1834    fn try_start_task_execution(
1835        &self,
1836        task_id: TaskId,
1837        priority: TaskPriority,
1838        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1839    ) -> Option<TaskExecutionSpec<'_>> {
1840        let execution_reason;
1841        let task_type;
1842        {
1843            let mut ctx = self.execute_context(turbo_tasks);
1844            let mut task = ctx.task(task_id, TaskDataCategory::All);
1845            task_type = task.get_task_type().to_owned();
1846            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
1847            if let Some(tasks) = task.prefetch() {
1848                drop(task);
1849                ctx.prepare_tasks(tasks, "prefetch");
1850                task = ctx.task(task_id, TaskDataCategory::All);
1851            }
1852            let in_progress = task.take_in_progress()?;
1853            let InProgressState::Scheduled { done_event, reason } = in_progress else {
1854                let old = task.set_in_progress(in_progress);
1855                debug_assert!(old.is_none(), "InProgress already exists");
1856                return None;
1857            };
1858            execution_reason = reason;
1859            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
1860                InProgressStateInner {
1861                    stale: false,
1862                    once_task,
1863                    done_event,
1864                    marked_as_completed: false,
1865                    new_children: Default::default(),
1866                },
1867            )));
1868            debug_assert!(old.is_none(), "InProgress already exists");
1869
1870            // Make all current collectibles outdated (remove left-over outdated collectibles)
1871            enum Collectible {
1872                Current(CollectibleRef, i32),
1873                Outdated(CollectibleRef),
1874            }
1875            let collectibles = task
1876                .iter_collectibles()
1877                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
1878                .chain(
1879                    task.iter_outdated_collectibles()
1880                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
1881                )
1882                .collect::<Vec<_>>();
1883            for collectible in collectibles {
1884                match collectible {
1885                    Collectible::Current(collectible, value) => {
1886                        let _ = task.insert_outdated_collectible(collectible, value);
1887                    }
1888                    Collectible::Outdated(collectible) => {
1889                        if task
1890                            .collectibles()
1891                            .is_none_or(|m| m.get(&collectible).is_none())
1892                        {
1893                            task.remove_outdated_collectibles(&collectible);
1894                        }
1895                    }
1896                }
1897            }
1898
1899            if self.should_track_dependencies() {
1900                // Make all dependencies outdated
1901                let cell_dependencies = task.iter_cell_dependencies().collect();
1902                task.set_outdated_cell_dependencies(cell_dependencies);
1903
1904                let outdated_output_dependencies = task.iter_output_dependencies().collect();
1905                task.set_outdated_output_dependencies(outdated_output_dependencies);
1906            }
1907        }
1908
1909        let (span, future) = match task_type {
1910            TaskType::Cached(task_type) => {
1911                let CachedTaskType {
1912                    native_fn,
1913                    this,
1914                    arg,
1915                } = &*task_type;
1916                (
1917                    native_fn.span(task_id.persistence(), execution_reason, priority),
1918                    native_fn.execute(*this, &**arg),
1919                )
1920            }
1921            TaskType::Transient(task_type) => {
1922                let span = tracing::trace_span!("turbo_tasks::root_task");
1923                let future = match &*task_type {
1924                    TransientTask::Root(f) => f(),
1925                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
1926                };
1927                (span, future)
1928            }
1929        };
1930        Some(TaskExecutionSpec { future, span })
1931    }
1932
1933    /// Returns `Some(priority)` if the task became stale during execution and needs to be
1934    /// re-scheduled at the given priority. The caller (turbo-tasks manager) hands it to the
1935    /// priority runner so re-execution doesn't inherit the original schedule priority.
1936    fn task_execution_completed(
1937        &self,
1938        task_id: TaskId,
1939        result: Result<RawVc, TurboTasksExecutionError>,
1940        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1941        #[cfg(feature = "verify_determinism")] stateful: bool,
1942        has_invalidator: bool,
1943        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1944    ) -> Option<TaskPriority> {
1945        // Task completion is a 4 step process:
1946        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
1947        //    aggregation number of the task and the new children.
1948        // 2. Connect the new children to the task (and do the relevant aggregation updates).
1949        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
1950        // 4. Shrink the task memory to reduce footprint of the task.
1951
1952        // Due to persistence it is possible that the process is cancelled after any step. This is
1953        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
1954        // in-memory representation.
1955
1956        // The task might be invalidated during this process, so we need to check the stale flag
1957        // at the start of every step.
1958
1959        #[cfg(not(feature = "trace_task_details"))]
1960        let span = tracing::trace_span!(
1961            "task execution completed",
1962            new_children = tracing::field::Empty
1963        )
1964        .entered();
1965        #[cfg(feature = "trace_task_details")]
1966        let span = tracing::trace_span!(
1967            "task execution completed",
1968            task_id = display(task_id),
1969            result = match result.as_ref() {
1970                Ok(value) => display(either::Either::Left(value)),
1971                Err(err) => display(either::Either::Right(err)),
1972            },
1973            new_children = tracing::field::Empty,
1974            immutable = tracing::field::Empty,
1975            new_output = tracing::field::Empty,
1976            output_dependents = tracing::field::Empty,
1977            aggregation_number = tracing::field::Empty,
1978            stale = tracing::field::Empty,
1979        )
1980        .entered();
1981
1982        let is_error = result.is_err();
1983
1984        let mut ctx = self.execute_context(turbo_tasks);
1985
1986        let TaskExecutionCompletePrepareResult {
1987            new_children,
1988            is_now_immutable,
1989            #[cfg(feature = "verify_determinism")]
1990            no_output_set,
1991            new_output,
1992            output_dependent_tasks,
1993            is_recomputation,
1994            is_session_dependent,
1995        } = match self.task_execution_completed_prepare(
1996            &mut ctx,
1997            #[cfg(feature = "trace_task_details")]
1998            &span,
1999            task_id,
2000            result,
2001            cell_counters,
2002            #[cfg(feature = "verify_determinism")]
2003            stateful,
2004            has_invalidator,
2005        ) {
2006            Ok(r) => r,
2007            Err(stale_priority) => {
2008                // Task was stale and has been rescheduled
2009                #[cfg(feature = "trace_task_details")]
2010                span.record("stale", "prepare");
2011                return Some(stale_priority);
2012            }
2013        };
2014
2015        #[cfg(feature = "trace_task_details")]
2016        span.record("new_output", new_output.is_some());
2017        #[cfg(feature = "trace_task_details")]
2018        span.record("output_dependents", output_dependent_tasks.len());
2019
2020        // When restoring from filesystem cache the following might not be executed (since we can
2021        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
2022        // would be executed again.
2023
2024        if !output_dependent_tasks.is_empty() {
2025            self.task_execution_completed_invalidate_output_dependent(
2026                &mut ctx,
2027                task_id,
2028                output_dependent_tasks,
2029            );
2030        }
2031
2032        let has_new_children = !new_children.is_empty();
2033        span.record("new_children", new_children.len());
2034
2035        if has_new_children {
2036            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
2037        }
2038
2039        if has_new_children
2040            && let Some(stale_priority) =
2041                self.task_execution_completed_connect(&mut ctx, task_id, new_children)
2042        {
2043            // Task was stale and has been rescheduled
2044            #[cfg(feature = "trace_task_details")]
2045            span.record("stale", "connect");
2046            return Some(stale_priority);
2047        }
2048
2049        let (stale_priority, in_progress_cells) = self.task_execution_completed_finish(
2050            &mut ctx,
2051            task_id,
2052            #[cfg(feature = "verify_determinism")]
2053            no_output_set,
2054            new_output,
2055            is_now_immutable,
2056            is_session_dependent,
2057        );
2058        if let Some(stale_priority) = stale_priority {
2059            // Task was stale and has been rescheduled
2060            #[cfg(feature = "trace_task_details")]
2061            span.record("stale", "finish");
2062            return Some(stale_priority);
2063        }
2064
2065        let removed_data = self.task_execution_completed_cleanup(
2066            &mut ctx,
2067            task_id,
2068            cell_counters,
2069            is_error,
2070            is_recomputation,
2071        );
2072
2073        // Drop data outside of critical sections
2074        drop(removed_data);
2075        drop(in_progress_cells);
2076
2077        None
2078    }
2079
2080    fn task_execution_completed_prepare(
2081        &self,
2082        ctx: &mut impl ExecuteContext<'_>,
2083        #[cfg(feature = "trace_task_details")] span: &Span,
2084        task_id: TaskId,
2085        result: Result<RawVc, TurboTasksExecutionError>,
2086        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2087        #[cfg(feature = "verify_determinism")] stateful: bool,
2088        has_invalidator: bool,
2089    ) -> Result<TaskExecutionCompletePrepareResult, TaskPriority> {
2090        let mut task = ctx.task(task_id, TaskDataCategory::All);
2091        let is_recomputation = task.is_dirty().is_none();
2092        // Without dependency tracking, the SessionDependent dirty state is never read (no session
2093        // restore), so skip the work
2094        let is_session_dependent = self.should_track_dependencies()
2095            && matches!(task.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent);
2096        let Some(in_progress) = task.get_in_progress_mut() else {
2097            panic!("Task execution completed, but task is not in progress: {task:#?}");
2098        };
2099        if matches!(in_progress, InProgressState::Canceled) {
2100            return Ok(TaskExecutionCompletePrepareResult {
2101                new_children: Default::default(),
2102                is_now_immutable: false,
2103                #[cfg(feature = "verify_determinism")]
2104                no_output_set: false,
2105                new_output: None,
2106                output_dependent_tasks: Default::default(),
2107                is_recomputation,
2108                is_session_dependent,
2109            });
2110        }
2111        let &mut InProgressState::InProgress(box InProgressStateInner {
2112            stale,
2113            ref mut new_children,
2114            once_task: is_once_task,
2115            ..
2116        }) = in_progress
2117        else {
2118            panic!("Task execution completed, but task is not in progress: {task:#?}");
2119        };
2120
2121        // If the task is stale, reschedule it
2122        #[cfg(not(feature = "no_fast_stale"))]
2123        if stale && !is_once_task {
2124            let stale_priority = compute_stale_priority(&task);
2125            let Some(InProgressState::InProgress(box InProgressStateInner {
2126                done_event,
2127                mut new_children,
2128                ..
2129            })) = task.take_in_progress()
2130            else {
2131                unreachable!();
2132            };
2133            let old = task.set_in_progress(InProgressState::Scheduled {
2134                done_event,
2135                reason: TaskExecutionReason::Stale,
2136            });
2137            debug_assert!(old.is_none(), "InProgress already exists");
2138            // Remove old children from new_children to leave only the children that had their
2139            // active count increased
2140            for task in task.iter_children() {
2141                new_children.remove(&task);
2142            }
2143            drop(task);
2144
2145            // We need to undo the active count increase for the children since we throw away the
2146            // new_children list now.
2147            AggregationUpdateQueue::run(
2148                AggregationUpdateJob::DecreaseActiveCounts {
2149                    task_ids: new_children.into_iter().collect(),
2150                },
2151                ctx,
2152            );
2153            return Err(stale_priority);
2154        }
2155
2156        // take the children from the task to process them
2157        let mut new_children = take(new_children);
2158
2159        // handle stateful (only tracked when verify_determinism is enabled)
2160        #[cfg(feature = "verify_determinism")]
2161        if stateful {
2162            task.set_stateful(true);
2163        }
2164
2165        // handle has_invalidator
2166        if has_invalidator {
2167            task.set_invalidator(true);
2168        }
2169
2170        // handle cell counters: update max index and remove cells that are no longer used
2171        // On error, skip this update: the task may have failed before creating all cells it
2172        // normally creates, so cell_counters is incomplete. Clearing cell_type_max_index entries
2173        // based on partial counters would cause "cell no longer exists" errors for tasks that
2174        // still hold dependencies on those cells. The old cell data is preserved on error
2175        // (see task_execution_completed_cleanup), so keeping cell_type_max_index consistent with
2176        // that data is correct.
2177        // NOTE: This must stay in sync with task_execution_completed_cleanup, which similarly
2178        // skips cell data removal on error.
2179        if result.is_ok() || is_recomputation {
2180            let old_counters: FxHashMap<_, _> = task
2181                .iter_cell_type_max_index()
2182                .map(|(&k, &v)| (k, v))
2183                .collect();
2184            let mut counters_to_remove = old_counters.clone();
2185
2186            for (&cell_type, &max_index) in cell_counters.iter() {
2187                if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2188                    if old_max_index != max_index {
2189                        task.insert_cell_type_max_index(cell_type, max_index);
2190                    }
2191                } else {
2192                    task.insert_cell_type_max_index(cell_type, max_index);
2193                }
2194            }
2195            for (cell_type, _) in counters_to_remove {
2196                task.remove_cell_type_max_index(&cell_type);
2197            }
2198        }
2199
2200        let mut queue = AggregationUpdateQueue::new();
2201
2202        let mut old_edges = Vec::new();
2203
2204        let has_children = !new_children.is_empty();
2205        let is_immutable = task.immutable();
2206        let task_dependencies_for_immutable =
2207            // Task was previously marked as immutable
2208            if !is_immutable
2209            // Task is not session dependent (session dependent tasks can change between sessions)
2210            && !is_session_dependent
2211            // Task has no invalidator
2212            && !task.invalidator()
2213            // Task has no dependencies on collectibles
2214            && task.is_collectibles_dependencies_empty()
2215        {
2216            Some(
2217                // Collect all dependencies on tasks to check if all dependencies are immutable
2218                task.iter_output_dependencies()
2219                    .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
2220                    .collect::<FxHashSet<_>>(),
2221            )
2222        } else {
2223            None
2224        };
2225
2226        if has_children {
2227            // Prepare all new children
2228            let _aggregation_number =
2229                prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2230
2231            #[cfg(feature = "trace_task_details")]
2232            span.record("aggregation_number", _aggregation_number);
2233
2234            // Filter actual new children
2235            old_edges.extend(
2236                task.iter_children()
2237                    .filter(|task| !new_children.remove(task))
2238                    .map(OutdatedEdge::Child),
2239            );
2240        } else {
2241            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2242        }
2243
2244        old_edges.extend(
2245            task.iter_outdated_collectibles()
2246                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2247        );
2248
2249        if self.should_track_dependencies() {
2250            // IMPORTANT: Use iter_outdated_* here, NOT iter_* (active dependencies).
2251            // At execution start, active deps are copied to outdated as a "before" snapshot.
2252            // During execution, new deps are added to active.
2253            // Here at completion, we clean up only the OUTDATED deps (the "before" snapshot).
2254            // Using iter_* (active) instead would incorrectly clean up deps that are still valid,
2255            // breaking dependency tracking.
2256            old_edges.extend(
2257                task.iter_outdated_cell_dependencies()
2258                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
2259            );
2260            old_edges.extend(
2261                task.iter_outdated_output_dependencies()
2262                    .map(OutdatedEdge::OutputDependency),
2263            );
2264        }
2265
2266        // Check if output need to be updated
2267        let current_output = task.get_output();
2268        #[cfg(feature = "verify_determinism")]
2269        let no_output_set = current_output.is_none();
2270        let new_output = match result {
2271            Ok(RawVc::TaskOutput(output_task_id)) => {
2272                if let Some(OutputValue::Output(current_task_id)) = current_output
2273                    && *current_task_id == output_task_id
2274                {
2275                    None
2276                } else {
2277                    Some(OutputValue::Output(output_task_id))
2278                }
2279            }
2280            Ok(RawVc::TaskCell(output_task_id, cell)) => {
2281                if let Some(OutputValue::Cell(CellRef {
2282                    task: current_task_id,
2283                    cell: current_cell,
2284                })) = current_output
2285                    && *current_task_id == output_task_id
2286                    && *current_cell == cell
2287                {
2288                    None
2289                } else {
2290                    Some(OutputValue::Cell(CellRef {
2291                        task: output_task_id,
2292                        cell,
2293                    }))
2294                }
2295            }
2296            Ok(RawVc::LocalOutput(..)) => {
2297                panic!("Non-local tasks must not return a local Vc");
2298            }
2299            Err(err) => {
2300                if let Some(OutputValue::Error(old_error)) = current_output
2301                    && **old_error == err
2302                {
2303                    None
2304                } else {
2305                    Some(OutputValue::Error(Arc::new((&err).into())))
2306                }
2307            }
2308        };
2309        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2310        // When output has changed, grab the dependent tasks
2311        if new_output.is_some() && ctx.should_track_dependencies() {
2312            output_dependent_tasks = task.iter_output_dependent().collect();
2313        }
2314
2315        drop(task);
2316
2317        // Check if the task can be marked as immutable
2318        let mut is_now_immutable = false;
2319        if let Some(dependencies) = task_dependencies_for_immutable
2320            && dependencies
2321                .iter()
2322                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2323        {
2324            is_now_immutable = true;
2325        }
2326        #[cfg(feature = "trace_task_details")]
2327        span.record("immutable", is_immutable || is_now_immutable);
2328
2329        if !queue.is_empty() || !old_edges.is_empty() {
2330            #[cfg(any(
2331                feature = "trace_task_completion",
2332                feature = "trace_aggregation_update_stats"
2333            ))]
2334            let _span =
2335                tracing::trace_span!("remove old edges and prepare new children", stats = Empty)
2336                    .entered();
2337            // Remove outdated edges first, before removing in_progress+dirty flag.
2338            // We need to make sure all outdated edges are removed before the task can potentially
2339            // be scheduled and executed again
2340            #[cfg(feature = "trace_aggregation_update_stats")]
2341            {
2342                let stats = CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2343                _span.record("stats", tracing::field::debug(stats));
2344            }
2345            #[cfg(not(feature = "trace_aggregation_update_stats"))]
2346            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2347        }
2348
2349        Ok(TaskExecutionCompletePrepareResult {
2350            new_children,
2351            is_now_immutable,
2352            #[cfg(feature = "verify_determinism")]
2353            no_output_set,
2354            new_output,
2355            output_dependent_tasks,
2356            is_recomputation,
2357            is_session_dependent,
2358        })
2359    }
2360
2361    fn task_execution_completed_invalidate_output_dependent(
2362        &self,
2363        ctx: &mut impl ExecuteContext<'_>,
2364        task_id: TaskId,
2365        output_dependent_tasks: SmallVec<[TaskId; 4]>,
2366    ) {
2367        debug_assert!(!output_dependent_tasks.is_empty());
2368
2369        #[cfg(feature = "trace_task_dirty")]
2370        let task_description = self.debug_get_task_description(task_id);
2371
2372        if output_dependent_tasks.len() > 1 {
2373            ctx.prepare_tasks(
2374                output_dependent_tasks
2375                    .iter()
2376                    .map(|&id| (id, TaskDataCategory::All)),
2377                "invalidate output dependents",
2378            );
2379        }
2380
2381        fn process_output_dependents(
2382            ctx: &mut impl ExecuteContext<'_>,
2383            task_id: TaskId,
2384            #[cfg(feature = "trace_task_dirty")] task_description: &str,
2385            dependent_task_id: TaskId,
2386            queue: &mut AggregationUpdateQueue,
2387        ) {
2388            #[cfg(feature = "trace_task_output_dependencies")]
2389            let span = tracing::trace_span!(
2390                "invalidate output dependency",
2391                task = %task_id,
2392                dependent_task = %dependent_task_id,
2393                result = tracing::field::Empty,
2394            )
2395            .entered();
2396            let mut make_stale = true;
2397            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2398            let transient_task_type = dependent.get_transient_task_type();
2399            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
2400                // once tasks are never invalidated
2401                #[cfg(feature = "trace_task_output_dependencies")]
2402                span.record("result", "once task");
2403                return;
2404            }
2405            if dependent.outdated_output_dependencies_contains(&task_id) {
2406                #[cfg(feature = "trace_task_output_dependencies")]
2407                span.record("result", "outdated dependency");
2408                // output dependency is outdated, so it hasn't read the output yet
2409                // and doesn't need to be invalidated
2410                // But importantly we still need to make the task dirty as it should no longer
2411                // be considered as "recomputation".
2412                make_stale = false;
2413            } else if !dependent.output_dependencies_contains(&task_id) {
2414                // output dependency has been removed, so the task doesn't depend on the
2415                // output anymore and doesn't need to be invalidated
2416                #[cfg(feature = "trace_task_output_dependencies")]
2417                span.record("result", "no backward dependency");
2418                return;
2419            }
2420            make_task_dirty_internal(
2421                dependent,
2422                dependent_task_id,
2423                make_stale,
2424                #[cfg(feature = "trace_task_dirty")]
2425                TaskDirtyCause::OutputChange {
2426                    task_description: task_description.to_string(),
2427                },
2428                queue,
2429                ctx,
2430            );
2431            #[cfg(feature = "trace_task_output_dependencies")]
2432            span.record("result", "marked dirty");
2433        }
2434
2435        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
2436            let chunk_size = good_chunk_size(output_dependent_tasks.len());
2437            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2438            let _ = scope_and_block(chunks.len(), |scope| {
2439                for chunk in chunks {
2440                    let child_ctx = ctx.child_context();
2441                    #[cfg(feature = "trace_task_dirty")]
2442                    let task_description = &task_description;
2443                    scope.spawn(move || {
2444                        let mut ctx = child_ctx.create();
2445                        let mut queue = AggregationUpdateQueue::new();
2446                        for dependent_task_id in chunk {
2447                            process_output_dependents(
2448                                &mut ctx,
2449                                task_id,
2450                                #[cfg(feature = "trace_task_dirty")]
2451                                task_description,
2452                                dependent_task_id,
2453                                &mut queue,
2454                            )
2455                        }
2456                        queue.execute(&mut ctx);
2457                    });
2458                }
2459            });
2460        } else {
2461            let mut queue = AggregationUpdateQueue::new();
2462            for dependent_task_id in output_dependent_tasks {
2463                process_output_dependents(
2464                    ctx,
2465                    task_id,
2466                    #[cfg(feature = "trace_task_dirty")]
2467                    &task_description,
2468                    dependent_task_id,
2469                    &mut queue,
2470                );
2471            }
2472            queue.execute(ctx);
2473        }
2474    }
2475
2476    fn task_execution_completed_unfinished_children_dirty(
2477        &self,
2478        ctx: &mut impl ExecuteContext<'_>,
2479        new_children: &FxHashSet<TaskId>,
2480    ) {
2481        debug_assert!(!new_children.is_empty());
2482
2483        let mut queue = AggregationUpdateQueue::new();
2484        ctx.for_each_task_all(
2485            new_children.iter().copied(),
2486            "unfinished children dirty",
2487            |child_task, ctx| {
2488                if !child_task.has_output() {
2489                    let child_id = child_task.id();
2490                    make_task_dirty_internal(
2491                        child_task,
2492                        child_id,
2493                        false,
2494                        #[cfg(feature = "trace_task_dirty")]
2495                        TaskDirtyCause::InitialDirty,
2496                        &mut queue,
2497                        ctx,
2498                    );
2499                }
2500            },
2501        );
2502
2503        queue.execute(ctx);
2504    }
2505
2506    fn task_execution_completed_connect(
2507        &self,
2508        ctx: &mut impl ExecuteContext<'_>,
2509        task_id: TaskId,
2510        new_children: FxHashSet<TaskId>,
2511    ) -> Option<TaskPriority> {
2512        debug_assert!(!new_children.is_empty());
2513
2514        let mut task = ctx.task(task_id, TaskDataCategory::All);
2515        let Some(in_progress) = task.get_in_progress() else {
2516            panic!("Task execution completed, but task is not in progress: {task:#?}");
2517        };
2518        if matches!(in_progress, InProgressState::Canceled) {
2519            // Task was canceled in the meantime, so we don't connect the children
2520            return None;
2521        }
2522        let InProgressState::InProgress(box InProgressStateInner {
2523            #[cfg(not(feature = "no_fast_stale"))]
2524            stale,
2525            once_task: is_once_task,
2526            ..
2527        }) = in_progress
2528        else {
2529            panic!("Task execution completed, but task is not in progress: {task:#?}");
2530        };
2531
2532        // If the task is stale, reschedule it
2533        #[cfg(not(feature = "no_fast_stale"))]
2534        if *stale && !is_once_task {
2535            let stale_priority = compute_stale_priority(&task);
2536            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2537                task.take_in_progress()
2538            else {
2539                unreachable!();
2540            };
2541            let old = task.set_in_progress(InProgressState::Scheduled {
2542                done_event,
2543                reason: TaskExecutionReason::Stale,
2544            });
2545            debug_assert!(old.is_none(), "InProgress already exists");
2546            drop(task);
2547
2548            // All `new_children` are currently hold active with an active count and we need to undo
2549            // that. (We already filtered out the old children from that list)
2550            AggregationUpdateQueue::run(
2551                AggregationUpdateJob::DecreaseActiveCounts {
2552                    task_ids: new_children.into_iter().collect(),
2553                },
2554                ctx,
2555            );
2556            return Some(stale_priority);
2557        }
2558
2559        let has_active_count = ctx.should_track_activeness()
2560            && task
2561                .get_activeness()
2562                .is_some_and(|activeness| activeness.active_counter > 0);
2563        connect_children(
2564            ctx,
2565            task_id,
2566            task,
2567            new_children,
2568            has_active_count,
2569            ctx.should_track_activeness(),
2570        );
2571
2572        None
2573    }
2574
2575    #[allow(clippy::type_complexity)]
2576    fn task_execution_completed_finish(
2577        &self,
2578        ctx: &mut impl ExecuteContext<'_>,
2579        task_id: TaskId,
2580        #[cfg(feature = "verify_determinism")] no_output_set: bool,
2581        new_output: Option<OutputValue>,
2582        is_now_immutable: bool,
2583        is_session_dependent: bool,
2584    ) -> (
2585        Option<TaskPriority>,
2586        Option<
2587            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2588        >,
2589    ) {
2590        let mut task = ctx.task(task_id, TaskDataCategory::All);
2591        let Some(in_progress) = task.take_in_progress() else {
2592            panic!("Task execution completed, but task is not in progress: {task:#?}");
2593        };
2594        if matches!(in_progress, InProgressState::Canceled) {
2595            // Task was canceled in the meantime, so we don't finish it
2596            return (None, None);
2597        }
2598        let InProgressState::InProgress(box InProgressStateInner {
2599            done_event,
2600            once_task: is_once_task,
2601            stale,
2602            marked_as_completed: _,
2603            new_children,
2604        }) = in_progress
2605        else {
2606            panic!("Task execution completed, but task is not in progress: {task:#?}");
2607        };
2608        debug_assert!(new_children.is_empty());
2609
2610        // If the task is stale, reschedule it
2611        if stale && !is_once_task {
2612            let stale_priority = compute_stale_priority(&task);
2613            let old = task.set_in_progress(InProgressState::Scheduled {
2614                done_event,
2615                reason: TaskExecutionReason::Stale,
2616            });
2617            debug_assert!(old.is_none(), "InProgress already exists");
2618            return (Some(stale_priority), None);
2619        }
2620
2621        // Set the output if it has changed
2622        let mut old_content = None;
2623        if let Some(value) = new_output {
2624            old_content = task.set_output(value);
2625        }
2626
2627        // If the task has no invalidator and has no mutable dependencies, it does not have a way
2628        // to be invalidated and we can mark it as immutable.
2629        if is_now_immutable {
2630            task.set_immutable(true);
2631        }
2632
2633        // Notify in progress cells and remove all of them
2634        let in_progress_cells = task.take_in_progress_cells();
2635        if let Some(ref cells) = in_progress_cells {
2636            for state in cells.values() {
2637                state.event.notify(usize::MAX);
2638            }
2639        }
2640
2641        // Compute and apply the new dirty state, propagating to aggregating ancestors
2642        let new_dirtyness = if is_session_dependent {
2643            Some(Dirtyness::SessionDependent)
2644        } else {
2645            None
2646        };
2647        #[cfg(feature = "verify_determinism")]
2648        let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
2649        let data_update = task.update_dirty_state(new_dirtyness);
2650
2651        // Under verify_determinism we re-run a task whose dirty state or output unexpectedly
2652        // changed during execution to confirm determinism. The leaf priority keeps these
2653        // verification reruns at the highest priority.
2654        #[cfg(feature = "verify_determinism")]
2655        let stale_priority: Option<TaskPriority> =
2656            ((dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task)
2657                .then(TaskPriority::leaf);
2658        #[cfg(not(feature = "verify_determinism"))]
2659        let stale_priority: Option<TaskPriority> = None;
2660        if stale_priority.is_some() {
2661            let old = task.set_in_progress(InProgressState::Scheduled {
2662                done_event,
2663                reason: TaskExecutionReason::Stale,
2664            });
2665            debug_assert!(old.is_none(), "InProgress already exists");
2666            drop(task);
2667        } else {
2668            drop(task);
2669
2670            // Notify dependent tasks that are waiting for this task to finish
2671            done_event.notify(usize::MAX);
2672        }
2673
2674        drop(old_content);
2675
2676        if let Some(data_update) = data_update {
2677            AggregationUpdateQueue::run(data_update, ctx);
2678        }
2679
2680        // We return so the data can be dropped outside of critical sections
2681        (stale_priority, in_progress_cells)
2682    }
2683
2684    fn task_execution_completed_cleanup(
2685        &self,
2686        ctx: &mut impl ExecuteContext<'_>,
2687        task_id: TaskId,
2688        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2689        is_error: bool,
2690        is_recomputation: bool,
2691    ) -> Vec<SharedReference> {
2692        let mut task = ctx.task(task_id, TaskDataCategory::All);
2693        let mut removed_cell_data = Vec::new();
2694        // An error is potentially caused by a eventual consistency, so we avoid updating cells
2695        // after an error as it is likely transient and we want to keep the dependent tasks
2696        // clean to avoid re-executions.
2697        // NOTE: This must stay in sync with task_execution_completed_prepare, which similarly
2698        // skips cell_type_max_index updates on error.
2699        if !is_error || is_recomputation {
2700            // Remove no longer existing cells and
2701            // find all outdated data items (removed cells, outdated edges)
2702            // Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
2703            // anyway and we want to avoid needless re-executions. When the cells become
2704            // used again, they are invalidated from the update cell operation.
2705            let to_remove: Vec<_> = task
2706                .iter_cell_data()
2707                .filter_map(|(cell, _)| {
2708                    cell_counters
2709                        .get(&cell.type_id)
2710                        .is_none_or(|start_index| cell.index >= *start_index)
2711                        .then_some(*cell)
2712                })
2713                .collect();
2714            removed_cell_data.reserve_exact(to_remove.len());
2715            for cell in to_remove {
2716                if let Some(data) = task.remove_cell_data(&cell) {
2717                    removed_cell_data.push(data);
2718                }
2719            }
2720            // Remove cell_data_hash entries for cells that no longer exist
2721            let to_remove_hash: Vec<_> = task
2722                .iter_cell_data_hash()
2723                .filter_map(|(cell, _)| {
2724                    cell_counters
2725                        .get(&cell.type_id)
2726                        .is_none_or(|start_index| cell.index >= *start_index)
2727                        .then_some(*cell)
2728                })
2729                .collect();
2730            for cell in to_remove_hash {
2731                task.remove_cell_data_hash(&cell);
2732            }
2733        }
2734
2735        // Clean up task storage after execution:
2736        // - Shrink collections marked with shrink_on_completion
2737        // - Drop dependency fields for immutable tasks (they'll never re-execute)
2738        task.cleanup_after_execution();
2739
2740        drop(task);
2741
2742        // Return so we can drop outside of critical sections
2743        removed_cell_data
2744    }
2745
2746    /// Prints the standard message emitted when the background persisting process stops due to an
2747    /// unrecoverable write error. The caller is responsible for returning from the background job.
2748    fn log_unrecoverable_persist_error() {
2749        eprintln!(
2750            "Persisting is disabled for this session due to an unrecoverable error. Stopping the \
2751             background persisting process."
2752        );
2753    }
2754
2755    fn run_backend_job<'a>(
2756        self: &'a Arc<Self>,
2757        job: TurboTasksBackendJob,
2758        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2759    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2760        Box::pin(async move {
2761            match job {
2762                TurboTasksBackendJob::Snapshot => {
2763                    debug_assert!(self.should_persist());
2764
2765                    let mut last_snapshot = self.start_time;
2766                    let mut idle_start_listener = self.idle_start_event.listen();
2767                    let mut idle_end_listener = self.idle_end_event.listen();
2768                    // Whether to immediately set an idle timeout if possible.
2769                    // Set to false if we don't persist anything in a cycle.
2770                    let mut fresh_idle = true;
2771                    let mut evicted = false;
2772                    let mut is_first = true;
2773                    'outer: loop {
2774                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2775                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2776                        let idle_timeout = *IDLE_TIMEOUT;
2777                        let (time, mut reason) = if is_first {
2778                            (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2779                        } else {
2780                            (SNAPSHOT_INTERVAL, "regular snapshot interval")
2781                        };
2782
2783                        let until = last_snapshot + time;
2784                        if until > Instant::now() {
2785                            let mut stop_listener = self.stopping_event.listen();
2786                            if self.stopping.load(Ordering::Acquire) {
2787                                return;
2788                            }
2789                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2790                                Instant::now() + idle_timeout
2791                            } else {
2792                                far_future()
2793                            };
2794                            loop {
2795                                tokio::select! {
2796                                    _ = &mut stop_listener => {
2797                                        return;
2798                                    },
2799                                    _ = &mut idle_start_listener => {
2800                                        idle_time = Instant::now() + idle_timeout;
2801                                        idle_start_listener = self.idle_start_event.listen()
2802                                    },
2803                                    _ = &mut idle_end_listener => {
2804                                        idle_time = far_future();
2805                                        idle_end_listener = self.idle_end_event.listen()
2806                                    },
2807                                    _ = tokio::time::sleep_until(until) => {
2808                                        break;
2809                                    },
2810                                    _ = tokio::time::sleep_until(idle_time) => {
2811                                        if turbo_tasks.is_idle() {
2812                                            reason = "idle timeout";
2813                                            break;
2814                                        }
2815                                    },
2816                                }
2817                            }
2818                        }
2819
2820                        let this = self.clone();
2821                        // Create a root span shared by both the snapshot/persist
2822                        // work and the subsequent compaction so they appear
2823                        // grouped together in trace viewers.
2824                        let background_span =
2825                            tracing::info_span!(parent: None, "background snapshot");
2826                        match this.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
2827                            Err(err) => {
2828                                // save_snapshot consumed persisted_task_cache_log entries;
2829                                // further snapshots would corrupt the task graph.
2830                                eprintln!("Persisting failed: {err:?}");
2831                                Self::log_unrecoverable_persist_error();
2832                                return;
2833                            }
2834                            Ok((snapshot_start, new_data)) => {
2835                                // if we see 'new_data' then the next idle transition is 'fresh'
2836                                fresh_idle = new_data;
2837                                is_first = false;
2838                                last_snapshot = snapshot_start;
2839
2840                                // Polls the idle-end event without blocking. Returns
2841                                // `true` and refreshes the listener if idle has ended,
2842                                // `false` if we are still idle.
2843                                macro_rules! check_idle_ended {
2844                                    () => {{
2845                                        tokio::select! {
2846                                            biased;
2847                                            _ = &mut idle_end_listener => {
2848                                                idle_end_listener = self.idle_end_event.listen();
2849                                                true
2850                                            },
2851                                            _ = std::future::ready(()) => false,
2852                                        }
2853                                    }};
2854                                }
2855                                // Evict persisted tasks from memory to reclaim space.
2856                                // Like compaction, this runs after snapshot_and_persist
2857                                // as a separate concern.
2858                                //
2859                                // TODO: improve eviction policy — current approach is a full sweep
2860                                // after every snapshot. Better strategies to consider:
2861                                //   - Memory pressure signals: only evict when RSS exceeds a
2862                                //     threshold rather than unconditionally.
2863                                //   - Recency data: track last-access time per task and evict
2864                                //     least-recently-used entries first rather than all at once.
2865                                //   - Eviction intensity: partial sweeps (evict a fraction of
2866                                //     eligible tasks per cycle) to reduce latency spikes.
2867                                // Evict when there is new data to persist (the common
2868                                // case) or on the very first snapshot after startup
2869                                // (data was already on disk from a prior run, so
2870                                // new_data may be false but in-memory state can still
2871                                // be evicted).
2872                                let mut ran_eviction = false;
2873                                if this.should_evict() && (new_data || !evicted) {
2874                                    if check_idle_ended!() {
2875                                        // need to start all the way over so we catch the next
2876                                        // signal
2877                                        continue 'outer;
2878                                    }
2879                                    evicted = true;
2880                                    ran_eviction = true;
2881                                    this.storage.evict_after_snapshot(background_span.id());
2882                                }
2883
2884                                // Compact while idle (up to limit), regardless of
2885                                // whether the snapshot had new data.
2886                                // `background_span` is not entered here because
2887                                // `EnteredSpan` is `!Send` and would prevent the
2888                                // future from being sent across threads when it
2889                                // suspends at the `select!` await below.
2890                                let mut ran_compaction = false;
2891                                const MAX_IDLE_COMPACTION_PASSES: usize = 10;
2892                                for _ in 0..MAX_IDLE_COMPACTION_PASSES {
2893                                    if check_idle_ended!() {
2894                                        continue 'outer;
2895                                    }
2896                                    // Enter the span only around the synchronous
2897                                    // compact() call so we never hold an
2898                                    // `EnteredSpan` across an await point.
2899                                    let _compact_span = tracing::info_span!(
2900                                        parent: background_span.id(),
2901                                        "compact database"
2902                                    )
2903                                    .entered();
2904                                    match self.backing_storage.compact() {
2905                                        Ok(true) => {
2906                                            ran_compaction = true;
2907                                        }
2908                                        Ok(false) => break,
2909                                        Err(err) => {
2910                                            eprintln!("Compaction failed: {err:?}");
2911                                            if self.backing_storage.has_unrecoverable_write_error()
2912                                            {
2913                                                Self::log_unrecoverable_persist_error();
2914                                                return;
2915                                            }
2916                                            break;
2917                                        }
2918                                    }
2919                                }
2920                                if check_idle_ended!() {
2921                                    continue 'outer;
2922                                }
                                // After running snapshotting/eviction/compaction we have churned a
                                // _lot_ of memory. If we are still idle, tell `mimalloc` that now
                                // would be a good time to release memory back to the OS.
2927                                if new_data || ran_compaction || ran_eviction {
2928                                    turbo_tasks_malloc::TurboMalloc::collect(true);
2929                                }
2930                            }
2931                        }
2932                    }
2933                }
2934            }
2935        })
2936    }
2937
2938    fn try_read_own_task_cell(
2939        &self,
2940        task_id: TaskId,
2941        cell: CellId,
2942        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2943    ) -> Result<TypedCellContent> {
2944        let mut ctx = self.execute_context(turbo_tasks);
2945        let task = ctx.task(task_id, TaskDataCategory::Data);
2946        if let Some(content) = task.get_cell_data(&cell).cloned() {
2947            Ok(CellContent(Some(content)).into_typed(cell.type_id))
2948        } else {
2949            Ok(CellContent(None).into_typed(cell.type_id))
2950        }
2951    }
2952
2953    fn read_task_collectibles(
2954        &self,
2955        task_id: TaskId,
2956        collectible_type: TraitTypeId,
2957        reader_id: Option<TaskId>,
2958        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2959    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2960        let mut ctx = self.execute_context(turbo_tasks);
2961        let mut collectibles = AutoMap::default();
2962        {
2963            let mut task = ctx.task(task_id, TaskDataCategory::All);
2964            if task
2965                .get_persistent_task_type()
2966                .is_some_and(|t| !t.native_fn.is_root)
2967            {
2968                drop(task);
2969                panic!(
2970                    "Reading collectibles of non-root task {} (reader: {}). The `root` attribute \
2971                     is missing on the task.",
2972                    self.debug_get_task_description(task_id),
2973                    reader_id.map_or_else(
2974                        || "unknown".to_string(),
2975                        |r| self.debug_get_task_description(r)
2976                    )
2977                );
2978            }
2979            for (collectible, count) in task.iter_aggregated_collectibles() {
2980                if *count > 0 && collectible.collectible_type == collectible_type {
2981                    *collectibles
2982                        .entry(RawVc::TaskCell(
2983                            collectible.cell.task,
2984                            collectible.cell.cell,
2985                        ))
2986                        .or_insert(0) += 1;
2987                }
2988            }
2989            for (&collectible, &count) in task.iter_collectibles() {
2990                if collectible.collectible_type == collectible_type {
2991                    *collectibles
2992                        .entry(RawVc::TaskCell(
2993                            collectible.cell.task,
2994                            collectible.cell.cell,
2995                        ))
2996                        .or_insert(0) += count;
2997                }
2998            }
2999            if let Some(reader_id) = reader_id {
3000                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
3001            }
3002        }
3003        if let Some(reader_id) = reader_id {
3004            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
3005            let target = CollectiblesRef {
3006                task: task_id,
3007                collectible_type,
3008            };
3009            if !reader.remove_outdated_collectibles_dependencies(&target) {
3010                let _ = reader.add_collectibles_dependencies(target);
3011            }
3012        }
3013        collectibles
3014    }
3015
3016    fn emit_collectible(
3017        &self,
3018        collectible_type: TraitTypeId,
3019        collectible: RawVc,
3020        task_id: TaskId,
3021        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3022    ) {
3023        self.assert_valid_collectible(task_id, collectible);
3024
3025        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3026            panic!("Collectibles need to be resolved");
3027        };
3028        let cell = CellRef {
3029            task: collectible_task,
3030            cell,
3031        };
3032        operation::UpdateCollectibleOperation::run(
3033            task_id,
3034            CollectibleRef {
3035                collectible_type,
3036                cell,
3037            },
3038            1,
3039            self.execute_context(turbo_tasks),
3040        );
3041    }
3042
3043    fn unemit_collectible(
3044        &self,
3045        collectible_type: TraitTypeId,
3046        collectible: RawVc,
3047        count: u32,
3048        task_id: TaskId,
3049        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3050    ) {
3051        self.assert_valid_collectible(task_id, collectible);
3052
3053        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3054            panic!("Collectibles need to be resolved");
3055        };
3056        let cell = CellRef {
3057            task: collectible_task,
3058            cell,
3059        };
3060        operation::UpdateCollectibleOperation::run(
3061            task_id,
3062            CollectibleRef {
3063                collectible_type,
3064                cell,
3065            },
3066            -(i32::try_from(count).unwrap()),
3067            self.execute_context(turbo_tasks),
3068        );
3069    }
3070
3071    fn update_task_cell(
3072        &self,
3073        task_id: TaskId,
3074        cell: CellId,
3075        content: CellContent,
3076        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3077        content_hash: Option<CellHash>,
3078        verification_mode: VerificationMode,
3079        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3080    ) {
3081        operation::UpdateCellOperation::run(
3082            task_id,
3083            cell,
3084            content,
3085            updated_key_hashes,
3086            content_hash,
3087            verification_mode,
3088            self.execute_context(turbo_tasks),
3089        );
3090    }
3091
3092    fn mark_own_task_as_finished(
3093        &self,
3094        task: TaskId,
3095        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3096    ) {
3097        let mut ctx = self.execute_context(turbo_tasks);
3098        let mut task = ctx.task(task, TaskDataCategory::Data);
3099        if let Some(InProgressState::InProgress(box InProgressStateInner {
3100            marked_as_completed,
3101            ..
3102        })) = task.get_in_progress_mut()
3103        {
3104            *marked_as_completed = true;
3105            // TODO this should remove the dirty state (also check session_dependent)
3106            // but this would break some assumptions for strongly consistent reads.
3107            // Client tasks are not connected yet, so we wouldn't wait for them.
3108            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
3109        }
3110    }
3111
3112    fn connect_task(
3113        &self,
3114        task: TaskId,
3115        parent_task: Option<TaskId>,
3116        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3117    ) {
3118        self.assert_not_persistent_calling_transient(parent_task, task, None);
3119        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
3120    }
3121
3122    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3123        let task_id = self.transient_task_id_factory.get();
3124        {
3125            let mut task = self.storage.access_mut(task_id);
3126            task.init_transient_task(task_id, task_type, self.should_track_activeness());
3127        }
3128        #[cfg(feature = "verify_aggregation_graph")]
3129        self.root_tasks.lock().insert(task_id);
3130        task_id
3131    }
3132
3133    fn dispose_root_task(
3134        &self,
3135        task_id: TaskId,
3136        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3137    ) {
3138        #[cfg(feature = "verify_aggregation_graph")]
3139        self.root_tasks.lock().remove(&task_id);
3140
3141        let mut ctx = self.execute_context(turbo_tasks);
3142        let mut task = ctx.task(task_id, TaskDataCategory::All);
3143        let is_dirty = task.is_dirty();
3144        let has_dirty_containers = task.has_dirty_containers();
3145        if is_dirty.is_some() || has_dirty_containers {
3146            if let Some(activeness_state) = task.get_activeness_mut() {
3147                // We will finish the task, but it would be removed after the task is done
3148                activeness_state.unset_root_type();
3149                activeness_state.set_active_until_clean();
3150            };
3151        } else if let Some(activeness_state) = task.take_activeness() {
3152            // Technically nobody should be listening to this event, but just in case
3153            // we notify it anyway
3154            activeness_state.all_clean_event.notify(usize::MAX);
3155        }
3156    }
3157
3158    #[cfg(feature = "verify_aggregation_graph")]
3159    fn verify_aggregation_graph(
3160        &self,
3161        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3162        idle: bool,
3163    ) {
3164        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3165            return;
3166        }
3167        use std::{collections::VecDeque, env, io::stdout};
3168
3169        use crate::backend::operation::{get_uppers, is_aggregating_node};
3170
3171        let mut ctx = self.execute_context(turbo_tasks);
3172        let root_tasks = self.root_tasks.lock().clone();
3173
3174        for task_id in root_tasks.into_iter() {
3175            let mut queue = VecDeque::new();
3176            let mut visited = FxHashSet::default();
3177            let mut aggregated_nodes = FxHashSet::default();
3178            let mut collectibles = FxHashMap::default();
3179            let root_task_id = task_id;
3180            visited.insert(task_id);
3181            aggregated_nodes.insert(task_id);
3182            queue.push_back(task_id);
3183            let mut counter = 0;
3184            while let Some(task_id) = queue.pop_front() {
3185                counter += 1;
3186                if counter % 100000 == 0 {
3187                    println!(
3188                        "queue={}, visited={}, aggregated_nodes={}",
3189                        queue.len(),
3190                        visited.len(),
3191                        aggregated_nodes.len()
3192                    );
3193                }
3194                let task = ctx.task(task_id, TaskDataCategory::All);
3195                if idle && !self.is_idle.load(Ordering::Relaxed) {
3196                    return;
3197                }
3198
3199                let uppers = get_uppers(&task);
3200                if task_id != root_task_id
3201                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3202                {
3203                    panic!(
3204                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3205                         {:?})",
3206                        task_id,
3207                        task.get_task_description(),
3208                        uppers
3209                    );
3210                }
3211
3212                for (collectible, _) in task.iter_aggregated_collectibles() {
3213                    collectibles
3214                        .entry(*collectible)
3215                        .or_insert_with(|| (false, Vec::new()))
3216                        .1
3217                        .push(task_id);
3218                }
3219
3220                for (&collectible, &value) in task.iter_collectibles() {
3221                    if value > 0 {
3222                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3223                            *flag = true
3224                        } else {
3225                            panic!(
3226                                "Task {} has a collectible {:?} that is not in any upper task",
3227                                task_id, collectible
3228                            );
3229                        }
3230                    }
3231                }
3232
3233                let is_dirty = task.has_dirty();
3234                let has_dirty_container = task.has_dirty_containers();
3235                let should_be_in_upper = is_dirty || has_dirty_container;
3236
3237                let aggregation_number = get_aggregation_number(&task);
3238                if is_aggregating_node(aggregation_number) {
3239                    aggregated_nodes.insert(task_id);
3240                }
3241                // println!(
3242                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
3243                //     ctx.get_task_description(task_id),
3244                //     uppers
3245                // );
3246
3247                for child_id in task.iter_children() {
3248                    // println!("{task_id}: child -> {child_id}");
3249                    if visited.insert(child_id) {
3250                        queue.push_back(child_id);
3251                    }
3252                }
3253                drop(task);
3254
3255                if should_be_in_upper {
3256                    for upper_id in uppers {
3257                        let upper = ctx.task(upper_id, TaskDataCategory::All);
3258                        let in_upper = upper
3259                            .get_aggregated_dirty_containers(&task_id)
3260                            .is_some_and(|&dirty| dirty > 0);
3261                        if !in_upper {
3262                            let containers: Vec<_> = upper
3263                                .iter_aggregated_dirty_containers()
3264                                .map(|(&k, &v)| (k, v))
3265                                .collect();
3266                            let upper_task_desc = upper.get_task_description();
3267                            drop(upper);
3268                            panic!(
3269                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
3270                                 ({})\nThese dirty containers are present:\n{:#?}",
3271                                task_id,
3272                                ctx.task(task_id, TaskDataCategory::Data)
3273                                    .get_task_description(),
3274                                upper_id,
3275                                upper_task_desc,
3276                                containers,
3277                            );
3278                        }
3279                    }
3280                }
3281            }
3282
3283            for (collectible, (flag, task_ids)) in collectibles {
3284                if !flag {
3285                    use std::io::Write;
3286                    let mut stdout = stdout().lock();
3287                    writeln!(
3288                        stdout,
3289                        "{:?} that is not emitted in any child task but in these aggregated \
3290                         tasks: {:#?}",
3291                        collectible,
3292                        task_ids
3293                            .iter()
3294                            .map(|t| format!(
3295                                "{t} {}",
3296                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
3297                            ))
3298                            .collect::<Vec<_>>()
3299                    )
3300                    .unwrap();
3301
3302                    let task_id = collectible.cell.task;
3303                    let mut queue = {
3304                        let task = ctx.task(task_id, TaskDataCategory::All);
3305                        get_uppers(&task)
3306                    };
3307                    let mut visited = FxHashSet::default();
3308                    for &upper_id in queue.iter() {
3309                        visited.insert(upper_id);
3310                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3311                    }
3312                    while let Some(task_id) = queue.pop() {
3313                        let task = ctx.task(task_id, TaskDataCategory::All);
3314                        let desc = task.get_task_description();
3315                        let aggregated_collectible = task
3316                            .get_aggregated_collectibles(&collectible)
3317                            .copied()
3318                            .unwrap_or_default();
3319                        let uppers = get_uppers(&task);
3320                        drop(task);
3321                        writeln!(
3322                            stdout,
3323                            "upper {task_id} {desc} collectible={aggregated_collectible}"
3324                        )
3325                        .unwrap();
3326                        if task_ids.contains(&task_id) {
3327                            writeln!(
3328                                stdout,
3329                                "Task has an upper connection to an aggregated task that doesn't \
3330                                 reference it. Upper connection is invalid!"
3331                            )
3332                            .unwrap();
3333                        }
3334                        for upper_id in uppers {
3335                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3336                            if !visited.contains(&upper_id) {
3337                                queue.push(upper_id);
3338                            }
3339                        }
3340                    }
3341                    panic!("See stdout for more details");
3342                }
3343            }
3344        }
3345    }
3346
3347    fn assert_not_persistent_calling_transient(
3348        &self,
3349        parent_id: Option<TaskId>,
3350        child_id: TaskId,
3351        cell_id: Option<CellId>,
3352    ) {
3353        if let Some(parent_id) = parent_id
3354            && !parent_id.is_transient()
3355            && child_id.is_transient()
3356        {
3357            self.panic_persistent_calling_transient(
3358                self.debug_get_task_description(parent_id),
3359                self.debug_get_cached_task_type(child_id).as_deref(),
3360                cell_id,
3361            );
3362        }
3363    }
3364
3365    fn panic_persistent_calling_transient(
3366        &self,
3367        parent: String,
3368        child: Option<&CachedTaskType>,
3369        cell_id: Option<CellId>,
3370    ) -> ! {
3371        let transient_reason = if let Some(child) = child {
3372            Cow::Owned(format!(
3373                " The callee is transient because it depends on:\n{}",
3374                self.debug_trace_transient_task(child, cell_id),
3375            ))
3376        } else {
3377            Cow::Borrowed("")
3378        };
3379        panic!(
3380            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3381            parent,
3382            child.map_or("unknown", |t| t.get_name()),
3383            transient_reason,
3384        );
3385    }
3386
3387    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3388        // these checks occur in a potentially hot codepath, but they're cheap
3389        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3390            // This should never happen: The collectible APIs use ResolvedVc
3391            let task_info = if let Some(col_task_ty) = collectible
3392                .try_get_task_id()
3393                .map(|t| self.debug_get_task_description(t))
3394            {
3395                Cow::Owned(format!(" (return type of {col_task_ty})"))
3396            } else {
3397                Cow::Borrowed("")
3398            };
3399            panic!("Collectible{task_info} must be a ResolvedVc")
3400        };
3401        if col_task_id.is_transient() && !task_id.is_transient() {
3402            let transient_reason =
3403                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3404                    Cow::Owned(format!(
3405                        ". The collectible is transient because it depends on:\n{}",
3406                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3407                    ))
3408                } else {
3409                    Cow::Borrowed("")
3410                };
3411            // this should never happen: How would a persistent function get a transient Vc?
3412            panic!(
3413                "Collectible is transient, transient collectibles cannot be emitted from \
3414                 persistent tasks{transient_reason}",
3415            )
3416        }
3417    }
3418}
3419
3420impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    /// Forwards the startup notification to the wrapped inner backend (`self.0`).
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }
3424
    /// Forwards the stopping notification to the wrapped inner backend (`self.0`).
    ///
    /// The `_turbo_tasks` handle is not used here.
    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }
3428
    /// Forwards the stop notification to the wrapped inner backend (`self.0`).
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }
3432
    /// Forwards the idle-start notification to the wrapped inner backend (`self.0`).
    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }
3436
    /// Forwards the idle-end notification to the wrapped inner backend (`self.0`).
    ///
    /// The `_turbo_tasks` handle is not used here.
    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }
3440
    /// Delegates to the inner backend's `get_or_create_task`, passing every argument through
    /// unchanged, and returns the `TaskId` it produces.
    fn get_or_create_task(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        parent_task: Option<TaskId>,
        persistence: TaskPersistence,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_task(native_fn, this, arg, parent_task, persistence, turbo_tasks)
    }
3453
    /// Delegates invalidation of the single task `task_id` to the inner backend.
    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }
3457
    /// Delegates invalidation of the slice `tasks` to the inner backend.
    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }
3461
    /// Delegates invalidation of the task set `tasks` to the inner backend.
    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }
3469
    /// Delegates to the inner backend's `invalidate_serialization` for the task `task_id`.
    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }
3477
    /// Informs the inner backend that the execution of `task` was canceled.
    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }
3481
    /// Delegates to the inner backend's `try_start_task_execution` and returns its result
    /// unchanged (`None` or the `TaskExecutionSpec` to run).
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0
            .try_start_task_execution(task_id, priority, turbo_tasks)
    }
3491
    /// Reports the completed execution of `task_id` to the inner backend and returns whatever
    /// `Option<TaskPriority>` it yields.
    ///
    /// The `stateful` parameter only exists, and is only forwarded, when the
    /// `verify_determinism` feature is enabled.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskPriority> {
        self.0.task_execution_completed(
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
            turbo_tasks,
        )
    }
3511
    /// Job type accepted by `run_backend_job` for this backend.
    type BackendJob = TurboTasksBackendJob;
3513
    /// Hands `job` to the inner backend and returns the boxed future it produces; the future
    /// borrows `self` and `turbo_tasks` for `'a`.
    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(job, turbo_tasks)
    }
3521
    /// Delegates to the inner backend's `try_read_task_output` and returns its result
    /// unchanged: an outer error, or either the output `RawVc` or an `EventListener`.
    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, reader, options, turbo_tasks)
    }
3532
    /// Delegates to the inner backend's `try_read_task_cell` and returns its result unchanged.
    ///
    /// Note the argument order: the inner method takes `reader` before `cell`, whereas this
    /// trait method receives `cell` before `reader`.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }
3544
    /// Delegates to the inner backend's `try_read_own_task_cell` for `cell` of `task_id`.
    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0.try_read_own_task_cell(task_id, cell, turbo_tasks)
    }
3553
    /// Delegates to the inner backend's `read_task_collectibles` and returns the resulting map
    /// of collectibles to counts.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }
3564
3565    fn emit_collectible(
3566        &self,
3567        collectible_type: TraitTypeId,
3568        collectible: RawVc,
3569        task_id: TaskId,
3570        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3571    ) {
3572        self.0
3573            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3574    }
3575
3576    fn unemit_collectible(
3577        &self,
3578        collectible_type: TraitTypeId,
3579        collectible: RawVc,
3580        count: u32,
3581        task_id: TaskId,
3582        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3583    ) {
3584        self.0
3585            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3586    }
3587
3588    fn update_task_cell(
3589        &self,
3590        task_id: TaskId,
3591        cell: CellId,
3592        content: CellContent,
3593        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3594        content_hash: Option<CellHash>,
3595        verification_mode: VerificationMode,
3596        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3597    ) {
3598        self.0.update_task_cell(
3599            task_id,
3600            cell,
3601            content,
3602            updated_key_hashes,
3603            content_hash,
3604            verification_mode,
3605            turbo_tasks,
3606        );
3607    }
3608
3609    fn mark_own_task_as_finished(
3610        &self,
3611        task_id: TaskId,
3612        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3613    ) {
3614        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3615    }
3616
3617    fn connect_task(
3618        &self,
3619        task: TaskId,
3620        parent_task: Option<TaskId>,
3621        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3622    ) {
3623        self.0.connect_task(task, parent_task, turbo_tasks);
3624    }
3625
3626    fn create_transient_task(
3627        &self,
3628        task_type: TransientTaskType,
3629        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3630    ) -> TaskId {
3631        self.0.create_transient_task(task_type)
3632    }
3633
3634    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3635        self.0.dispose_root_task(task_id, turbo_tasks);
3636    }
3637
3638    fn task_statistics(&self) -> &TaskStatisticsApi {
3639        &self.0.task_statistics
3640    }
3641
3642    fn is_tracking_dependencies(&self) -> bool {
3643        self.0.options.dependency_tracking
3644    }
3645
3646    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
3647        self.0.get_task_name(task, turbo_tasks)
3648    }
3649}
3650
/// A node in a debug trace of transient tasks.
///
/// Nodes form a tree through `cause_self` / `cause_args` and are rendered as an indented,
/// multi-line listing by `fmt_indented` (and therefore by the `Display` impl).
enum DebugTraceTransientTask {
    /// A task with a known name plus whatever cause information is attached to it.
    Cached {
        /// Name printed for this task.
        task_name: &'static str,
        /// When present, the listing appends ` (read cell of type <name>)` for this value type.
        cell_type_id: Option<ValueTypeId>,
        /// Optional cause, printed one level deeper under a `self:` heading.
        cause_self: Option<Box<DebugTraceTransientTask>>,
        /// Causes printed one level deeper under an `args:` heading; the heading is omitted
        /// when this is empty.
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// This representation is used when this task is a duplicate of one previously shown
    Collapsed {
        /// Name printed for this task; the listing marks the line with ` (collapsed)`.
        task_name: &'static str,
        /// When present, the listing appends ` (read cell of type <name>)` for this value type.
        cell_type_id: Option<ValueTypeId>,
    },
    /// A task without a name; it is printed as `unknown transient task`.
    Uncached {
        /// When present, the listing appends ` (read cell of type <name>)` for this value type.
        cell_type_id: Option<ValueTypeId>,
    },
}
3667
3668impl DebugTraceTransientTask {
3669    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3670        let indent = "    ".repeat(level);
3671        f.write_str(&indent)?;
3672
3673        fn fmt_cell_type_id(
3674            f: &mut fmt::Formatter<'_>,
3675            cell_type_id: Option<ValueTypeId>,
3676        ) -> fmt::Result {
3677            if let Some(ty) = cell_type_id {
3678                write!(
3679                    f,
3680                    " (read cell of type {})",
3681                    get_value_type(ty).ty.global_name
3682                )
3683            } else {
3684                Ok(())
3685            }
3686        }
3687
3688        // write the name and type
3689        match self {
3690            Self::Cached {
3691                task_name,
3692                cell_type_id,
3693                ..
3694            }
3695            | Self::Collapsed {
3696                task_name,
3697                cell_type_id,
3698                ..
3699            } => {
3700                f.write_str(task_name)?;
3701                fmt_cell_type_id(f, *cell_type_id)?;
3702                if matches!(self, Self::Collapsed { .. }) {
3703                    f.write_str(" (collapsed)")?;
3704                }
3705            }
3706            Self::Uncached { cell_type_id } => {
3707                f.write_str("unknown transient task")?;
3708                fmt_cell_type_id(f, *cell_type_id)?;
3709            }
3710        }
3711        f.write_char('\n')?;
3712
3713        // write any extra "cause" information we might have
3714        if let Self::Cached {
3715            cause_self,
3716            cause_args,
3717            ..
3718        } = self
3719        {
3720            if let Some(c) = cause_self {
3721                writeln!(f, "{indent}  self:")?;
3722                c.fmt_indented(f, level + 1)?;
3723            }
3724            if !cause_args.is_empty() {
3725                writeln!(f, "{indent}  args:")?;
3726                for c in cause_args {
3727                    c.fmt_indented(f, level + 1)?;
3728                }
3729            }
3730        }
3731        Ok(())
3732    }
3733}
3734
3735impl fmt::Display for DebugTraceTransientTask {
3736    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3737        self.fmt_indented(f, 0)
3738    }
3739}
3740
// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
/// Returns an [`Instant`] roughly 30 years after the moment of the call.
fn far_future() -> Instant {
    // The API offers neither a maximum `Instant` nor a way to convert a specific future date
    // into one, so a large offset from "now" is used instead. The offset has to stay moderate:
    // 1000 years overflows on macOS, 100 years overflows on FreeBSD.
    const SECONDS_PER_DAY: u64 = 86400;
    const DAYS_PER_YEAR: u64 = 365;
    const YEARS_AHEAD: u64 = 30;

    let horizon = Duration::from_secs(SECONDS_PER_DAY * DAYS_PER_YEAR * YEARS_AHEAD);
    Instant::now() + horizon
}
3749
3750/// Encodes task data, using the provided buffer as a scratch space.  Returns a new exactly sized
3751/// buffer.
3752/// This allows reusing the buffer across multiple encode calls to optimize allocations and
3753/// resulting buffer sizes.
3754///
3755/// TODO: The `Result` return type is an artifact of the bincode `Encode` trait requiring
3756/// fallible encoding. In practice, encoding to a `SmallVec` is infallible (no I/O), and the only
3757/// real failure mode — a `TypedSharedReference` whose value type has no bincode impl — is a
3758/// programmer error caught by the panic in the caller. Consider making the bincode encoding trait
3759/// infallible (i.e. returning `()` instead of `Result<(), EncodeError>`) to eliminate the
3760/// spurious `Result` threading throughout the encode path.
3761fn encode_task_data(
3762    task: TaskId,
3763    data: &TaskStorage,
3764    category: SpecificTaskDataCategory,
3765    scratch_buffer: &mut TurboBincodeBuffer,
3766) -> Result<TurboBincodeBuffer> {
3767    scratch_buffer.clear();
3768    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3769    data.encode(category, &mut encoder)?;
3770
3771    if cfg!(feature = "verify_serialization") {
3772        TaskStorage::new()
3773            .decode(
3774                category,
3775                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3776            )
3777            .with_context(|| {
3778                format!(
3779                    "expected to be able to decode serialized data for '{category:?}' information \
3780                     for {task}"
3781                )
3782            })?;
3783    }
3784    Ok(SmallVec::from_slice(scratch_buffer))
3785}