Skip to main content

turbo_tasks_backend/backend/
mod.rs

1mod cell_data;
2mod counter_map;
3mod operation;
4mod snapshot_coordinator;
5mod storage;
6pub mod storage_schema;
7
8use std::{
9    borrow::Cow,
10    fmt::{self, Write},
11    future::Future,
12    hash::BuildHasherDefault,
13    mem::take,
14    pin::Pin,
15    sync::{
16        Arc, LazyLock,
17        atomic::{AtomicBool, Ordering},
18    },
19    time::SystemTime,
20};
21
22use anyhow::{Context, Result, bail};
23use auto_hash_map::{AutoMap, AutoSet};
24use indexmap::IndexSet;
25use parking_lot::Mutex;
26use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
27use smallvec::{SmallVec, smallvec};
28use tokio::time::{Duration, Instant};
29use tracing::{Span, trace_span};
30use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
31use turbo_tasks::{
32    CellId, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency, ReadOutputOptions,
33    ReadTracking, SharedReference, StackDynTaskInputs, TRANSIENT_TASK_BIT, TaskExecutionReason,
34    TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic,
35    ValueTypeId,
36    backend::{
37        Backend, CachedTaskType, CellContent, CellHash, TaskExecutionSpec, TransientTaskType,
38        TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
39        TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
40        VerificationMode,
41    },
42    event::{Event, EventDescription, EventListener},
43    macro_helpers::NativeFunction,
44    message_queue::{TimingEvent, TraceEvent},
45    registry::get_value_type,
46    scope::scope_and_block,
47    task_statistics::TaskStatisticsApi,
48    trace::TraceRawVcs,
49    util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
50};
51
52pub use self::{
53    operation::AnyOperation,
54    storage::{EvictionCounts, SpecificTaskDataCategory, TaskDataCategory},
55};
56#[cfg(feature = "trace_task_dirty")]
57use crate::backend::operation::TaskDirtyCause;
58use crate::{
59    backend::{
60        operation::{
61            AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
62            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
63            LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType, TaskTypeRef,
64            connect_children, get_aggregation_number, get_uppers, make_task_dirty_internal,
65            prepare_new_children,
66        },
67        snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
68        storage::Storage,
69        storage_schema::{TaskStorage, TaskStorageAccessors},
70    },
71    backing_storage::{BackingStorage, SnapshotItem, compute_task_type_hash},
72    data::{
73        ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
74        InProgressState, InProgressStateInner, OutputValue, TransientTask,
75    },
76    error::TaskError,
77    utils::{
78        dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
79        shard_amount::compute_shard_amount,
80    },
81};
82
/// Threshold for parallelizing making dependent tasks dirty.
///
/// If the number of dependent tasks exceeds this threshold, the operation will be
/// parallelized.
const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10_000;
87
/// Configurable idle timeout for snapshot persistence.
///
/// Read once, on first access, from the `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS`
/// environment variable, interpreted as a `u64` number of milliseconds.
/// Defaults to 2 seconds if the variable is not set or if the value is invalid.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .map(Duration::from_millis)
        .unwrap_or(Duration::from_secs(2))
});
97
98/// Priority used to re-schedule a task that became stale during execution.
99///
100/// Stale tasks must run again, but at a priority that reflects why they're being re-run rather
101/// than the (likely higher) priority of the original schedule. We use invalidation priority
102/// based on the task's leaf distance, parented under either the task's current dirty priority
103/// or `leaf()` if it is no longer dirty.
104fn compute_stale_priority(task: &impl TaskGuard) -> TaskPriority {
105    TaskPriority::invalidation(
106        task.get_leaf_distance()
107            .copied()
108            .unwrap_or_default()
109            .distance,
110    )
111    .in_parent(task.is_dirty().unwrap_or(TaskPriority::leaf()))
112}
113
/// Selects how the backend uses the backing storage.
///
/// Every mode queries the storage for cache entries that don't exist locally; the modes differ
/// in whether and when local changes are written back.
pub enum StorageMode {
    /// Queries the storage for cache entries that don't exist locally.
    ReadOnly,
    /// Queries the storage for cache entries that don't exist locally.
    /// Regularly pushes changes to the backing storage.
    ReadWrite,
    /// Queries the storage for cache entries that don't exist locally.
    /// On shutdown, pushes all changes to the backing storage.
    ReadWriteOnShutdown,
}
124
125pub struct BackendOptions {
126    /// Enables dependency tracking.
127    ///
128    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
129    /// forever.
130    pub dependency_tracking: bool,
131
132    /// Enables active tracking.
133    ///
134    /// Automatically disabled when `dependency_tracking` is disabled.
135    ///
136    /// When disabled: All tasks are considered as active.
137    pub active_tracking: bool,
138
139    /// Enables the backing storage.
140    pub storage_mode: Option<StorageMode>,
141
142    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
143    /// datastructures. If `None`, it will use the available parallelism.
144    pub num_workers: Option<usize>,
145
146    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
147    pub small_preallocation: bool,
148
149    /// When enabled, evict all evictable tasks from in-memory storage after every snapshot.
150    /// This reclaims memory by clearing persisted data that can be re-loaded from disk on demand.
151    /// This is an EXPERIMENTAL FEATURE under development
152    pub evict_after_snapshot: bool,
153}
154
155impl Default for BackendOptions {
156    fn default() -> Self {
157        Self {
158            dependency_tracking: true,
159            active_tracking: true,
160            storage_mode: Some(StorageMode::ReadWrite),
161            num_workers: None,
162            small_preallocation: false,
163            evict_after_snapshot: false,
164        }
165    }
166}
167
/// Kinds of job associated with the backend. Currently there is only one.
pub enum TurboTasksBackendJob {
    /// Snapshot job.
    // NOTE(review): presumably drives the periodic snapshot/persist loop — confirm against the
    // code that handles this job.
    Snapshot,
}
171
172pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
173
174struct TurboTasksBackendInner<B: BackingStorage> {
175    options: BackendOptions,
176
177    start_time: Instant,
178
179    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
180    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
181
182    storage: Storage,
183
184    /// Coordinates the operation/snapshot interleaving protocol. See
185    /// [`SnapshotCoordinator`] for details.
186    snapshot_coord: SnapshotCoordinator,
187    /// Serializes calls to `snapshot_and_persist`. The coordinator's
188    /// `begin_snapshot` asserts that snapshots don't overlap; this mutex
189    /// enforces that contract for our two callers (background loop and
190    /// `stop_and_wait`).
191    snapshot_in_progress: Mutex<()>,
192
193    stopping: AtomicBool,
194    stopping_event: Event,
195    idle_start_event: Event,
196    idle_end_event: Event,
197    #[cfg(feature = "verify_aggregation_graph")]
198    is_idle: AtomicBool,
199
200    task_statistics: TaskStatisticsApi,
201
202    backing_storage: B,
203
204    #[cfg(feature = "verify_aggregation_graph")]
205    root_tasks: Mutex<FxHashSet<TaskId>>,
206}
207
208impl<B: BackingStorage> TurboTasksBackend<B> {
209    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
210        Self(Arc::new(TurboTasksBackendInner::new(
211            options,
212            backing_storage,
213        )))
214    }
215
216    pub fn backing_storage(&self) -> &B {
217        &self.0.backing_storage
218    }
219
220    /// Perform a snapshot and then evict all evictable tasks from memory.
221    ///
222    /// This is exposed for integration tests that need to verify the
223    /// snapshot → evict → restore cycle works correctly.
224    ///
225    /// Returns `(snapshot_had_new_data, eviction_counts)`.
226    pub fn snapshot_and_evict_for_testing(
227        &self,
228        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
229    ) -> (bool, EvictionCounts) {
230        self.0.snapshot_and_evict_for_testing(turbo_tasks)
231    }
232}
233
234impl<B: BackingStorage> TurboTasksBackendInner<B> {
235    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
236        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
237        if !options.dependency_tracking {
238            options.active_tracking = false;
239        }
240        let small_preallocation = options.small_preallocation;
241        let next_task_id = backing_storage
242            .next_free_task_id()
243            .expect("Failed to get task id");
244        Self {
245            options,
246            start_time: Instant::now(),
247            persisted_task_id_factory: IdFactoryWithReuse::new(
248                next_task_id,
249                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
250            ),
251            transient_task_id_factory: IdFactoryWithReuse::new(
252                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
253                TaskId::MAX,
254            ),
255            storage: Storage::new(shard_amount, small_preallocation),
256            snapshot_coord: SnapshotCoordinator::new(),
257            snapshot_in_progress: Mutex::new(()),
258            stopping: AtomicBool::new(false),
259            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
260            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
261            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
262            #[cfg(feature = "verify_aggregation_graph")]
263            is_idle: AtomicBool::new(false),
264            task_statistics: TaskStatisticsApi::default(),
265            backing_storage,
266            #[cfg(feature = "verify_aggregation_graph")]
267            root_tasks: Default::default(),
268        }
269    }
270
271    fn execute_context<'a>(
272        &'a self,
273        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
274    ) -> impl ExecuteContext<'a> {
275        ExecuteContextImpl::new(self, turbo_tasks)
276    }
277
278    fn suspending_requested(&self) -> bool {
279        self.should_persist() && self.snapshot_coord.snapshot_pending()
280    }
281
282    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
283        if self.should_persist() {
284            self.snapshot_coord.suspend_point(suspend);
285        }
286    }
287
288    pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
289        if !self.should_persist() {
290            return OperationGuard::noop();
291        }
292        self.snapshot_coord.begin_operation()
293    }
294
295    fn should_persist(&self) -> bool {
296        matches!(
297            self.options.storage_mode,
298            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
299        )
300    }
301
302    fn should_evict(&self) -> bool {
303        self.options.evict_after_snapshot && self.should_persist()
304    }
305
306    /// Perform a snapshot and then evict all evictable tasks from memory.
307    ///
308    /// This is exposed for integration tests that need to verify the
309    /// snapshot → evict → restore cycle works correctly.
310    ///
311    /// Returns `(snapshot_had_new_data, eviction_counts)`.
312    #[doc(hidden)]
313    pub fn snapshot_and_evict_for_testing(
314        &self,
315        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
316    ) -> (bool, EvictionCounts) {
317        assert!(
318            self.should_persist(),
319            "snapshot_and_evict requires persistence"
320        );
321        let snapshot_result = self.snapshot_and_persist(None, "test", turbo_tasks);
322        let had_new_data = match snapshot_result {
323            Ok((_, new_data)) => new_data,
324            Err(_) => {
325                // Snapshot/persist failed — skip eviction since the data may not
326                // be on disk yet. Evicting now could lose in-memory state that
327                // can't be restored.
328                return (false, EvictionCounts::default());
329            }
330        };
331        let counts = self.storage.evict_after_snapshot(None);
332        (had_new_data, counts)
333    }
334
335    fn should_restore(&self) -> bool {
336        self.options.storage_mode.is_some()
337    }
338
339    fn should_track_dependencies(&self) -> bool {
340        self.options.dependency_tracking
341    }
342
343    fn should_track_activeness(&self) -> bool {
344        self.options.active_tracking
345    }
346
347    fn track_cache_hit_by_fn(&self, native_fn: &'static NativeFunction) {
348        self.task_statistics
349            .map(|stats| stats.increment_cache_hit(native_fn));
350    }
351
352    fn track_cache_miss_by_fn(&self, native_fn: &'static NativeFunction) {
353        self.task_statistics
354            .map(|stats| stats.increment_cache_miss(native_fn));
355    }
356
357    /// Reconstructs a full [`TurboTasksExecutionError`] from the compact [`TaskError`] storage
358    /// representation. For [`TaskError::TaskChain`], this looks up the source error from the last
359    /// task's output and rebuilds the nested `TaskContext` wrappers with `TurboTasksCallApi`
360    /// references for lazy name resolution.
361    fn task_error_to_turbo_tasks_execution_error(
362        &self,
363        error: &TaskError,
364        ctx: &mut impl ExecuteContext<'_>,
365    ) -> TurboTasksExecutionError {
366        match error {
367            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
368            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
369                message: item.message.clone(),
370                source: item
371                    .source
372                    .as_ref()
373                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
374            })),
375            TaskError::LocalTaskContext(local_task_context) => {
376                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
377                    name: local_task_context.name.clone(),
378                    source: local_task_context
379                        .source
380                        .as_ref()
381                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
382                }))
383            }
384            TaskError::TaskChain(chain) => {
385                let task_id = chain.last().unwrap();
386                let error = {
387                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
388                    if let Some(OutputValue::Error(error)) = task.get_output() {
389                        Some(error.clone())
390                    } else {
391                        None
392                    }
393                };
394                let error = error.map_or_else(
395                    || {
396                        // Eventual consistency will cause errors to no longer be available
397                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
398                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
399                                "Error no longer available",
400                            )),
401                            location: None,
402                        }))
403                    },
404                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
405                );
406                let mut current_error = error;
407                for &task_id in chain.iter().rev() {
408                    current_error =
409                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
410                            task_id,
411                            source: Some(current_error),
412                            turbo_tasks: ctx.turbo_tasks(),
413                        }));
414                }
415                current_error
416            }
417        }
418    }
419}
420
421/// Intermediate result of step 1 of task execution completion.
422struct TaskExecutionCompletePrepareResult {
423    pub new_children: FxHashSet<TaskId>,
424    pub is_now_immutable: bool,
425    #[cfg(feature = "verify_determinism")]
426    pub no_output_set: bool,
427    pub new_output: Option<OutputValue>,
428    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
429    pub is_recomputation: bool,
430    pub is_session_dependent: bool,
431}
432
433// Operations
434impl<B: BackingStorage> TurboTasksBackendInner<B> {
435    fn try_read_task_output(
436        self: &Arc<Self>,
437        task_id: TaskId,
438        reader: Option<TaskId>,
439        options: ReadOutputOptions,
440        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
441    ) -> Result<Result<RawVc, EventListener>> {
442        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
443
444        let mut ctx = self.execute_context(turbo_tasks);
445        let need_reader_task = if self.should_track_dependencies()
446            && !matches!(options.tracking, ReadTracking::Untracked)
447            && reader.is_some_and(|reader_id| reader_id != task_id)
448            && let Some(reader_id) = reader
449            && reader_id != task_id
450        {
451            Some(reader_id)
452        } else {
453            None
454        };
455        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
456            // Having a task_pair here is not optimal, but otherwise this would lead to a race
457            // condition. See below.
458            // TODO(sokra): solve that in a more performant way.
459            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
460            (task, Some(reader))
461        } else {
462            (ctx.task(task_id, TaskDataCategory::All), None)
463        };
464
465        fn listen_to_done_event(
466            reader_description: Option<EventDescription>,
467            tracking: ReadTracking,
468            done_event: &Event,
469        ) -> EventListener {
470            done_event.listen_with_note(move || {
471                move || {
472                    if let Some(reader_description) = reader_description.as_ref() {
473                        format!(
474                            "try_read_task_output from {} ({})",
475                            reader_description, tracking
476                        )
477                    } else {
478                        format!("try_read_task_output ({})", tracking)
479                    }
480                }
481            })
482        }
483
484        fn check_in_progress(
485            task: &impl TaskGuard,
486            reader_description: Option<EventDescription>,
487            tracking: ReadTracking,
488        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
489        {
490            match task.get_in_progress() {
491                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
492                    listen_to_done_event(reader_description, tracking, done_event),
493                ))),
494                Some(InProgressState::InProgress(box InProgressStateInner {
495                    done_event, ..
496                })) => Some(Ok(Err(listen_to_done_event(
497                    reader_description,
498                    tracking,
499                    done_event,
500                )))),
501                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
502                    "{} was canceled",
503                    task.get_task_description()
504                ))),
505                None => None,
506            }
507        }
508
509        if matches!(options.consistency, ReadConsistency::Strong) {
510            if task
511                .get_persistent_task_type()
512                .is_some_and(|t| !t.native_fn.is_root)
513            {
514                drop(task);
515                drop(reader_task);
516                panic!(
517                    "Strongly consistent read of non-root task {} (reader: {}). The `root` \
518                     attribute is missing on the task.",
519                    self.debug_get_task_description(task_id),
520                    reader.map_or_else(
521                        || "unknown".to_string(),
522                        |r| self.debug_get_task_description(r)
523                    )
524                );
525            }
526
527            let is_dirty = task.is_dirty();
528
529            // Check the dirty count of the root node
530            let has_dirty_containers = task.has_dirty_containers();
531            if has_dirty_containers || is_dirty.is_some() {
532                let activeness = task.get_activeness_mut();
533                let mut task_ids_to_schedule: Vec<_> = Vec::new();
534                // When there are dirty task, subscribe to the all_clean_event
535                let activeness = if let Some(activeness) = activeness {
536                    // This makes sure all tasks stay active and this task won't stale.
537                    // active_until_clean is automatically removed when this
538                    // task is clean.
539                    activeness.set_active_until_clean();
540                    activeness
541                } else {
542                    // If we don't have a root state, add one. This also makes sure all tasks stay
543                    // active and this task won't stale. active_until_clean
544                    // is automatically removed when this task is clean.
545                    if ctx.should_track_activeness() {
546                        // A newly added Activeness need to make sure to schedule the tasks
547                        task_ids_to_schedule = task.dirty_containers().collect();
548                        task_ids_to_schedule.push(task_id);
549                    }
550                    let activeness =
551                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
552                    activeness.set_active_until_clean();
553                    activeness
554                };
555                let listener = activeness.all_clean_event.listen_with_note(move || {
556                    let this = self.clone();
557                    let tt = turbo_tasks.pin();
558                    move || {
559                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
560                        let mut ctx = this.execute_context(tt);
561                        let mut visited = FxHashSet::default();
562                        fn indent(s: &str) -> String {
563                            s.split_inclusive('\n')
564                                .flat_map(|line: &str| ["  ", line].into_iter())
565                                .collect::<String>()
566                        }
567                        fn get_info(
568                            ctx: &mut impl ExecuteContext<'_>,
569                            task_id: TaskId,
570                            parent_and_count: Option<(TaskId, i32)>,
571                            visited: &mut FxHashSet<TaskId>,
572                        ) -> String {
573                            let task = ctx.task(task_id, TaskDataCategory::All);
574                            let is_dirty = task.is_dirty();
575                            let in_progress =
576                                task.get_in_progress()
577                                    .map_or("not in progress", |p| match p {
578                                        InProgressState::InProgress(_) => "in progress",
579                                        InProgressState::Scheduled { .. } => "scheduled",
580                                        InProgressState::Canceled => "canceled",
581                                    });
582                            let activeness = task.get_activeness().map_or_else(
583                                || "not active".to_string(),
584                                |activeness| format!("{activeness:?}"),
585                            );
586                            let aggregation_number = get_aggregation_number(&task);
587                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
588                            {
589                                let uppers = get_uppers(&task);
590                                !uppers.contains(&parent_task_id)
591                            } else {
592                                false
593                            };
594
595                            // Check the dirty count of the root node
596                            let has_dirty_containers = task.has_dirty_containers();
597
598                            let task_description = task.get_task_description();
599                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
600                                format!(", dirty({parent_priority})")
601                            } else {
602                                String::new()
603                            };
604                            let has_dirty_containers_label = if has_dirty_containers {
605                                ", dirty containers"
606                            } else {
607                                ""
608                            };
609                            let count = if let Some((_, count)) = parent_and_count {
610                                format!(" {count}")
611                            } else {
612                                String::new()
613                            };
614                            let mut info = format!(
615                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
616                                 {in_progress}, \
617                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
618                            );
619                            let children: Vec<_> = task.dirty_containers_with_count().collect();
620                            drop(task);
621
622                            if missing_upper {
623                                info.push_str("\n  ERROR: missing upper connection");
624                            }
625
626                            if has_dirty_containers || !children.is_empty() {
627                                writeln!(info, "\n  dirty tasks:").unwrap();
628
629                                for (child_task_id, count) in children {
630                                    let task_description = ctx
631                                        .task(child_task_id, TaskDataCategory::Data)
632                                        .get_task_description();
633                                    if visited.insert(child_task_id) {
634                                        let child_info = get_info(
635                                            ctx,
636                                            child_task_id,
637                                            Some((task_id, count)),
638                                            visited,
639                                        );
640                                        info.push_str(&indent(&child_info));
641                                        if !info.ends_with('\n') {
642                                            info.push('\n');
643                                        }
644                                    } else {
645                                        writeln!(
646                                            info,
647                                            "  {child_task_id} {task_description} {count} \
648                                             (already visited)"
649                                        )
650                                        .unwrap();
651                                    }
652                                }
653                            }
654                            info
655                        }
656                        let info = get_info(&mut ctx, task_id, None, &mut visited);
657                        format!(
658                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
659                        )
660                    }
661                });
662                drop(reader_task);
663                drop(task);
664                if !task_ids_to_schedule.is_empty() {
665                    let mut queue = AggregationUpdateQueue::new();
666                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
667                    queue.execute(&mut ctx);
668                }
669
670                return Ok(Err(listener));
671            }
672        }
673
674        let reader_description = reader_task
675            .as_ref()
676            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
677        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
678        {
679            return value;
680        }
681
682        if let Some(output) = task.get_output() {
683            let result = match output {
684                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
685                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
686                OutputValue::Error(error) => Err(error.clone()),
687            };
688            if let Some(mut reader_task) = reader_task.take()
689                && options.tracking.should_track(result.is_err())
690                && (!task.immutable() || cfg!(feature = "verify_immutable"))
691            {
692                #[cfg(feature = "trace_task_output_dependencies")]
693                let _span = tracing::trace_span!(
694                    "add output dependency",
695                    task = %task_id,
696                    dependent_task = ?reader
697                )
698                .entered();
699                let mut queue = LeafDistanceUpdateQueue::new();
700                let reader = reader.unwrap();
701                if task.add_output_dependent(reader) {
702                    // Ensure that dependent leaf distance is strictly monotonic increasing
703                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
704                    let reader_leaf_distance =
705                        reader_task.get_leaf_distance().copied().unwrap_or_default();
706                    if reader_leaf_distance.distance <= leaf_distance.distance {
707                        queue.push(
708                            reader,
709                            leaf_distance.distance,
710                            leaf_distance.max_distance_in_buffer,
711                        );
712                    }
713                }
714
715                drop(task);
716
717                // Note: We use `task_pair` earlier to lock the task and its reader at the same
718                // time. If we didn't and just locked the reader here, an invalidation could occur
719                // between grabbing the locks. If that happened, and if the task is "outdated" or
720                // doesn't have the dependency edge yet, the invalidation would be lost.
721
722                if !reader_task.remove_outdated_output_dependencies(&task_id) {
723                    let _ = reader_task.add_output_dependencies(task_id);
724                }
725                drop(reader_task);
726
727                queue.execute(&mut ctx);
728            } else {
729                drop(task);
730            }
731
732            return result.map_err(|error| {
733                self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
734                    .with_task_context(task_id, turbo_tasks.pin())
735                    .into()
736            });
737        }
738        drop(reader_task);
739
740        let note = EventDescription::new(|| {
741            move || {
742                if let Some(reader) = reader_description.as_ref() {
743                    format!("try_read_task_output (recompute) from {reader}",)
744                } else {
745                    "try_read_task_output (recompute, untracked)".to_string()
746                }
747            }
748        });
749
750        // Output doesn't exist. We need to schedule the task to compute it.
751        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
752            TaskExecutionReason::OutputNotAvailable,
753            EventDescription::new(|| task.get_task_desc_fn()),
754            note,
755        );
756
757        // It's not possible that the task is InProgress at this point. If it is InProgress {
758        // done: true } it must have Output and would early return.
759        let old = task.set_in_progress(in_progress_state);
760        debug_assert!(old.is_none(), "InProgress already exists");
761        ctx.schedule_task(task, TaskPriority::Recomputation);
762
763        Ok(Err(listener))
764    }
765
766    fn try_read_task_cell(
767        &self,
768        task_id: TaskId,
769        reader: Option<TaskId>,
770        cell: CellId,
771        options: ReadCellOptions,
772        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
773    ) -> Result<Result<TypedCellContent, EventListener>> {
           // Reads the content of `cell` of task `task_id`, optionally registering `reader` as
           // a dependent of that cell.
           //
           // Returns:
           // - `Ok(Ok(content))` when the cell data is currently stored on the task.
           // - `Ok(Err(listener))` when the data is not stored. The listener comes from
           //   `listen_to_cell`; the task is scheduled here unless it is already scheduled or
           //   in progress, or an in-progress entry for this cell already existed.
           // - `Err(..)` when no cell of this type/index exists on the task, or when the data
           //   is missing and the task is in the `Canceled` state.
774        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
775
           // Records the dependency edge on both sides: `reader` is added to the cell
           // dependents of `task`, and the cell is added to the cell dependencies of
           // `reader_task`. Does nothing when no reader guard was passed in, or when `task`
           // is immutable and the `verify_immutable` feature is disabled.
           // Both guards are taken by value and dropped inside.
776        fn add_cell_dependency(
777            task_id: TaskId,
778            mut task: impl TaskGuard,
779            reader: Option<TaskId>,
780            reader_task: Option<impl TaskGuard>,
781            cell: CellId,
782            key: Option<u64>,
783        ) {
784            if let Some(mut reader_task) = reader_task
785                && (!task.immutable() || cfg!(feature = "verify_immutable"))
786            {
                   // At the call sites in this function `reader_task` is only `Some` when
                   // `reader` is `Some`, so this `unwrap` is not expected to panic.
787                let reader = reader.unwrap();
                   // The return value is intentionally ignored.
788                let _ = task.add_cell_dependents((cell, key, reader));
789                drop(task);
790
791                // Note: We use `task_pair` earlier to lock the task and its reader at the same
792                // time. If we didn't and just locked the reader here, an invalidation could occur
793                // between grabbing the locks. If that happened, and if the task is "outdated" or
794                // doesn't have the dependency edge yet, the invalidation would be lost.
795
796                let target = CellRef {
797                    task: task_id,
798                    cell,
799                };
                   // Only add a new dependency entry when there was no outdated entry to
                   // remove for this (cell, key) pair.
800                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
801                    let _ = reader_task.add_cell_dependencies((target, key));
802                }
803                drop(reader_task);
804            }
805        }
806
807        let ReadCellOptions {
808            tracking,
809            final_read_hint,
810        } = options;
811
812        let mut ctx = self.execute_context(turbo_tasks);
           // Acquire the reader guard together with the task guard only when the dependency
           // will actually be tracked: dependency tracking is enabled, the read is not
           // `Untracked`, a reader was given, and the reader is a different task.
813        let (mut task, reader_task) = if self.should_track_dependencies()
814            && !matches!(tracking, ReadCellTracking::Untracked)
815            && let Some(reader_id) = reader
816            && reader_id != task_id
817        {
818            // Having a task_pair here is not optimal, but otherwise this would lead to a race
819            // condition. See below.
820            // TODO(sokra): solve that in a more performant way.
821            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
822            (task, Some(reader))
823        } else {
824            (ctx.task(task_id, TaskDataCategory::All), None)
825        };
826
           // With `final_read_hint` the cell data is removed from the task instead of being
           // cloned.
827        let content = if final_read_hint {
828            task.remove_cell_data(&cell)
829        } else {
830            task.get_cell_data(&cell).cloned()
831        };
832        if let Some(content) = content {
               // `should_track` receives `false` on this success path and `true` on the
               // error paths further down.
833            if tracking.should_track(false) {
834                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
835            }
836            return Ok(Ok(TypedCellContent(
837                cell.type_id,
838                CellContent(Some(content)),
839            )));
840        }
841
           // The task is already scheduled or executing: register a listener for the cell and
           // return it without scheduling the task again.
842        let in_progress = task.get_in_progress();
843        if matches!(
844            in_progress,
845            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
846        ) {
847            return Ok(Err(self
848                .listen_to_cell(&mut task, task_id, &reader_task, cell)
849                .0));
850        }
           // Remember the cancellation now; it is only acted on after the existence checks
           // below.
851        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
852
853        // Check cell index range (cell might not exist at all)
854        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
855        let Some(max_id) = max_id else {
               // Fetch the description first: `add_cell_dependency` takes `task` by value.
856            let task_desc = task.get_task_description();
857            if tracking.should_track(true) {
858                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
859            }
860            bail!(
861                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
862            );
863        };
864        if cell.index >= max_id {
               // Same pattern as above: description first, then the guard is consumed.
865            let task_desc = task.get_task_description();
866            if tracking.should_track(true) {
867                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
868            }
869            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
870        }
871
872        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
873        // task to get the cell content.
874
875        // Bail early if the task was cancelled — no point in registering a listener
876        // on a task that won't execute again.
877        if is_cancelled {
878            bail!("{} was canceled", task.get_task_description());
879        }
880
881        // Listen to the cell and potentially schedule the task
882        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
883        drop(reader_task);
           // An in-progress entry for this cell already existed, so this call does not
           // schedule the task.
884        if !new_listener {
885            return Ok(Err(listener));
886        }
887
           // The span stays entered until the end of the function, covering the scheduling
           // below.
888        let _span = tracing::trace_span!(
889            "recomputation",
890            cell_type = get_value_type(cell.type_id).ty.global_name,
891            cell_index = cell.index
892        )
893        .entered();
894
895        let _ = task.add_scheduled(
896            TaskExecutionReason::CellNotAvailable,
897            EventDescription::new(|| task.get_task_desc_fn()),
898        );
899        ctx.schedule_task(task, TaskPriority::Recomputation);
900
901        Ok(Err(listener))
902    }
903
904    fn listen_to_cell(
905        &self,
906        task: &mut impl TaskGuard,
907        task_id: TaskId,
908        reader_task: &Option<impl TaskGuard>,
909        cell: CellId,
910    ) -> (EventListener, bool) {
           // Registers a listener for `cell` on `task`.
           //
           // Returns the listener together with a flag: `true` when this call created the
           // in-progress entry for the cell, `false` when an entry already existed and the
           // listener was attached to it. `try_read_task_cell` only schedules the task when
           // the flag is `true`.

           // Closure producing the note attached to the listener. The reader's description
           // function is only obtained when a reader guard is present.
911        let note = || {
912            let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
913            move || {
914                if let Some(reader_desc) = reader_desc.as_ref() {
915                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
916                } else {
917                    "try_read_task_cell (in progress, untracked)".to_string()
918                }
919            }
920        };
921        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
922            // Someone else is already computing the cell
923            let listener = in_progress.event.listen_with_note(note);
924            return (listener, false);
925        }
           // No entry yet: create one, attach the listener before storing it, and report that
           // this call created it.
926        let in_progress = InProgressCellState::new(task_id, cell);
927        let listener = in_progress.event.listen_with_note(note);
928        let old = task.insert_in_progress_cells(cell, in_progress);
929        debug_assert!(old.is_none(), "InProgressCell already exists");
930        (listener, true)
931    }
932
933    fn snapshot_and_persist(
934        &self,
935        parent_span: Option<tracing::Id>,
936        reason: &str,
937        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
938    ) -> Result<(Instant, bool), anyhow::Error> {
           // Takes a snapshot of the modified task data and writes it to the backing storage.
           //
           // `parent_span` is the parent of the "snapshot" and "persist" tracing spans;
           // `reason` is recorded on both spans and in the emitted `TraceEvent`.
           //
           // Returns an `Instant` and a flag that is `true` only after `save_snapshot`
           // succeeded:
           // - `(start, false)` when the storage reported no modifications,
           // - `(snapshot_time, false)` when the snapshot contained no items,
           // - `(snapshot_time, true)` after a successful save.
           // A failure of `save_snapshot` is propagated as `Err`.
           //
           // NOTE(review): the first early return yields `start` while the other two yield
           // `snapshot_time` — confirm that callers do not depend on the difference.
939        let snapshot_span =
940            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
941                .entered();
942        // Serialize snapshots. The internal protocol (snapshot_mode, snapshot
943        // request bit, suspended_operations) assumes only one snapshot runs at
944        // a time. Held for the entire snapshot lifecycle.
945        let _snapshot_in_progress = self.snapshot_in_progress.lock();
946        let start = Instant::now();
947        // SystemTime for wall-clock timestamps in trace events (milliseconds
948        // since epoch). Instant is monotonic but has no defined epoch, so it
949        // can't be used for cross-process trace correlation.
950        let wall_start = SystemTime::now();
951        debug_assert!(self.should_persist());
952
953        let mut snapshot_phase = {
954            let _span = tracing::info_span!("blocking").entered();
955            self.snapshot_coord.begin_snapshot()
956        };
957        // Enter snapshot mode, which atomically reads and resets the modified count.
958        // Checking after start_snapshot ensures no concurrent increments can race.
959        let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
960
961        let suspended_operations = snapshot_phase.take_suspended_operations();
962
963        let snapshot_time = Instant::now();
964        drop(snapshot_phase);
965
966        if !has_modifications {
967            // No tasks modified since the last snapshot — drop the guard (which
968            // calls end_snapshot) and skip the expensive O(N) scan.
969            drop(snapshot_guard);
970            return Ok((start, false));
971        }
972
           // Per-task-name accumulators for the size statistics that are printed after
           // persisting. Only compiled with the `print_cache_item_size` feature.
973        #[cfg(feature = "print_cache_item_size")]
974        #[derive(Default)]
975        struct TaskCacheStats {
976            data: usize,
977            #[cfg(feature = "print_cache_item_size_with_compressed")]
978            data_compressed: usize,
979            data_count: usize,
980            meta: usize,
981            #[cfg(feature = "print_cache_item_size_with_compressed")]
982            meta_compressed: usize,
983            meta_count: usize,
984            upper_count: usize,
985            collectibles_count: usize,
986            aggregated_collectibles_count: usize,
987            children_count: usize,
988            followers_count: usize,
989            collectibles_dependents_count: usize,
990            aggregated_dirty_containers_count: usize,
991            output_size: usize,
992        }
993        /// Formats a byte size, optionally including the compressed size when the
994        /// `print_cache_item_size_with_compressed` feature is enabled.
995        #[cfg(feature = "print_cache_item_size")]
996        struct FormatSizes {
997            size: usize,
998            #[cfg(feature = "print_cache_item_size_with_compressed")]
999            compressed_size: usize,
1000        }
1001        #[cfg(feature = "print_cache_item_size")]
1002        impl std::fmt::Display for FormatSizes {
1003            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1004                use turbo_tasks::util::FormatBytes;
                    // Exactly one of the two cfg-gated blocks below is compiled and becomes
                    // the return value.
1005                #[cfg(feature = "print_cache_item_size_with_compressed")]
1006                {
1007                    write!(
1008                        f,
1009                        "{} ({} compressed)",
1010                        FormatBytes(self.size),
1011                        FormatBytes(self.compressed_size)
1012                    )
1013                }
1014                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1015                {
1016                    write!(f, "{}", FormatBytes(self.size))
1017                }
1018            }
1019        }
1020        #[cfg(feature = "print_cache_item_size")]
1021        impl TaskCacheStats {
                // Compresses `data` with LZ4 into a throwaway buffer; presumably the returned
                // value is the compressed length — confirm against the `lzzzz` API.
1022            #[cfg(feature = "print_cache_item_size_with_compressed")]
1023            fn compressed_size(data: &[u8]) -> Result<usize> {
1024                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1025                    data,
1026                    &mut Vec::new(),
1027                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
1028                )?)
1029            }
1030
                // Accounts one encoded data buffer. A failed compression counts as 0 bytes.
1031            fn add_data(&mut self, data: &[u8]) {
1032                self.data += data.len();
1033                #[cfg(feature = "print_cache_item_size_with_compressed")]
1034                {
1035                    self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1036                }
1037                self.data_count += 1;
1038            }
1039
                // Accounts one encoded meta buffer. A failed compression counts as 0 bytes.
1040            fn add_meta(&mut self, data: &[u8]) {
1041                self.meta += data.len();
1042                #[cfg(feature = "print_cache_item_size_with_compressed")]
1043                {
1044                    self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1045                }
1046                self.meta_count += 1;
1047            }
1048
                // Adds the per-task counters reported by `meta_counts()` and the encoded size
                // of the task output (0 when there is no output or encoding fails).
1049            fn add_counts(&mut self, storage: &TaskStorage) {
1050                let counts = storage.meta_counts();
1051                self.upper_count += counts.upper;
1052                self.collectibles_count += counts.collectibles;
1053                self.aggregated_collectibles_count += counts.aggregated_collectibles;
1054                self.children_count += counts.children;
1055                self.followers_count += counts.followers;
1056                self.collectibles_dependents_count += counts.collectibles_dependents;
1057                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
1058                if let Some(output) = storage.get_output() {
1059                    use turbo_bincode::turbo_bincode_encode;
1060
1061                    self.output_size += turbo_bincode_encode(&output)
1062                        .map(|data| data.len())
1063                        .unwrap_or(0);
1064                }
1065            }
1066
1067            /// Returns the task name used as the stats grouping key.
1068            fn task_name(storage: &TaskStorage) -> String {
1069                storage
1070                    .get_persistent_task_type()
1071                    .map(|t| t.to_string())
1072                    .unwrap_or_else(|| "<unknown>".to_string())
1073            }
1074
1075            /// Returns the primary sort key: compressed total when
1076            /// `print_cache_item_size_with_compressed` is enabled, raw total otherwise.
1077            fn sort_key(&self) -> usize {
1078                #[cfg(feature = "print_cache_item_size_with_compressed")]
1079                {
1080                    self.data_compressed + self.meta_compressed
1081                }
1082                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1083                {
1084                    self.data + self.meta
1085                }
1086            }
1087
                // Total (data + meta) size for display.
1088            fn format_total(&self) -> FormatSizes {
1089                FormatSizes {
1090                    size: self.data + self.meta,
1091                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1092                    compressed_size: self.data_compressed + self.meta_compressed,
1093                }
1094            }
1095
                // Accumulated data size for display.
1096            fn format_data(&self) -> FormatSizes {
1097                FormatSizes {
1098                    size: self.data,
1099                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1100                    compressed_size: self.data_compressed,
1101                }
1102            }
1103
                // Average data size per encoded buffer; 0 when nothing was counted
                // (`checked_div` guards the division by zero).
1104            fn format_avg_data(&self) -> FormatSizes {
1105                FormatSizes {
1106                    size: self.data.checked_div(self.data_count).unwrap_or(0),
1107                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1108                    compressed_size: self
1109                        .data_compressed
1110                        .checked_div(self.data_count)
1111                        .unwrap_or(0),
1112                }
1113            }
1114
                // Accumulated meta size for display.
1115            fn format_meta(&self) -> FormatSizes {
1116                FormatSizes {
1117                    size: self.meta,
1118                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1119                    compressed_size: self.meta_compressed,
1120                }
1121            }
1122
                // Average meta size per encoded buffer; 0 when nothing was counted.
1123            fn format_avg_meta(&self) -> FormatSizes {
1124                FormatSizes {
1125                    size: self.meta.checked_div(self.meta_count).unwrap_or(0),
1126                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1127                    compressed_size: self
1128                        .meta_compressed
1129                        .checked_div(self.meta_count)
1130                        .unwrap_or(0),
1131                }
1132            }
1133        }
            // Stats map keyed by task name; it sits behind a mutex because it is updated
            // from inside the `process` closure below.
1134        #[cfg(feature = "print_cache_item_size")]
1135        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1136            Mutex::new(FxHashMap::default());
1137
1138        // Encode each task's modified categories. We only encode categories with `modified` set,
1139        // meaning the category was actually dirtied. Categories restored from disk but never
1140        // modified don't need re-persisting since the on-disk version is still valid.
1141        // For tasks accessed during snapshot mode, a frozen copy was made and its `modified`
1142        // flags were copied from the live task at snapshot creation time, reflecting which
1143        // categories were dirtied before the snapshot was taken.
            //
            // The closure returns a `SnapshotItem` holding the encoded meta/data buffers (each
            // `None` when that category was not modified) and, for new tasks, the task type
            // hash.
1144        let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
                // Encodes a single category. An encoding failure panics with the task
                // description, so the `Option` is always `Some` when this returns.
1145            let encode_category = |task_id: TaskId,
1146                                   data: &TaskStorage,
1147                                   category: SpecificTaskDataCategory,
1148                                   buffer: &mut TurboBincodeBuffer|
1149             -> Option<TurboBincodeBuffer> {
1150                match encode_task_data(task_id, data, category, buffer) {
1151                    Ok(encoded) => {
1152                        #[cfg(feature = "print_cache_item_size")]
1153                        {
1154                            let mut stats = task_cache_stats.lock();
1155                            let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
1156                            match category {
1157                                SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1158                                SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1159                            }
1160                        }
1161                        Some(encoded)
1162                    }
1163                    Err(err) => {
1164                        panic!(
1165                            "Serializing task {} failed ({:?}): {:?}",
1166                            self.debug_get_task_description(task_id),
1167                            category,
1168                            err
1169                        );
1170                    }
1171                }
1172            };
1173            if task_id.is_transient() {
1174                unreachable!("transient task_ids should never be enqueued to be persisted");
1175            }
1176
1177            let encode_meta = inner.flags.meta_modified();
1178            let encode_data = inner.flags.data_modified();
1179
                // Count the task once for the stats, regardless of how many categories are
                // encoded.
1180            #[cfg(feature = "print_cache_item_size")]
1181            if encode_data || encode_meta {
1182                task_cache_stats
1183                    .lock()
1184                    .entry(TaskCacheStats::task_name(inner))
1185                    .or_default()
1186                    .add_counts(inner);
1187            }
1188
1189            let meta = if encode_meta {
1190                encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1191            } else {
1192                None
1193            };
1194
1195            let data = if encode_data {
1196                encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1197            } else {
1198                None
1199            };
                // The task type hash is only computed for tasks flagged as new.
1200            let task_type_hash = if inner.flags.new_task() {
1201                let task_type = inner.get_persistent_task_type().expect(
1202                    "It is not possible for a new_task to not have a persistent_task_type.  Task \
1203                     creation for persistent tasks uses a single ExecutionContextImpl for \
1204                     creating the task (which sets new_task) and connect_child (which sets \
1205                     persistent_task_type) and take_snapshot waits for all operations to complete \
1206                     or suspend before we start snapshotting.  So task creation will always set \
1207                     the task_type.",
1208                );
1209                Some(compute_task_type_hash(task_type))
1210            } else {
1211                None
1212            };
1213
1214            SnapshotItem {
1215                task_id,
1216                meta,
1217                data,
1218                task_type_hash,
1219            }
1220        };
1221
            // Hands the guard and the `process` closure to the storage, which yields the
            // snapshot items.
1222        let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);
1223
            // End of the snapshot phase: close its span and record its duration (measured
            // from `start`).
1224        drop(snapshot_span);
1225        let snapshot_duration = start.elapsed();
1226        let task_count = task_snapshots.len();
1227
1228        if task_snapshots.is_empty() {
1229            // This should be impossible — if we got here, modified_count was nonzero, and every
1230            // modification that increments the count also failed during encoding.
1231            std::hint::cold_path();
1232            return Ok((snapshot_time, false));
1233        }
1234
1235        let persist_start = Instant::now();
1236        let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1237        {
1238            // Tasks were already consumed by take_snapshot, so a future snapshot
1239            // would not re-persist them — returning an error signals to the caller
1240            // that further persist attempts would corrupt the task graph in storage.
1241            self.backing_storage
1242                .save_snapshot(suspended_operations, task_snapshots)?;
                // Optional diagnostics: print one table row per task name with the collected
                // sizes and counts.
1243            #[cfg(feature = "print_cache_item_size")]
1244            {
1245                let mut task_cache_stats = task_cache_stats
1246                    .into_inner()
1247                    .into_iter()
1248                    .collect::<Vec<_>>();
1249                if !task_cache_stats.is_empty() {
1250                    use turbo_tasks::util::FormatBytes;
1251
1252                    use crate::utils::markdown_table::print_markdown_table;
1253
                        // Descending by sort key; ties are broken by descending task name
                        // (note the swapped `b`/`a` operands).
1254                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1255                        (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
1256                    });
1257
1258                    println!(
1259                        "Task cache stats: {}",
1260                        FormatSizes {
1261                            size: task_cache_stats
1262                                .iter()
1263                                .map(|(_, s)| s.data + s.meta)
1264                                .sum::<usize>(),
1265                            #[cfg(feature = "print_cache_item_size_with_compressed")]
1266                            compressed_size: task_cache_stats
1267                                .iter()
1268                                .map(|(_, s)| s.data_compressed + s.meta_compressed)
1269                                .sum::<usize>()
1270                        },
1271                    );
1272
                        // The count and the average are rendered as two adjacent columns that
                        // share the same header text, hence the repeated header entries.
1273                    print_markdown_table(
1274                        [
1275                            "Task",
1276                            " Total Size",
1277                            " Data Size",
1278                            " Data Count x Avg",
1279                            " Data Count x Avg",
1280                            " Meta Size",
1281                            " Meta Count x Avg",
1282                            " Meta Count x Avg",
1283                            " Uppers",
1284                            " Coll",
1285                            " Agg Coll",
1286                            " Children",
1287                            " Followers",
1288                            " Coll Deps",
1289                            " Agg Dirty",
1290                            " Output Size",
1291                        ],
1292                        task_cache_stats.iter(),
1293                        |(task_desc, stats)| {
1294                            [
1295                                task_desc.to_string(),
1296                                format!(" {}", stats.format_total()),
1297                                format!(" {}", stats.format_data()),
1298                                format!(" {} x", stats.data_count),
1299                                format!("{}", stats.format_avg_data()),
1300                                format!(" {}", stats.format_meta()),
1301                                format!(" {} x", stats.meta_count),
1302                                format!("{}", stats.format_avg_meta()),
1303                                format!(" {}", stats.upper_count),
1304                                format!(" {}", stats.collectibles_count),
1305                                format!(" {}", stats.aggregated_collectibles_count),
1306                                format!(" {}", stats.children_count),
1307                                format!(" {}", stats.followers_count),
1308                                format!(" {}", stats.collectibles_dependents_count),
1309                                format!(" {}", stats.aggregated_dirty_containers_count),
1310                                format!(" {}", FormatBytes(stats.output_size)),
1311                            ]
1312                        },
1313                    );
1314                }
1315            }
1316        }
1317
            // `elapsed` is measured from `start`, so it covers the snapshot and the persist
            // phase; `persist_duration` covers only the latter.
1318        let elapsed = start.elapsed();
1319        let persist_duration = persist_start.elapsed();
1320        // avoid spamming the event queue with information about fast operations
1321        if elapsed > Duration::from_secs(10) {
1322            turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1323                "Finished writing to filesystem cache".to_string(),
1324                elapsed,
1325            )));
1326        }
1327
            // A wall clock before the Unix epoch falls back to 0 via `unwrap_or_default`.
1328        let wall_start_ms = wall_start
1329            .duration_since(SystemTime::UNIX_EPOCH)
1330            .unwrap_or_default()
1331            // as_millis_f64 is not stable yet
1332            .as_secs_f64()
1333            * 1000.0;
            // The end timestamp is derived from the wall-clock start plus the monotonic
            // elapsed time rather than from a second `SystemTime::now()` call.
1334        let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1335        turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1336            "turbopack-persistence",
1337            wall_start_ms,
1338            wall_end_ms,
1339            vec![
1340                ("reason", serde_json::Value::from(reason)),
1341                (
1342                    "snapshot_duration_ms",
1343                    serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1344                ),
1345                (
1346                    "persist_duration_ms",
1347                    serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1348                ),
1349                ("task_count", serde_json::Value::from(task_count)),
1350            ],
1351        )));
1352
1353        Ok((snapshot_time, true))
1354    }
1355
1356    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
            // Startup hook: when restoring is enabled, re-executes the operations that the
            // backing storage reports as uncompleted; afterwards schedules the snapshot
            // background job if the storage mode is `ReadWrite`.
            //
            // Panics when the uncompleted operations cannot be loaded from the backing
            // storage.
1357        if self.should_restore() {
1358            // Continue all uncompleted operations
1359            // They can't be interrupted by a snapshot since the snapshotting job has not been
1360            // scheduled yet.
1361            let uncompleted_operations = self
1362                .backing_storage
1363                .uncompleted_operations()
1364                .expect("Failed to get uncompleted operations");
                // Only create an execution context when there is something to replay.
1365            if !uncompleted_operations.is_empty() {
1366                let mut ctx = self.execute_context(turbo_tasks);
1367                for op in uncompleted_operations {
1368                    op.execute(&mut ctx);
1369                }
1370            }
1371        }
1372
1373        // Only when it should write regularly to the storage, we schedule the initial snapshot
1374        // job.
1375        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1376            // Schedule the snapshot job
                // The second `_span` binding shadows the first one without dropping it, so
                // both spans stay entered until the end of this block.
1377            let _span = trace_span!("persisting background job").entered();
1378            let _span = tracing::info_span!("thread").entered();
1379            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::Snapshot);
1380        }
1381    }
1382
1383    fn stopping(&self) {
            // Marks the backend as stopping and then notifies `stopping_event`. The flag is
            // stored (with `Release` ordering) before the notification is sent.
            // `usize::MAX` is passed as the notification count — presumably "notify every
            // listener"; confirm against the `Event` API.
1384        self.stopping.store(true, Ordering::Release);
1385        self.stopping_event.notify(usize::MAX);
1386    }
1387
1388    #[allow(unused_variables)]
        /// Final shutdown hook.
        ///
        /// Optionally verifies the aggregation graph (feature `verify_aggregation_graph`),
        /// takes a last snapshot with reason `"stop"` when persisting is enabled, drops the
        /// in-memory storage contents and shuts down the backing storage.
        ///
        /// Failures of the final snapshot and of the backing-storage shutdown are reported on
        /// stderr and do not abort the remaining shutdown steps.
1389    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1390        #[cfg(feature = "verify_aggregation_graph")]
1391        {
1392            self.is_idle.store(false, Ordering::Release);
1393            self.verify_aggregation_graph(turbo_tasks, false);
1394        }
            // Best effort: a failed final snapshot is logged, shutdown continues.
1395        if self.should_persist()
1396            && let Err(err) = self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks)
1397        {
1398            eprintln!("Persisting failed during shutdown: {err:?}");
1399        }
1400        self.storage.drop_contents();
1401        if let Err(err) = self.backing_storage.shutdown() {
                // Report on stderr with the `{:?}` format, matching the persist failure
                // above, so diagnostics do not end up on stdout.
1402            eprintln!("Shutting down failed: {err:?}");
1403        }
1404    }
1405
1406    #[allow(unused_variables)]
        /// Called when the system becomes idle: notifies `idle_start_event`.
        ///
        /// With the `verify_aggregation_graph` feature, additionally spawns a task that
        /// verifies the aggregation graph once 5 seconds have passed, unless
        /// `idle_end_event` fires first or `is_idle` has been cleared by then.
        /// `turbo_tasks` is only used inside that feature-gated block.
1407    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1408        self.idle_start_event.notify(usize::MAX);
1409
1410        #[cfg(feature = "verify_aggregation_graph")]
1411        {
1412            use tokio::select;
1413
1414            self.is_idle.store(true, Ordering::Release);
                // Owned handles moved into the spawned task.
1415            let this = self.clone();
1416            let turbo_tasks = turbo_tasks.pin();
1417            tokio::task::spawn(async move {
                    // Whichever completes first decides: the timeout continues to the
                    // verification, the idle-end event cancels it.
1418                select! {
1419                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1420                        // do nothing
1421                    }
1422                    _ = this.idle_end_event.listen() => {
1423                        return;
1424                    }
1425                }
                    // Re-check the flag after the timeout; `idle_end` and `stop` clear it.
1426                if !this.is_idle.load(Ordering::Relaxed) {
1427                    return;
1428                }
1429                this.verify_aggregation_graph(&*turbo_tasks, true);
1430            });
1431        }
1432    }
1433
    /// Signals the end of an idle period by waking every listener on `idle_end_event`.
    ///
    /// With the `verify_aggregation_graph` feature enabled, the idle flag is cleared first.
    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }
1439
    /// Returns the [`TaskId`] for the task identified by `(native_fn, this, arg)`, creating the
    /// task if it is not known yet, and connects it as a child of `parent_task`.
    ///
    /// Lookup proceeds in this order:
    /// 1. The in-memory task cache (read-only lookup in the pre-located shard).
    /// 2. For non-transient tasks, the backing storage via `task_by_type`; a hit is copied into
    ///    the in-memory cache.
    /// 3. Otherwise a new id is taken from the transient or persisted id factory, storage for
    ///    it is initialized, and the task is published in the cache.
    ///
    /// Requesting a transient task from a non-transient `parent_task` is reported through
    /// `panic_persistent_calling_transient`.
    fn get_or_create_task(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        parent_task: Option<TaskId>,
        persistence: TaskPersistence,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let transient = matches!(persistence, TaskPersistence::Transient);

        // A persistent parent must not call a transient task.
        if transient
            && let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            let task_type = CachedTaskType {
                native_fn,
                this,
                arg: arg.take_box(),
            };
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                /* cell_id */ None,
            );
        }

        let is_root = native_fn.is_root;

        // Compute hash and shard index once from borrowed components (no heap allocation).
        let arg_ref = arg.as_ref();
        let hash = CachedTaskType::hash_from_components(
            self.storage.task_cache.hasher(),
            native_fn,
            this,
            arg_ref,
        );
        // Locate the shard once so that the read-only lookup and any
        // write-lock retry below share the same reference (saves a modulo +
        // memory lookup on the miss path).
        let shard = get_shard(&self.storage.task_cache, hash);

        let mut ctx = self.execute_context(turbo_tasks);
        // Step 1: Fast read-only cache lookup (read lock, no allocation).
        // Use a read lock rather than a write lock to avoid contention. connect_child
        // may re-enter task_cache with a write lock, so we must not hold a write lock here.
        if let Some(task_id) =
            raw_get_in_shard(shard, hash, |k| k.eq_components(native_fn, this, arg_ref))
        {
            self.track_cache_hit_by_fn(native_fn);
            operation::ConnectChildOperation::run(parent_task, task_id, ctx);
            return task_id;
        }

        // Step 2: Check backing storage using borrowed components (no box needed yet).
        // Transient tasks are never looked up in the backing storage.
        //
        // If the task exists in backing storage, we only need to insert it into the in-memory
        // cache.
        let task_id = if !transient
            && let Some((task_id, stored_type)) = ctx.task_by_type(native_fn, this, arg_ref)
        {
            self.track_cache_hit_by_fn(native_fn);
            // Step 3a: Insert into in-memory cache using the pre-located shard.
            // Use the existing Arc from storage to avoid a duplicate allocation.
            match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
                k.eq_components(native_fn, this, arg_ref)
            }) {
                // The entry is already cached; nothing to insert.
                RawEntry::Occupied(_) => {}
                RawEntry::Vacant(e) => {
                    e.insert(stored_type, task_id);
                }
            };
            task_id
        } else {
            // Step 3b: Not in backing storage (or transient). Re-check the cache through the
            // entry API and create the task if it is still missing.
            match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
                k.eq_components(native_fn, this, arg_ref)
            }) {
                RawEntry::Occupied(e) => {
                    // Another thread beat us to creating this task — use their task_id.
                    // They will handle logging the new task as modified.
                    let task_id = *e.get();
                    drop(e);
                    self.track_cache_hit_by_fn(native_fn);
                    task_id
                }
                RawEntry::Vacant(e) => {
                    // Only now do we force the allocation.
                    // NOTE: if our caller had to perform resolution, then this will have already
                    // been boxed and take_box just takes it.
                    let task_type = Arc::new(CachedTaskType {
                        native_fn,
                        this,
                        arg: arg.take_box(),
                    });
                    let task_id = if transient {
                        self.transient_task_id_factory.get()
                    } else {
                        self.persisted_task_id_factory.get()
                    };
                    // Initialize storage BEFORE making task_id visible in the cache.
                    // This ensures any thread that reads task_id from the cache sees
                    // the storage entry already initialized (restored flags set).
                    self.storage
                        .initialize_new_task(task_id, Some(task_type.clone()));
                    // insert() consumes e, releasing the shard write lock.
                    e.insert(task_type, task_id);
                    self.track_cache_miss_by_fn(native_fn);
                    // Update the aggregation number before connecting the child
                    // We don't need this on any of the task recovery paths above because the
                    // aggregation number will already be set.
                    if is_root {
                        AggregationUpdateQueue::run(
                            AggregationUpdateJob::UpdateAggregationNumber {
                                task_id,
                                base_aggregation_number: u32::MAX,
                                distance: None,
                            },
                            &mut ctx,
                        );
                    } else if native_fn.is_session_dependent && self.should_track_dependencies() {
                        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
                        AggregationUpdateQueue::run(
                            AggregationUpdateJob::UpdateAggregationNumber {
                                task_id,
                                base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                                distance: None,
                            },
                            &mut ctx,
                        );
                    };

                    task_id
                }
            }
        };

        // Both the restored and the newly created task still need to be connected to the parent.
        operation::ConnectChildOperation::run(parent_task, task_id, ctx);

        task_id
    }
1580
1581    /// Generate an object that implements [`fmt::Display`] explaining why the given
1582    /// [`CachedTaskType`] is transient.
1583    fn debug_trace_transient_task(
1584        &self,
1585        task_type: &CachedTaskType,
1586        cell_id: Option<CellId>,
1587    ) -> DebugTraceTransientTask {
1588        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1589        // from tracing the same task many times, so use a visited_set
1590        fn inner_id(
1591            backend: &TurboTasksBackendInner<impl BackingStorage>,
1592            task_id: TaskId,
1593            cell_type_id: Option<ValueTypeId>,
1594            visited_set: &mut FxHashSet<TaskId>,
1595        ) -> DebugTraceTransientTask {
1596            if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1597                if visited_set.contains(&task_id) {
1598                    let task_name = task_type.get_name();
1599                    DebugTraceTransientTask::Collapsed {
1600                        task_name,
1601                        cell_type_id,
1602                    }
1603                } else {
1604                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1605                }
1606            } else {
1607                DebugTraceTransientTask::Uncached { cell_type_id }
1608            }
1609        }
1610        fn inner_cached(
1611            backend: &TurboTasksBackendInner<impl BackingStorage>,
1612            task_type: &CachedTaskType,
1613            cell_type_id: Option<ValueTypeId>,
1614            visited_set: &mut FxHashSet<TaskId>,
1615        ) -> DebugTraceTransientTask {
1616            let task_name = task_type.get_name();
1617
1618            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1619                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1620                    // `task_id` should never be `None` at this point, as that would imply a
1621                    // non-local task is returning a local `Vc`...
1622                    // Just ignore if it happens, as we're likely already panicking.
1623                    return None;
1624                };
1625                if task_id.is_transient() {
1626                    Some(Box::new(inner_id(
1627                        backend,
1628                        task_id,
1629                        cause_self_raw_vc.try_get_type_id(),
1630                        visited_set,
1631                    )))
1632                } else {
1633                    None
1634                }
1635            });
1636            let cause_args = task_type
1637                .arg
1638                .get_raw_vcs()
1639                .into_iter()
1640                .filter_map(|raw_vc| {
1641                    let Some(task_id) = raw_vc.try_get_task_id() else {
1642                        // `task_id` should never be `None` (see comment above)
1643                        return None;
1644                    };
1645                    if !task_id.is_transient() {
1646                        return None;
1647                    }
1648                    Some((task_id, raw_vc.try_get_type_id()))
1649                })
1650                .collect::<IndexSet<_>>() // dedupe
1651                .into_iter()
1652                .map(|(task_id, cell_type_id)| {
1653                    inner_id(backend, task_id, cell_type_id, visited_set)
1654                })
1655                .collect();
1656
1657            DebugTraceTransientTask::Cached {
1658                task_name,
1659                cell_type_id,
1660                cause_self,
1661                cause_args,
1662            }
1663        }
1664        inner_cached(
1665            self,
1666            task_type,
1667            cell_id.map(|c| c.type_id),
1668            &mut FxHashSet::default(),
1669        )
1670    }
1671
1672    fn invalidate_task(
1673        &self,
1674        task_id: TaskId,
1675        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1676    ) {
1677        if !self.should_track_dependencies() {
1678            panic!("Dependency tracking is disabled so invalidation is not allowed");
1679        }
1680        operation::InvalidateOperation::run(
1681            smallvec![task_id],
1682            #[cfg(feature = "trace_task_dirty")]
1683            TaskDirtyCause::Invalidator,
1684            self.execute_context(turbo_tasks),
1685        );
1686    }
1687
1688    fn invalidate_tasks(
1689        &self,
1690        tasks: &[TaskId],
1691        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1692    ) {
1693        if !self.should_track_dependencies() {
1694            panic!("Dependency tracking is disabled so invalidation is not allowed");
1695        }
1696        operation::InvalidateOperation::run(
1697            tasks.iter().copied().collect(),
1698            #[cfg(feature = "trace_task_dirty")]
1699            TaskDirtyCause::Unknown,
1700            self.execute_context(turbo_tasks),
1701        );
1702    }
1703
1704    fn invalidate_tasks_set(
1705        &self,
1706        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1707        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1708    ) {
1709        if !self.should_track_dependencies() {
1710            panic!("Dependency tracking is disabled so invalidation is not allowed");
1711        }
1712        operation::InvalidateOperation::run(
1713            tasks.iter().copied().collect(),
1714            #[cfg(feature = "trace_task_dirty")]
1715            TaskDirtyCause::Unknown,
1716            self.execute_context(turbo_tasks),
1717        );
1718    }
1719
1720    fn invalidate_serialization(
1721        &self,
1722        task_id: TaskId,
1723        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1724    ) {
1725        if task_id.is_transient() {
1726            return;
1727        }
1728        let mut ctx = self.execute_context(turbo_tasks);
1729        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1730        task.invalidate_serialization();
1731    }
1732
1733    fn debug_get_task_description(&self, task_id: TaskId) -> String {
1734        let task = self.storage.access_mut(task_id);
1735        if let Some(value) = task.get_persistent_task_type() {
1736            format!("{task_id:?} {}", value)
1737        } else if let Some(value) = task.get_transient_task_type() {
1738            format!("{task_id:?} {}", value)
1739        } else {
1740            format!("{task_id:?} unknown")
1741        }
1742    }
1743
1744    fn get_task_name(
1745        &self,
1746        task_id: TaskId,
1747        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1748    ) -> String {
1749        let mut ctx = self.execute_context(turbo_tasks);
1750        let task = ctx.task(task_id, TaskDataCategory::Data);
1751        if let Some(value) = task.get_persistent_task_type() {
1752            value.to_string()
1753        } else if let Some(value) = task.get_transient_task_type() {
1754            value.to_string()
1755        } else {
1756            "unknown".to_string()
1757        }
1758    }
1759
1760    fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1761        let task = self.storage.access_mut(task_id);
1762        task.get_persistent_task_type().cloned()
1763    }
1764
    /// Handles the cancellation of a task's execution.
    ///
    /// Takes the task's in-progress state (if any) and notifies its `done_event`, notifies the
    /// events of all in-progress cells, optionally marks the task as session-dependent dirty,
    /// and finally records `InProgressState::Canceled` on the task.
    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        // Wake everyone waiting for this execution to finish.
        if let Some(in_progress) = task.take_in_progress() {
            match in_progress {
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        // Notify any readers waiting on in-progress cells so their listeners
        // resolve and foreground jobs can finish (prevents stop_and_wait hang).
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Mark the cancelled task as session-dependent dirty so it will be re-executed
        // in the next session. Without this, any reader that encounters the cancelled task
        // records an error in its output. That error is persisted and would poison
        // subsequent builds. By marking the task session-dependent dirty, the next build
        // re-executes it, which invalidates dependents and corrects the stale errors.
        let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
            task.update_dirty_state(Some(Dirtyness::SessionDependent))
        } else {
            None
        };

        // The previous state was taken above, so the slot must be empty here.
        let old = task.set_in_progress(InProgressState::Canceled);
        debug_assert!(old.is_none(), "InProgress already exists");
        // Release the task guard before running the aggregation update on `ctx`.
        drop(task);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, &mut ctx);
        }

        // The taken cell states are dropped only now, after the task guard has been released.
        drop(in_progress_cells);
    }
1814
    /// Tries to move a task from `Scheduled` to `InProgress` and builds the
    /// [`TaskExecutionSpec`] (future and tracing span) used to run it.
    ///
    /// Returns `None` when
    /// - the task has no in-progress state,
    /// - the in-progress state is not `Scheduled` (the state is put back unchanged), or
    /// - the task is a `TransientTask::Once` whose future slot is already empty.
    ///
    /// On the way to `InProgress`, the task's current collectibles are recorded as outdated
    /// and, when dependency tracking is enabled, its current cell and output dependencies are
    /// recorded as outdated as well.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        {
            // All task state changes happen inside this scope; the future is created after it.
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            // The task guard is released while the prefetched tasks are prepared and is
            // re-acquired afterwards.
            if let Some(tasks) = task.prefetch() {
                drop(task);
                ctx.prepare_tasks(tasks, "prefetch");
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            let in_progress = task.take_in_progress()?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                // Not scheduled: restore the state we just took and bail out.
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Make all current collectibles outdated (remove left-over outdated collectibles)
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            // Collected into a Vec first so that the task can be mutated in the loop below.
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        // Only drop outdated entries that have no current counterpart.
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Make all dependencies outdated
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason, priority),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // Moves the future out of the mutex; returns `None` if the slot is empty.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1913
    /// Processes the completion of a task execution.
    ///
    /// Returns `Some(priority)` if the task became stale during execution and needs to be
    /// re-scheduled at the given priority. The caller (turbo-tasks manager) hands it to the
    /// priority runner so re-execution doesn't inherit the original schedule priority.
    /// Returns `None` once all completion steps have run.
    ///
    /// `result` is the outcome of the execution, `cell_counters` holds the per-value-type cell
    /// counters reported for the execution, and `has_invalidator` is forwarded to the prepare
    /// step. `stateful` only exists with the `verify_determinism` feature.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskPriority> {
        // Task completion is a 4 step process:
        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
        //    aggregation number of the task and the new children.
        // 2. Connect the new children to the task (and do the relevant aggregation updates).
        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
        // 4. Shrink the task memory to reduce footprint of the task.

        // Due to persistence it is possible that the process is cancelled after any step. This is
        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
        // in-memory representation.

        // The task might be invalidated during this process, so we need to check the stale flag
        // at the start of every step.

        // Without `trace_task_details` the span only carries the number of new children.
        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            aggregation_number = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        // Captured up front because `result` is moved into the prepare step below.
        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        let TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
            is_recomputation,
            is_session_dependent,
        } = match self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        ) {
            Ok(r) => r,
            Err(stale_priority) => {
                // Task was stale and has been rescheduled
                #[cfg(feature = "trace_task_details")]
                span.record("stale", "prepare");
                return Some(stale_priority);
            }
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // When restoring from filesystem cache the following might not be executed (since we can
        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
        // would be executed again.

        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        // Connecting consumes `new_children`; it is skipped entirely when there are none.
        if has_new_children
            && let Some(stale_priority) =
                self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return Some(stale_priority);
        }

        let (stale_priority, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
            is_session_dependent,
        );
        if let Some(stale_priority) = stale_priority {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return Some(stale_priority);
        }

        let removed_data = self.task_execution_completed_cleanup(
            &mut ctx,
            task_id,
            cell_counters,
            is_error,
            is_recomputation,
        );

        // Drop data outside of critical sections
        drop(removed_data);
        drop(in_progress_cells);

        None
    }
2060
2061    fn task_execution_completed_prepare(
2062        &self,
2063        ctx: &mut impl ExecuteContext<'_>,
2064        #[cfg(feature = "trace_task_details")] span: &Span,
2065        task_id: TaskId,
2066        result: Result<RawVc, TurboTasksExecutionError>,
2067        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2068        #[cfg(feature = "verify_determinism")] stateful: bool,
2069        has_invalidator: bool,
2070    ) -> Result<TaskExecutionCompletePrepareResult, TaskPriority> {
2071        let mut task = ctx.task(task_id, TaskDataCategory::All);
2072        let is_recomputation = task.is_dirty().is_none();
2073        // Without dependency tracking, the SessionDependent dirty state is never read (no session
2074        // restore), so skip the work
2075        let is_session_dependent = self.should_track_dependencies()
2076            && matches!(task.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent);
2077        let Some(in_progress) = task.get_in_progress_mut() else {
2078            panic!("Task execution completed, but task is not in progress: {task:#?}");
2079        };
2080        if matches!(in_progress, InProgressState::Canceled) {
2081            return Ok(TaskExecutionCompletePrepareResult {
2082                new_children: Default::default(),
2083                is_now_immutable: false,
2084                #[cfg(feature = "verify_determinism")]
2085                no_output_set: false,
2086                new_output: None,
2087                output_dependent_tasks: Default::default(),
2088                is_recomputation,
2089                is_session_dependent,
2090            });
2091        }
2092        let &mut InProgressState::InProgress(box InProgressStateInner {
2093            stale,
2094            ref mut new_children,
2095            once_task: is_once_task,
2096            ..
2097        }) = in_progress
2098        else {
2099            panic!("Task execution completed, but task is not in progress: {task:#?}");
2100        };
2101
2102        // If the task is stale, reschedule it
2103        #[cfg(not(feature = "no_fast_stale"))]
2104        if stale && !is_once_task {
2105            let stale_priority = compute_stale_priority(&task);
2106            let Some(InProgressState::InProgress(box InProgressStateInner {
2107                done_event,
2108                mut new_children,
2109                ..
2110            })) = task.take_in_progress()
2111            else {
2112                unreachable!();
2113            };
2114            let old = task.set_in_progress(InProgressState::Scheduled {
2115                done_event,
2116                reason: TaskExecutionReason::Stale,
2117            });
2118            debug_assert!(old.is_none(), "InProgress already exists");
2119            // Remove old children from new_children to leave only the children that had their
2120            // active count increased
2121            for task in task.iter_children() {
2122                new_children.remove(&task);
2123            }
2124            drop(task);
2125
2126            // We need to undo the active count increase for the children since we throw away the
2127            // new_children list now.
2128            AggregationUpdateQueue::run(
2129                AggregationUpdateJob::DecreaseActiveCounts {
2130                    task_ids: new_children.into_iter().collect(),
2131                },
2132                ctx,
2133            );
2134            return Err(stale_priority);
2135        }
2136
2137        // take the children from the task to process them
2138        let mut new_children = take(new_children);
2139
2140        // handle stateful (only tracked when verify_determinism is enabled)
2141        #[cfg(feature = "verify_determinism")]
2142        if stateful {
2143            task.set_stateful(true);
2144        }
2145
2146        // handle has_invalidator
2147        if has_invalidator {
2148            task.set_invalidator(true);
2149        }
2150
2151        // handle cell counters: update max index and remove cells that are no longer used
2152        // On error, skip this update: the task may have failed before creating all cells it
2153        // normally creates, so cell_counters is incomplete. Clearing cell_type_max_index entries
2154        // based on partial counters would cause "cell no longer exists" errors for tasks that
2155        // still hold dependencies on those cells. The old cell data is preserved on error
2156        // (see task_execution_completed_cleanup), so keeping cell_type_max_index consistent with
2157        // that data is correct.
2158        // NOTE: This must stay in sync with task_execution_completed_cleanup, which similarly
2159        // skips cell data removal on error.
2160        if result.is_ok() || is_recomputation {
2161            let old_counters: FxHashMap<_, _> = task
2162                .iter_cell_type_max_index()
2163                .map(|(&k, &v)| (k, v))
2164                .collect();
2165            let mut counters_to_remove = old_counters.clone();
2166
2167            for (&cell_type, &max_index) in cell_counters.iter() {
2168                if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2169                    if old_max_index != max_index {
2170                        task.insert_cell_type_max_index(cell_type, max_index);
2171                    }
2172                } else {
2173                    task.insert_cell_type_max_index(cell_type, max_index);
2174                }
2175            }
2176            for (cell_type, _) in counters_to_remove {
2177                task.remove_cell_type_max_index(&cell_type);
2178            }
2179        }
2180
2181        let mut queue = AggregationUpdateQueue::new();
2182
2183        let mut old_edges = Vec::new();
2184
2185        let has_children = !new_children.is_empty();
2186        let is_immutable = task.immutable();
2187        let task_dependencies_for_immutable =
2188            // Task was previously marked as immutable
2189            if !is_immutable
2190            // Task is not session dependent (session dependent tasks can change between sessions)
2191            && !is_session_dependent
2192            // Task has no invalidator
2193            && !task.invalidator()
2194            // Task has no dependencies on collectibles
2195            && task.is_collectibles_dependencies_empty()
2196        {
2197            Some(
2198                // Collect all dependencies on tasks to check if all dependencies are immutable
2199                task.iter_output_dependencies()
2200                    .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
2201                    .collect::<FxHashSet<_>>(),
2202            )
2203        } else {
2204            None
2205        };
2206
2207        if has_children {
2208            // Prepare all new children
2209            let _aggregation_number =
2210                prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2211
2212            #[cfg(feature = "trace_task_details")]
2213            span.record("aggregation_number", _aggregation_number);
2214
2215            // Filter actual new children
2216            old_edges.extend(
2217                task.iter_children()
2218                    .filter(|task| !new_children.remove(task))
2219                    .map(OutdatedEdge::Child),
2220            );
2221        } else {
2222            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2223        }
2224
2225        old_edges.extend(
2226            task.iter_outdated_collectibles()
2227                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2228        );
2229
2230        if self.should_track_dependencies() {
2231            // IMPORTANT: Use iter_outdated_* here, NOT iter_* (active dependencies).
2232            // At execution start, active deps are copied to outdated as a "before" snapshot.
2233            // During execution, new deps are added to active.
2234            // Here at completion, we clean up only the OUTDATED deps (the "before" snapshot).
2235            // Using iter_* (active) instead would incorrectly clean up deps that are still valid,
2236            // breaking dependency tracking.
2237            old_edges.extend(
2238                task.iter_outdated_cell_dependencies()
2239                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
2240            );
2241            old_edges.extend(
2242                task.iter_outdated_output_dependencies()
2243                    .map(OutdatedEdge::OutputDependency),
2244            );
2245        }
2246
2247        // Check if output need to be updated
2248        let current_output = task.get_output();
2249        #[cfg(feature = "verify_determinism")]
2250        let no_output_set = current_output.is_none();
2251        let new_output = match result {
2252            Ok(RawVc::TaskOutput(output_task_id)) => {
2253                if let Some(OutputValue::Output(current_task_id)) = current_output
2254                    && *current_task_id == output_task_id
2255                {
2256                    None
2257                } else {
2258                    Some(OutputValue::Output(output_task_id))
2259                }
2260            }
2261            Ok(RawVc::TaskCell(output_task_id, cell)) => {
2262                if let Some(OutputValue::Cell(CellRef {
2263                    task: current_task_id,
2264                    cell: current_cell,
2265                })) = current_output
2266                    && *current_task_id == output_task_id
2267                    && *current_cell == cell
2268                {
2269                    None
2270                } else {
2271                    Some(OutputValue::Cell(CellRef {
2272                        task: output_task_id,
2273                        cell,
2274                    }))
2275                }
2276            }
2277            Ok(RawVc::LocalOutput(..)) => {
2278                panic!("Non-local tasks must not return a local Vc");
2279            }
2280            Err(err) => {
2281                if let Some(OutputValue::Error(old_error)) = current_output
2282                    && **old_error == err
2283                {
2284                    None
2285                } else {
2286                    Some(OutputValue::Error(Arc::new((&err).into())))
2287                }
2288            }
2289        };
2290        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2291        // When output has changed, grab the dependent tasks
2292        if new_output.is_some() && ctx.should_track_dependencies() {
2293            output_dependent_tasks = task.iter_output_dependent().collect();
2294        }
2295
2296        drop(task);
2297
2298        // Check if the task can be marked as immutable
2299        let mut is_now_immutable = false;
2300        if let Some(dependencies) = task_dependencies_for_immutable
2301            && dependencies
2302                .iter()
2303                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2304        {
2305            is_now_immutable = true;
2306        }
2307        #[cfg(feature = "trace_task_details")]
2308        span.record("immutable", is_immutable || is_now_immutable);
2309
2310        if !queue.is_empty() || !old_edges.is_empty() {
2311            #[cfg(any(
2312                feature = "trace_task_completion",
2313                feature = "trace_aggregation_update_stats"
2314            ))]
2315            let _span =
2316                tracing::trace_span!("remove old edges and prepare new children", stats = Empty)
2317                    .entered();
2318            // Remove outdated edges first, before removing in_progress+dirty flag.
2319            // We need to make sure all outdated edges are removed before the task can potentially
2320            // be scheduled and executed again
2321            #[cfg(feature = "trace_aggregation_update_stats")]
2322            {
2323                let stats = CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2324                _span.record("stats", tracing::field::debug(stats));
2325            }
2326            #[cfg(not(feature = "trace_aggregation_update_stats"))]
2327            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2328        }
2329
2330        Ok(TaskExecutionCompletePrepareResult {
2331            new_children,
2332            is_now_immutable,
2333            #[cfg(feature = "verify_determinism")]
2334            no_output_set,
2335            new_output,
2336            output_dependent_tasks,
2337            is_recomputation,
2338            is_session_dependent,
2339        })
2340    }
2341
2342    fn task_execution_completed_invalidate_output_dependent(
2343        &self,
2344        ctx: &mut impl ExecuteContext<'_>,
2345        task_id: TaskId,
2346        output_dependent_tasks: SmallVec<[TaskId; 4]>,
2347    ) {
2348        debug_assert!(!output_dependent_tasks.is_empty());
2349
2350        #[cfg(feature = "trace_task_dirty")]
2351        let task_description = self.debug_get_task_description(task_id);
2352
2353        if output_dependent_tasks.len() > 1 {
2354            ctx.prepare_tasks(
2355                output_dependent_tasks
2356                    .iter()
2357                    .map(|&id| (id, TaskDataCategory::All)),
2358                "invalidate output dependents",
2359            );
2360        }
2361
2362        fn process_output_dependents(
2363            ctx: &mut impl ExecuteContext<'_>,
2364            task_id: TaskId,
2365            #[cfg(feature = "trace_task_dirty")] task_description: &str,
2366            dependent_task_id: TaskId,
2367            queue: &mut AggregationUpdateQueue,
2368        ) {
2369            #[cfg(feature = "trace_task_output_dependencies")]
2370            let span = tracing::trace_span!(
2371                "invalidate output dependency",
2372                task = %task_id,
2373                dependent_task = %dependent_task_id,
2374                result = tracing::field::Empty,
2375            )
2376            .entered();
2377            let mut make_stale = true;
2378            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2379            let transient_task_type = dependent.get_transient_task_type();
2380            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
2381                // once tasks are never invalidated
2382                #[cfg(feature = "trace_task_output_dependencies")]
2383                span.record("result", "once task");
2384                return;
2385            }
2386            if dependent.outdated_output_dependencies_contains(&task_id) {
2387                #[cfg(feature = "trace_task_output_dependencies")]
2388                span.record("result", "outdated dependency");
2389                // output dependency is outdated, so it hasn't read the output yet
2390                // and doesn't need to be invalidated
2391                // But importantly we still need to make the task dirty as it should no longer
2392                // be considered as "recomputation".
2393                make_stale = false;
2394            } else if !dependent.output_dependencies_contains(&task_id) {
2395                // output dependency has been removed, so the task doesn't depend on the
2396                // output anymore and doesn't need to be invalidated
2397                #[cfg(feature = "trace_task_output_dependencies")]
2398                span.record("result", "no backward dependency");
2399                return;
2400            }
2401            make_task_dirty_internal(
2402                dependent,
2403                dependent_task_id,
2404                make_stale,
2405                #[cfg(feature = "trace_task_dirty")]
2406                TaskDirtyCause::OutputChange {
2407                    task_description: task_description.to_string(),
2408                },
2409                queue,
2410                ctx,
2411            );
2412            #[cfg(feature = "trace_task_output_dependencies")]
2413            span.record("result", "marked dirty");
2414        }
2415
2416        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
2417            let chunk_size = good_chunk_size(output_dependent_tasks.len());
2418            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2419            let _ = scope_and_block(chunks.len(), |scope| {
2420                for chunk in chunks {
2421                    let child_ctx = ctx.child_context();
2422                    #[cfg(feature = "trace_task_dirty")]
2423                    let task_description = &task_description;
2424                    scope.spawn(move || {
2425                        let mut ctx = child_ctx.create();
2426                        let mut queue = AggregationUpdateQueue::new();
2427                        for dependent_task_id in chunk {
2428                            process_output_dependents(
2429                                &mut ctx,
2430                                task_id,
2431                                #[cfg(feature = "trace_task_dirty")]
2432                                task_description,
2433                                dependent_task_id,
2434                                &mut queue,
2435                            )
2436                        }
2437                        queue.execute(&mut ctx);
2438                    });
2439                }
2440            });
2441        } else {
2442            let mut queue = AggregationUpdateQueue::new();
2443            for dependent_task_id in output_dependent_tasks {
2444                process_output_dependents(
2445                    ctx,
2446                    task_id,
2447                    #[cfg(feature = "trace_task_dirty")]
2448                    &task_description,
2449                    dependent_task_id,
2450                    &mut queue,
2451                );
2452            }
2453            queue.execute(ctx);
2454        }
2455    }
2456
2457    fn task_execution_completed_unfinished_children_dirty(
2458        &self,
2459        ctx: &mut impl ExecuteContext<'_>,
2460        new_children: &FxHashSet<TaskId>,
2461    ) {
2462        debug_assert!(!new_children.is_empty());
2463
2464        let mut queue = AggregationUpdateQueue::new();
2465        ctx.for_each_task_all(
2466            new_children.iter().copied(),
2467            "unfinished children dirty",
2468            |child_task, ctx| {
2469                if !child_task.has_output() {
2470                    let child_id = child_task.id();
2471                    make_task_dirty_internal(
2472                        child_task,
2473                        child_id,
2474                        false,
2475                        #[cfg(feature = "trace_task_dirty")]
2476                        TaskDirtyCause::InitialDirty,
2477                        &mut queue,
2478                        ctx,
2479                    );
2480                }
2481            },
2482        );
2483
2484        queue.execute(ctx);
2485    }
2486
2487    fn task_execution_completed_connect(
2488        &self,
2489        ctx: &mut impl ExecuteContext<'_>,
2490        task_id: TaskId,
2491        new_children: FxHashSet<TaskId>,
2492    ) -> Option<TaskPriority> {
2493        debug_assert!(!new_children.is_empty());
2494
2495        let mut task = ctx.task(task_id, TaskDataCategory::All);
2496        let Some(in_progress) = task.get_in_progress() else {
2497            panic!("Task execution completed, but task is not in progress: {task:#?}");
2498        };
2499        if matches!(in_progress, InProgressState::Canceled) {
2500            // Task was canceled in the meantime, so we don't connect the children
2501            return None;
2502        }
2503        let InProgressState::InProgress(box InProgressStateInner {
2504            #[cfg(not(feature = "no_fast_stale"))]
2505            stale,
2506            once_task: is_once_task,
2507            ..
2508        }) = in_progress
2509        else {
2510            panic!("Task execution completed, but task is not in progress: {task:#?}");
2511        };
2512
2513        // If the task is stale, reschedule it
2514        #[cfg(not(feature = "no_fast_stale"))]
2515        if *stale && !is_once_task {
2516            let stale_priority = compute_stale_priority(&task);
2517            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2518                task.take_in_progress()
2519            else {
2520                unreachable!();
2521            };
2522            let old = task.set_in_progress(InProgressState::Scheduled {
2523                done_event,
2524                reason: TaskExecutionReason::Stale,
2525            });
2526            debug_assert!(old.is_none(), "InProgress already exists");
2527            drop(task);
2528
2529            // All `new_children` are currently hold active with an active count and we need to undo
2530            // that. (We already filtered out the old children from that list)
2531            AggregationUpdateQueue::run(
2532                AggregationUpdateJob::DecreaseActiveCounts {
2533                    task_ids: new_children.into_iter().collect(),
2534                },
2535                ctx,
2536            );
2537            return Some(stale_priority);
2538        }
2539
2540        let has_active_count = ctx.should_track_activeness()
2541            && task
2542                .get_activeness()
2543                .is_some_and(|activeness| activeness.active_counter > 0);
2544        connect_children(
2545            ctx,
2546            task_id,
2547            task,
2548            new_children,
2549            has_active_count,
2550            ctx.should_track_activeness(),
2551        );
2552
2553        None
2554    }
2555
2556    #[allow(clippy::type_complexity)]
2557    fn task_execution_completed_finish(
2558        &self,
2559        ctx: &mut impl ExecuteContext<'_>,
2560        task_id: TaskId,
2561        #[cfg(feature = "verify_determinism")] no_output_set: bool,
2562        new_output: Option<OutputValue>,
2563        is_now_immutable: bool,
2564        is_session_dependent: bool,
2565    ) -> (
2566        Option<TaskPriority>,
2567        Option<
2568            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2569        >,
2570    ) {
2571        let mut task = ctx.task(task_id, TaskDataCategory::All);
2572        let Some(in_progress) = task.take_in_progress() else {
2573            panic!("Task execution completed, but task is not in progress: {task:#?}");
2574        };
2575        if matches!(in_progress, InProgressState::Canceled) {
2576            // Task was canceled in the meantime, so we don't finish it
2577            return (None, None);
2578        }
2579        let InProgressState::InProgress(box InProgressStateInner {
2580            done_event,
2581            once_task: is_once_task,
2582            stale,
2583            marked_as_completed: _,
2584            new_children,
2585        }) = in_progress
2586        else {
2587            panic!("Task execution completed, but task is not in progress: {task:#?}");
2588        };
2589        debug_assert!(new_children.is_empty());
2590
2591        // If the task is stale, reschedule it
2592        if stale && !is_once_task {
2593            let stale_priority = compute_stale_priority(&task);
2594            let old = task.set_in_progress(InProgressState::Scheduled {
2595                done_event,
2596                reason: TaskExecutionReason::Stale,
2597            });
2598            debug_assert!(old.is_none(), "InProgress already exists");
2599            return (Some(stale_priority), None);
2600        }
2601
2602        // Set the output if it has changed
2603        let mut old_content = None;
2604        if let Some(value) = new_output {
2605            old_content = task.set_output(value);
2606        }
2607
2608        // If the task has no invalidator and has no mutable dependencies, it does not have a way
2609        // to be invalidated and we can mark it as immutable.
2610        if is_now_immutable {
2611            task.set_immutable(true);
2612        }
2613
2614        // Notify in progress cells and remove all of them
2615        let in_progress_cells = task.take_in_progress_cells();
2616        if let Some(ref cells) = in_progress_cells {
2617            for state in cells.values() {
2618                state.event.notify(usize::MAX);
2619            }
2620        }
2621
2622        // Compute and apply the new dirty state, propagating to aggregating ancestors
2623        let new_dirtyness = if is_session_dependent {
2624            Some(Dirtyness::SessionDependent)
2625        } else {
2626            None
2627        };
2628        #[cfg(feature = "verify_determinism")]
2629        let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
2630        let data_update = task.update_dirty_state(new_dirtyness);
2631
2632        // Under verify_determinism we re-run a task whose dirty state or output unexpectedly
2633        // changed during execution to confirm determinism. The leaf priority keeps these
2634        // verification reruns at the highest priority.
2635        #[cfg(feature = "verify_determinism")]
2636        let stale_priority: Option<TaskPriority> =
2637            ((dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task)
2638                .then(TaskPriority::leaf);
2639        #[cfg(not(feature = "verify_determinism"))]
2640        let stale_priority: Option<TaskPriority> = None;
2641        if stale_priority.is_some() {
2642            let old = task.set_in_progress(InProgressState::Scheduled {
2643                done_event,
2644                reason: TaskExecutionReason::Stale,
2645            });
2646            debug_assert!(old.is_none(), "InProgress already exists");
2647            drop(task);
2648        } else {
2649            drop(task);
2650
2651            // Notify dependent tasks that are waiting for this task to finish
2652            done_event.notify(usize::MAX);
2653        }
2654
2655        drop(old_content);
2656
2657        if let Some(data_update) = data_update {
2658            AggregationUpdateQueue::run(data_update, ctx);
2659        }
2660
2661        // We return so the data can be dropped outside of critical sections
2662        (stale_priority, in_progress_cells)
2663    }
2664
2665    fn task_execution_completed_cleanup(
2666        &self,
2667        ctx: &mut impl ExecuteContext<'_>,
2668        task_id: TaskId,
2669        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2670        is_error: bool,
2671        is_recomputation: bool,
2672    ) -> Vec<SharedReference> {
2673        let mut task = ctx.task(task_id, TaskDataCategory::All);
2674        let mut removed_cell_data = Vec::new();
2675        // An error is potentially caused by a eventual consistency, so we avoid updating cells
2676        // after an error as it is likely transient and we want to keep the dependent tasks
2677        // clean to avoid re-executions.
2678        // NOTE: This must stay in sync with task_execution_completed_prepare, which similarly
2679        // skips cell_type_max_index updates on error.
2680        if !is_error || is_recomputation {
2681            // Remove no longer existing cells and
2682            // find all outdated data items (removed cells, outdated edges)
2683            // Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
2684            // anyway and we want to avoid needless re-executions. When the cells become
2685            // used again, they are invalidated from the update cell operation.
2686            let to_remove: Vec<_> = task
2687                .iter_cell_data()
2688                .filter_map(|(cell, _)| {
2689                    cell_counters
2690                        .get(&cell.type_id)
2691                        .is_none_or(|start_index| cell.index >= *start_index)
2692                        .then_some(*cell)
2693                })
2694                .collect();
2695            removed_cell_data.reserve_exact(to_remove.len());
2696            for cell in to_remove {
2697                if let Some(data) = task.remove_cell_data(&cell) {
2698                    removed_cell_data.push(data);
2699                }
2700            }
2701            // Remove cell_data_hash entries for cells that no longer exist
2702            let to_remove_hash: Vec<_> = task
2703                .iter_cell_data_hash()
2704                .filter_map(|(cell, _)| {
2705                    cell_counters
2706                        .get(&cell.type_id)
2707                        .is_none_or(|start_index| cell.index >= *start_index)
2708                        .then_some(*cell)
2709                })
2710                .collect();
2711            for cell in to_remove_hash {
2712                task.remove_cell_data_hash(&cell);
2713            }
2714        }
2715
2716        // Clean up task storage after execution:
2717        // - Shrink collections marked with shrink_on_completion
2718        // - Drop dependency fields for immutable tasks (they'll never re-execute)
2719        task.cleanup_after_execution();
2720
2721        drop(task);
2722
2723        // Return so we can drop outside of critical sections
2724        removed_cell_data
2725    }
2726
/// Writes to stderr the standard notice that the background persisting process stops because of
/// an unrecoverable write error. Returning from the background job is left to the caller.
fn log_unrecoverable_persist_error() {
    const MESSAGE: &str = "Persisting is disabled for this session due to an unrecoverable \
                           error. Stopping the background persisting process.";
    eprintln!("{MESSAGE}");
}
2735
2736    fn run_backend_job<'a>(
2737        self: &'a Arc<Self>,
2738        job: TurboTasksBackendJob,
2739        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2740    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2741        Box::pin(async move {
2742            match job {
2743                TurboTasksBackendJob::Snapshot => {
2744                    debug_assert!(self.should_persist());
2745
2746                    let mut last_snapshot = self.start_time;
2747                    let mut idle_start_listener = self.idle_start_event.listen();
2748                    let mut idle_end_listener = self.idle_end_event.listen();
2749                    // Whether to immediately set an idle timeout if possible.
2750                    // Set to false if we don't persist anything in a cycle.
2751                    let mut fresh_idle = true;
2752                    let mut evicted = false;
2753                    let mut is_first = true;
2754                    'outer: loop {
2755                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2756                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2757                        let idle_timeout = *IDLE_TIMEOUT;
2758                        let (time, mut reason) = if is_first {
2759                            (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2760                        } else {
2761                            (SNAPSHOT_INTERVAL, "regular snapshot interval")
2762                        };
2763
2764                        let until = last_snapshot + time;
2765                        if until > Instant::now() {
2766                            let mut stop_listener = self.stopping_event.listen();
2767                            if self.stopping.load(Ordering::Acquire) {
2768                                return;
2769                            }
2770                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2771                                Instant::now() + idle_timeout
2772                            } else {
2773                                far_future()
2774                            };
2775                            loop {
2776                                tokio::select! {
2777                                    _ = &mut stop_listener => {
2778                                        return;
2779                                    },
2780                                    _ = &mut idle_start_listener => {
2781                                        idle_time = Instant::now() + idle_timeout;
2782                                        idle_start_listener = self.idle_start_event.listen()
2783                                    },
2784                                    _ = &mut idle_end_listener => {
2785                                        idle_time = far_future();
2786                                        idle_end_listener = self.idle_end_event.listen()
2787                                    },
2788                                    _ = tokio::time::sleep_until(until) => {
2789                                        break;
2790                                    },
2791                                    _ = tokio::time::sleep_until(idle_time) => {
2792                                        if turbo_tasks.is_idle() {
2793                                            reason = "idle timeout";
2794                                            break;
2795                                        }
2796                                    },
2797                                }
2798                            }
2799                        }
2800
2801                        let this = self.clone();
2802                        // Create a root span shared by both the snapshot/persist
2803                        // work and the subsequent compaction so they appear
2804                        // grouped together in trace viewers.
2805                        let background_span =
2806                            tracing::info_span!(parent: None, "background snapshot");
2807                        match this.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
2808                            Err(err) => {
2809                                // save_snapshot consumed persisted_task_cache_log entries;
2810                                // further snapshots would corrupt the task graph.
2811                                eprintln!("Persisting failed: {err:?}");
2812                                Self::log_unrecoverable_persist_error();
2813                                return;
2814                            }
2815                            Ok((snapshot_start, new_data)) => {
2816                                // if we see 'new_data' then the next idle transition is 'fresh'
2817                                fresh_idle = new_data;
2818                                is_first = false;
2819                                last_snapshot = snapshot_start;
2820
2821                                // Polls the idle-end event without blocking. Returns
2822                                // `true` and refreshes the listener if idle has ended,
2823                                // `false` if we are still idle.
2824                                macro_rules! check_idle_ended {
2825                                    () => {{
2826                                        tokio::select! {
2827                                            biased;
2828                                            _ = &mut idle_end_listener => {
2829                                                idle_end_listener = self.idle_end_event.listen();
2830                                                true
2831                                            },
2832                                            _ = std::future::ready(()) => false,
2833                                        }
2834                                    }};
2835                                }
2836                                // Evict persisted tasks from memory to reclaim space.
2837                                // Like compaction, this runs after snapshot_and_persist
2838                                // as a separate concern.
2839                                //
2840                                // TODO: improve eviction policy — current approach is a full sweep
2841                                // after every snapshot. Better strategies to consider:
2842                                //   - Memory pressure signals: only evict when RSS exceeds a
2843                                //     threshold rather than unconditionally.
2844                                //   - Recency data: track last-access time per task and evict
2845                                //     least-recently-used entries first rather than all at once.
2846                                //   - Eviction intensity: partial sweeps (evict a fraction of
2847                                //     eligible tasks per cycle) to reduce latency spikes.
2848                                // Evict when there is new data to persist (the common
2849                                // case) or on the very first snapshot after startup
2850                                // (data was already on disk from a prior run, so
2851                                // new_data may be false but in-memory state can still
2852                                // be evicted).
2853                                let mut ran_eviction = false;
2854                                if this.should_evict() && (new_data || !evicted) {
2855                                    if check_idle_ended!() {
2856                                        // need to start all the way over so we catch the next
2857                                        // signal
2858                                        continue 'outer;
2859                                    }
2860                                    evicted = true;
2861                                    ran_eviction = true;
2862                                    this.storage.evict_after_snapshot(background_span.id());
2863                                }
2864
2865                                // Compact while idle (up to limit), regardless of
2866                                // whether the snapshot had new data.
2867                                // `background_span` is not entered here because
2868                                // `EnteredSpan` is `!Send` and would prevent the
2869                                // future from being sent across threads when it
2870                                // suspends at the `select!` await below.
2871                                let mut ran_compaction = false;
2872                                const MAX_IDLE_COMPACTION_PASSES: usize = 10;
2873                                for _ in 0..MAX_IDLE_COMPACTION_PASSES {
2874                                    if check_idle_ended!() {
2875                                        continue 'outer;
2876                                    }
2877                                    // Enter the span only around the synchronous
2878                                    // compact() call so we never hold an
2879                                    // `EnteredSpan` across an await point.
2880                                    let _compact_span = tracing::info_span!(
2881                                        parent: background_span.id(),
2882                                        "compact database"
2883                                    )
2884                                    .entered();
2885                                    match self.backing_storage.compact() {
2886                                        Ok(true) => {
2887                                            ran_compaction = true;
2888                                        }
2889                                        Ok(false) => break,
2890                                        Err(err) => {
2891                                            eprintln!("Compaction failed: {err:?}");
2892                                            if self.backing_storage.has_unrecoverable_write_error()
2893                                            {
2894                                                Self::log_unrecoverable_persist_error();
2895                                                return;
2896                                            }
2897                                            break;
2898                                        }
2899                                    }
2900                                }
2901                                if check_idle_ended!() {
2902                                    continue 'outer;
2903                                }
2904                                // After running snapshotting/eviction/compaction we have churned a
2905                                // _lot_ of memory if we are still
2906                                // idle tell `mimalloc` that now would be a good time to release
2907                                // memory back to the OS
2908                                if new_data || ran_compaction || ran_eviction {
2909                                    turbo_tasks_malloc::TurboMalloc::collect(true);
2910                                }
2911                            }
2912                        }
2913                    }
2914                }
2915            }
2916        })
2917    }
2918
2919    fn try_read_own_task_cell(
2920        &self,
2921        task_id: TaskId,
2922        cell: CellId,
2923        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2924    ) -> Result<TypedCellContent> {
2925        let mut ctx = self.execute_context(turbo_tasks);
2926        let task = ctx.task(task_id, TaskDataCategory::Data);
2927        if let Some(content) = task.get_cell_data(&cell).cloned() {
2928            Ok(CellContent(Some(content)).into_typed(cell.type_id))
2929        } else {
2930            Ok(CellContent(None).into_typed(cell.type_id))
2931        }
2932    }
2933
2934    fn read_task_collectibles(
2935        &self,
2936        task_id: TaskId,
2937        collectible_type: TraitTypeId,
2938        reader_id: Option<TaskId>,
2939        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2940    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2941        let mut ctx = self.execute_context(turbo_tasks);
2942        let mut collectibles = AutoMap::default();
2943        {
2944            let mut task = ctx.task(task_id, TaskDataCategory::All);
2945            if task
2946                .get_persistent_task_type()
2947                .is_some_and(|t| !t.native_fn.is_root)
2948            {
2949                drop(task);
2950                panic!(
2951                    "Reading collectibles of non-root task {} (reader: {}). The `root` attribute \
2952                     is missing on the task.",
2953                    self.debug_get_task_description(task_id),
2954                    reader_id.map_or_else(
2955                        || "unknown".to_string(),
2956                        |r| self.debug_get_task_description(r)
2957                    )
2958                );
2959            }
2960            for (collectible, count) in task.iter_aggregated_collectibles() {
2961                if *count > 0 && collectible.collectible_type == collectible_type {
2962                    *collectibles
2963                        .entry(RawVc::TaskCell(
2964                            collectible.cell.task,
2965                            collectible.cell.cell,
2966                        ))
2967                        .or_insert(0) += 1;
2968                }
2969            }
2970            for (&collectible, &count) in task.iter_collectibles() {
2971                if collectible.collectible_type == collectible_type {
2972                    *collectibles
2973                        .entry(RawVc::TaskCell(
2974                            collectible.cell.task,
2975                            collectible.cell.cell,
2976                        ))
2977                        .or_insert(0) += count;
2978                }
2979            }
2980            if let Some(reader_id) = reader_id {
2981                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
2982            }
2983        }
2984        if let Some(reader_id) = reader_id {
2985            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
2986            let target = CollectiblesRef {
2987                task: task_id,
2988                collectible_type,
2989            };
2990            if !reader.remove_outdated_collectibles_dependencies(&target) {
2991                let _ = reader.add_collectibles_dependencies(target);
2992            }
2993        }
2994        collectibles
2995    }
2996
2997    fn emit_collectible(
2998        &self,
2999        collectible_type: TraitTypeId,
3000        collectible: RawVc,
3001        task_id: TaskId,
3002        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3003    ) {
3004        self.assert_valid_collectible(task_id, collectible);
3005
3006        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3007            panic!("Collectibles need to be resolved");
3008        };
3009        let cell = CellRef {
3010            task: collectible_task,
3011            cell,
3012        };
3013        operation::UpdateCollectibleOperation::run(
3014            task_id,
3015            CollectibleRef {
3016                collectible_type,
3017                cell,
3018            },
3019            1,
3020            self.execute_context(turbo_tasks),
3021        );
3022    }
3023
3024    fn unemit_collectible(
3025        &self,
3026        collectible_type: TraitTypeId,
3027        collectible: RawVc,
3028        count: u32,
3029        task_id: TaskId,
3030        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3031    ) {
3032        self.assert_valid_collectible(task_id, collectible);
3033
3034        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3035            panic!("Collectibles need to be resolved");
3036        };
3037        let cell = CellRef {
3038            task: collectible_task,
3039            cell,
3040        };
3041        operation::UpdateCollectibleOperation::run(
3042            task_id,
3043            CollectibleRef {
3044                collectible_type,
3045                cell,
3046            },
3047            -(i32::try_from(count).unwrap()),
3048            self.execute_context(turbo_tasks),
3049        );
3050    }
3051
3052    fn update_task_cell(
3053        &self,
3054        task_id: TaskId,
3055        cell: CellId,
3056        content: CellContent,
3057        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3058        content_hash: Option<CellHash>,
3059        verification_mode: VerificationMode,
3060        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3061    ) {
3062        operation::UpdateCellOperation::run(
3063            task_id,
3064            cell,
3065            content,
3066            updated_key_hashes,
3067            content_hash,
3068            verification_mode,
3069            self.execute_context(turbo_tasks),
3070        );
3071    }
3072
3073    fn mark_own_task_as_finished(
3074        &self,
3075        task: TaskId,
3076        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3077    ) {
3078        let mut ctx = self.execute_context(turbo_tasks);
3079        let mut task = ctx.task(task, TaskDataCategory::Data);
3080        if let Some(InProgressState::InProgress(box InProgressStateInner {
3081            marked_as_completed,
3082            ..
3083        })) = task.get_in_progress_mut()
3084        {
3085            *marked_as_completed = true;
3086            // TODO this should remove the dirty state (also check session_dependent)
3087            // but this would break some assumptions for strongly consistent reads.
3088            // Client tasks are not connected yet, so we wouldn't wait for them.
3089            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
3090        }
3091    }
3092
3093    fn connect_task(
3094        &self,
3095        task: TaskId,
3096        parent_task: Option<TaskId>,
3097        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3098    ) {
3099        self.assert_not_persistent_calling_transient(parent_task, task, None);
3100        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
3101    }
3102
3103    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3104        let task_id = self.transient_task_id_factory.get();
3105        {
3106            let mut task = self.storage.access_mut(task_id);
3107            task.init_transient_task(task_id, task_type, self.should_track_activeness());
3108        }
3109        #[cfg(feature = "verify_aggregation_graph")]
3110        self.root_tasks.lock().insert(task_id);
3111        task_id
3112    }
3113
3114    fn dispose_root_task(
3115        &self,
3116        task_id: TaskId,
3117        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3118    ) {
3119        #[cfg(feature = "verify_aggregation_graph")]
3120        self.root_tasks.lock().remove(&task_id);
3121
3122        let mut ctx = self.execute_context(turbo_tasks);
3123        let mut task = ctx.task(task_id, TaskDataCategory::All);
3124        let is_dirty = task.is_dirty();
3125        let has_dirty_containers = task.has_dirty_containers();
3126        if is_dirty.is_some() || has_dirty_containers {
3127            if let Some(activeness_state) = task.get_activeness_mut() {
3128                // We will finish the task, but it would be removed after the task is done
3129                activeness_state.unset_root_type();
3130                activeness_state.set_active_until_clean();
3131            };
3132        } else if let Some(activeness_state) = task.take_activeness() {
3133            // Technically nobody should be listening to this event, but just in case
3134            // we notify it anyway
3135            activeness_state.all_clean_event.notify(usize::MAX);
3136        }
3137    }
3138
3139    #[cfg(feature = "verify_aggregation_graph")]
3140    fn verify_aggregation_graph(
3141        &self,
3142        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3143        idle: bool,
3144    ) {
3145        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3146            return;
3147        }
3148        use std::{collections::VecDeque, env, io::stdout};
3149
3150        use crate::backend::operation::{get_uppers, is_aggregating_node};
3151
3152        let mut ctx = self.execute_context(turbo_tasks);
3153        let root_tasks = self.root_tasks.lock().clone();
3154
3155        for task_id in root_tasks.into_iter() {
3156            let mut queue = VecDeque::new();
3157            let mut visited = FxHashSet::default();
3158            let mut aggregated_nodes = FxHashSet::default();
3159            let mut collectibles = FxHashMap::default();
3160            let root_task_id = task_id;
3161            visited.insert(task_id);
3162            aggregated_nodes.insert(task_id);
3163            queue.push_back(task_id);
3164            let mut counter = 0;
3165            while let Some(task_id) = queue.pop_front() {
3166                counter += 1;
3167                if counter % 100000 == 0 {
3168                    println!(
3169                        "queue={}, visited={}, aggregated_nodes={}",
3170                        queue.len(),
3171                        visited.len(),
3172                        aggregated_nodes.len()
3173                    );
3174                }
3175                let task = ctx.task(task_id, TaskDataCategory::All);
3176                if idle && !self.is_idle.load(Ordering::Relaxed) {
3177                    return;
3178                }
3179
3180                let uppers = get_uppers(&task);
3181                if task_id != root_task_id
3182                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3183                {
3184                    panic!(
3185                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3186                         {:?})",
3187                        task_id,
3188                        task.get_task_description(),
3189                        uppers
3190                    );
3191                }
3192
3193                for (collectible, _) in task.iter_aggregated_collectibles() {
3194                    collectibles
3195                        .entry(*collectible)
3196                        .or_insert_with(|| (false, Vec::new()))
3197                        .1
3198                        .push(task_id);
3199                }
3200
3201                for (&collectible, &value) in task.iter_collectibles() {
3202                    if value > 0 {
3203                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3204                            *flag = true
3205                        } else {
3206                            panic!(
3207                                "Task {} has a collectible {:?} that is not in any upper task",
3208                                task_id, collectible
3209                            );
3210                        }
3211                    }
3212                }
3213
3214                let is_dirty = task.has_dirty();
3215                let has_dirty_container = task.has_dirty_containers();
3216                let should_be_in_upper = is_dirty || has_dirty_container;
3217
3218                let aggregation_number = get_aggregation_number(&task);
3219                if is_aggregating_node(aggregation_number) {
3220                    aggregated_nodes.insert(task_id);
3221                }
3222                // println!(
3223                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
3224                //     ctx.get_task_description(task_id),
3225                //     uppers
3226                // );
3227
3228                for child_id in task.iter_children() {
3229                    // println!("{task_id}: child -> {child_id}");
3230                    if visited.insert(child_id) {
3231                        queue.push_back(child_id);
3232                    }
3233                }
3234                drop(task);
3235
3236                if should_be_in_upper {
3237                    for upper_id in uppers {
3238                        let upper = ctx.task(upper_id, TaskDataCategory::All);
3239                        let in_upper = upper
3240                            .get_aggregated_dirty_containers(&task_id)
3241                            .is_some_and(|&dirty| dirty > 0);
3242                        if !in_upper {
3243                            let containers: Vec<_> = upper
3244                                .iter_aggregated_dirty_containers()
3245                                .map(|(&k, &v)| (k, v))
3246                                .collect();
3247                            let upper_task_desc = upper.get_task_description();
3248                            drop(upper);
3249                            panic!(
3250                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
3251                                 ({})\nThese dirty containers are present:\n{:#?}",
3252                                task_id,
3253                                ctx.task(task_id, TaskDataCategory::Data)
3254                                    .get_task_description(),
3255                                upper_id,
3256                                upper_task_desc,
3257                                containers,
3258                            );
3259                        }
3260                    }
3261                }
3262            }
3263
3264            for (collectible, (flag, task_ids)) in collectibles {
3265                if !flag {
3266                    use std::io::Write;
3267                    let mut stdout = stdout().lock();
3268                    writeln!(
3269                        stdout,
3270                        "{:?} that is not emitted in any child task but in these aggregated \
3271                         tasks: {:#?}",
3272                        collectible,
3273                        task_ids
3274                            .iter()
3275                            .map(|t| format!(
3276                                "{t} {}",
3277                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
3278                            ))
3279                            .collect::<Vec<_>>()
3280                    )
3281                    .unwrap();
3282
3283                    let task_id = collectible.cell.task;
3284                    let mut queue = {
3285                        let task = ctx.task(task_id, TaskDataCategory::All);
3286                        get_uppers(&task)
3287                    };
3288                    let mut visited = FxHashSet::default();
3289                    for &upper_id in queue.iter() {
3290                        visited.insert(upper_id);
3291                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3292                    }
3293                    while let Some(task_id) = queue.pop() {
3294                        let task = ctx.task(task_id, TaskDataCategory::All);
3295                        let desc = task.get_task_description();
3296                        let aggregated_collectible = task
3297                            .get_aggregated_collectibles(&collectible)
3298                            .copied()
3299                            .unwrap_or_default();
3300                        let uppers = get_uppers(&task);
3301                        drop(task);
3302                        writeln!(
3303                            stdout,
3304                            "upper {task_id} {desc} collectible={aggregated_collectible}"
3305                        )
3306                        .unwrap();
3307                        if task_ids.contains(&task_id) {
3308                            writeln!(
3309                                stdout,
3310                                "Task has an upper connection to an aggregated task that doesn't \
3311                                 reference it. Upper connection is invalid!"
3312                            )
3313                            .unwrap();
3314                        }
3315                        for upper_id in uppers {
3316                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3317                            if !visited.contains(&upper_id) {
3318                                queue.push(upper_id);
3319                            }
3320                        }
3321                    }
3322                    panic!("See stdout for more details");
3323                }
3324            }
3325        }
3326    }
3327
3328    fn assert_not_persistent_calling_transient(
3329        &self,
3330        parent_id: Option<TaskId>,
3331        child_id: TaskId,
3332        cell_id: Option<CellId>,
3333    ) {
3334        if let Some(parent_id) = parent_id
3335            && !parent_id.is_transient()
3336            && child_id.is_transient()
3337        {
3338            self.panic_persistent_calling_transient(
3339                self.debug_get_task_description(parent_id),
3340                self.debug_get_cached_task_type(child_id).as_deref(),
3341                cell_id,
3342            );
3343        }
3344    }
3345
3346    fn panic_persistent_calling_transient(
3347        &self,
3348        parent: String,
3349        child: Option<&CachedTaskType>,
3350        cell_id: Option<CellId>,
3351    ) -> ! {
3352        let transient_reason = if let Some(child) = child {
3353            Cow::Owned(format!(
3354                " The callee is transient because it depends on:\n{}",
3355                self.debug_trace_transient_task(child, cell_id),
3356            ))
3357        } else {
3358            Cow::Borrowed("")
3359        };
3360        panic!(
3361            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3362            parent,
3363            child.map_or("unknown", |t| t.get_name()),
3364            transient_reason,
3365        );
3366    }
3367
3368    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3369        // these checks occur in a potentially hot codepath, but they're cheap
3370        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3371            // This should never happen: The collectible APIs use ResolvedVc
3372            let task_info = if let Some(col_task_ty) = collectible
3373                .try_get_task_id()
3374                .map(|t| self.debug_get_task_description(t))
3375            {
3376                Cow::Owned(format!(" (return type of {col_task_ty})"))
3377            } else {
3378                Cow::Borrowed("")
3379            };
3380            panic!("Collectible{task_info} must be a ResolvedVc")
3381        };
3382        if col_task_id.is_transient() && !task_id.is_transient() {
3383            let transient_reason =
3384                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3385                    Cow::Owned(format!(
3386                        ". The collectible is transient because it depends on:\n{}",
3387                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3388                    ))
3389                } else {
3390                    Cow::Borrowed("")
3391                };
3392            // this should never happen: How would a persistent function get a transient Vc?
3393            panic!(
3394                "Collectible is transient, transient collectibles cannot be emitted from \
3395                 persistent tasks{transient_reason}",
3396            )
3397        }
3398    }
3399}
3400
3401impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
3402    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3403        self.0.startup(turbo_tasks);
3404    }
3405
3406    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3407        self.0.stopping();
3408    }
3409
3410    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3411        self.0.stop(turbo_tasks);
3412    }
3413
3414    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3415        self.0.idle_start(turbo_tasks);
3416    }
3417
3418    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3419        self.0.idle_end();
3420    }
3421
3422    fn get_or_create_task(
3423        &self,
3424        native_fn: &'static NativeFunction,
3425        this: Option<RawVc>,
3426        arg: &mut dyn StackDynTaskInputs,
3427        parent_task: Option<TaskId>,
3428        persistence: TaskPersistence,
3429        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3430    ) -> TaskId {
3431        self.0
3432            .get_or_create_task(native_fn, this, arg, parent_task, persistence, turbo_tasks)
3433    }
3434
3435    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3436        self.0.invalidate_task(task_id, turbo_tasks);
3437    }
3438
3439    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3440        self.0.invalidate_tasks(tasks, turbo_tasks);
3441    }
3442
3443    fn invalidate_tasks_set(
3444        &self,
3445        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3446        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3447    ) {
3448        self.0.invalidate_tasks_set(tasks, turbo_tasks);
3449    }
3450
3451    fn invalidate_serialization(
3452        &self,
3453        task_id: TaskId,
3454        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3455    ) {
3456        self.0.invalidate_serialization(task_id, turbo_tasks);
3457    }
3458
3459    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3460        self.0.task_execution_canceled(task, turbo_tasks)
3461    }
3462
3463    fn try_start_task_execution(
3464        &self,
3465        task_id: TaskId,
3466        priority: TaskPriority,
3467        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3468    ) -> Option<TaskExecutionSpec<'_>> {
3469        self.0
3470            .try_start_task_execution(task_id, priority, turbo_tasks)
3471    }
3472
3473    fn task_execution_completed(
3474        &self,
3475        task_id: TaskId,
3476        result: Result<RawVc, TurboTasksExecutionError>,
3477        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3478        #[cfg(feature = "verify_determinism")] stateful: bool,
3479        has_invalidator: bool,
3480        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3481    ) -> Option<TaskPriority> {
3482        self.0.task_execution_completed(
3483            task_id,
3484            result,
3485            cell_counters,
3486            #[cfg(feature = "verify_determinism")]
3487            stateful,
3488            has_invalidator,
3489            turbo_tasks,
3490        )
3491    }
3492
3493    type BackendJob = TurboTasksBackendJob;
3494
3495    fn run_backend_job<'a>(
3496        &'a self,
3497        job: Self::BackendJob,
3498        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3499    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3500        self.0.run_backend_job(job, turbo_tasks)
3501    }
3502
3503    fn try_read_task_output(
3504        &self,
3505        task_id: TaskId,
3506        reader: Option<TaskId>,
3507        options: ReadOutputOptions,
3508        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3509    ) -> Result<Result<RawVc, EventListener>> {
3510        self.0
3511            .try_read_task_output(task_id, reader, options, turbo_tasks)
3512    }
3513
3514    fn try_read_task_cell(
3515        &self,
3516        task_id: TaskId,
3517        cell: CellId,
3518        reader: Option<TaskId>,
3519        options: ReadCellOptions,
3520        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3521    ) -> Result<Result<TypedCellContent, EventListener>> {
3522        self.0
3523            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3524    }
3525
3526    fn try_read_own_task_cell(
3527        &self,
3528        task_id: TaskId,
3529        cell: CellId,
3530        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3531    ) -> Result<TypedCellContent> {
3532        self.0.try_read_own_task_cell(task_id, cell, turbo_tasks)
3533    }
3534
3535    fn read_task_collectibles(
3536        &self,
3537        task_id: TaskId,
3538        collectible_type: TraitTypeId,
3539        reader: Option<TaskId>,
3540        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3541    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3542        self.0
3543            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3544    }
3545
3546    fn emit_collectible(
3547        &self,
3548        collectible_type: TraitTypeId,
3549        collectible: RawVc,
3550        task_id: TaskId,
3551        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3552    ) {
3553        self.0
3554            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3555    }
3556
3557    fn unemit_collectible(
3558        &self,
3559        collectible_type: TraitTypeId,
3560        collectible: RawVc,
3561        count: u32,
3562        task_id: TaskId,
3563        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3564    ) {
3565        self.0
3566            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3567    }
3568
3569    fn update_task_cell(
3570        &self,
3571        task_id: TaskId,
3572        cell: CellId,
3573        content: CellContent,
3574        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3575        content_hash: Option<CellHash>,
3576        verification_mode: VerificationMode,
3577        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3578    ) {
3579        self.0.update_task_cell(
3580            task_id,
3581            cell,
3582            content,
3583            updated_key_hashes,
3584            content_hash,
3585            verification_mode,
3586            turbo_tasks,
3587        );
3588    }
3589
3590    fn mark_own_task_as_finished(
3591        &self,
3592        task_id: TaskId,
3593        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3594    ) {
3595        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3596    }
3597
3598    fn connect_task(
3599        &self,
3600        task: TaskId,
3601        parent_task: Option<TaskId>,
3602        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3603    ) {
3604        self.0.connect_task(task, parent_task, turbo_tasks);
3605    }
3606
3607    fn create_transient_task(
3608        &self,
3609        task_type: TransientTaskType,
3610        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3611    ) -> TaskId {
3612        self.0.create_transient_task(task_type)
3613    }
3614
3615    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3616        self.0.dispose_root_task(task_id, turbo_tasks);
3617    }
3618
3619    fn task_statistics(&self) -> &TaskStatisticsApi {
3620        &self.0.task_statistics
3621    }
3622
3623    fn is_tracking_dependencies(&self) -> bool {
3624        self.0.options.dependency_tracking
3625    }
3626
3627    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
3628        self.0.get_task_name(task, turbo_tasks)
3629    }
3630}
3631
3632enum DebugTraceTransientTask {
3633    Cached {
3634        task_name: &'static str,
3635        cell_type_id: Option<ValueTypeId>,
3636        cause_self: Option<Box<DebugTraceTransientTask>>,
3637        cause_args: Vec<DebugTraceTransientTask>,
3638    },
3639    /// This representation is used when this task is a duplicate of one previously shown
3640    Collapsed {
3641        task_name: &'static str,
3642        cell_type_id: Option<ValueTypeId>,
3643    },
3644    Uncached {
3645        cell_type_id: Option<ValueTypeId>,
3646    },
3647}
3648
3649impl DebugTraceTransientTask {
3650    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3651        let indent = "    ".repeat(level);
3652        f.write_str(&indent)?;
3653
3654        fn fmt_cell_type_id(
3655            f: &mut fmt::Formatter<'_>,
3656            cell_type_id: Option<ValueTypeId>,
3657        ) -> fmt::Result {
3658            if let Some(ty) = cell_type_id {
3659                write!(
3660                    f,
3661                    " (read cell of type {})",
3662                    get_value_type(ty).ty.global_name
3663                )
3664            } else {
3665                Ok(())
3666            }
3667        }
3668
3669        // write the name and type
3670        match self {
3671            Self::Cached {
3672                task_name,
3673                cell_type_id,
3674                ..
3675            }
3676            | Self::Collapsed {
3677                task_name,
3678                cell_type_id,
3679                ..
3680            } => {
3681                f.write_str(task_name)?;
3682                fmt_cell_type_id(f, *cell_type_id)?;
3683                if matches!(self, Self::Collapsed { .. }) {
3684                    f.write_str(" (collapsed)")?;
3685                }
3686            }
3687            Self::Uncached { cell_type_id } => {
3688                f.write_str("unknown transient task")?;
3689                fmt_cell_type_id(f, *cell_type_id)?;
3690            }
3691        }
3692        f.write_char('\n')?;
3693
3694        // write any extra "cause" information we might have
3695        if let Self::Cached {
3696            cause_self,
3697            cause_args,
3698            ..
3699        } = self
3700        {
3701            if let Some(c) = cause_self {
3702                writeln!(f, "{indent}  self:")?;
3703                c.fmt_indented(f, level + 1)?;
3704            }
3705            if !cause_args.is_empty() {
3706                writeln!(f, "{indent}  args:")?;
3707                for c in cause_args {
3708                    c.fmt_indented(f, level + 1)?;
3709                }
3710            }
3711        }
3712        Ok(())
3713    }
3714}
3715
3716impl fmt::Display for DebugTraceTransientTask {
3717    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3718        self.fmt_indented(f, 0)
3719    }
3720}
3721
// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
/// Returns an [`Instant`] roughly 30 years after the moment of the call, for use as a
/// stand-in for "never".
fn far_future() -> Instant {
    // There is no API for obtaining the maximum `Instant`, nor for converting a specific
    // calendar date into one, so a large fixed offset from "now" is used instead.
    // The offset is kept at ~30 years because larger values are known to overflow:
    // 1000 years overflows on macOS, 100 years overflows on FreeBSD.
    const THIRTY_YEARS_IN_SECS: u64 = 86400 * 365 * 30;
    let offset = Duration::from_secs(THIRTY_YEARS_IN_SECS);
    Instant::now() + offset
}
3730
3731/// Encodes task data, using the provided buffer as a scratch space.  Returns a new exactly sized
3732/// buffer.
3733/// This allows reusing the buffer across multiple encode calls to optimize allocations and
3734/// resulting buffer sizes.
3735///
3736/// TODO: The `Result` return type is an artifact of the bincode `Encode` trait requiring
3737/// fallible encoding. In practice, encoding to a `SmallVec` is infallible (no I/O), and the only
3738/// real failure mode — a `TypedSharedReference` whose value type has no bincode impl — is a
3739/// programmer error caught by the panic in the caller. Consider making the bincode encoding trait
3740/// infallible (i.e. returning `()` instead of `Result<(), EncodeError>`) to eliminate the
3741/// spurious `Result` threading throughout the encode path.
3742fn encode_task_data(
3743    task: TaskId,
3744    data: &TaskStorage,
3745    category: SpecificTaskDataCategory,
3746    scratch_buffer: &mut TurboBincodeBuffer,
3747) -> Result<TurboBincodeBuffer> {
3748    scratch_buffer.clear();
3749    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3750    data.encode(category, &mut encoder)?;
3751
3752    if cfg!(feature = "verify_serialization") {
3753        TaskStorage::new()
3754            .decode(
3755                category,
3756                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3757            )
3758            .with_context(|| {
3759                format!(
3760                    "expected to be able to decode serialized data for '{category:?}' information \
3761                     for {task}"
3762                )
3763            })?;
3764    }
3765    Ok(SmallVec::from_slice(scratch_buffer))
3766}