Skip to main content

turbo_tasks_backend/backend/
mod.rs

1mod cell_data;
2mod counter_map;
3mod operation;
4mod storage;
5pub mod storage_schema;
6
7use std::{
8    borrow::Cow,
9    fmt::{self, Write},
10    future::Future,
11    hash::BuildHasherDefault,
12    mem::take,
13    pin::Pin,
14    sync::{
15        Arc, LazyLock,
16        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
17    },
18    time::SystemTime,
19};
20
21use anyhow::{Context, Result, bail};
22use auto_hash_map::{AutoMap, AutoSet};
23use indexmap::IndexSet;
24use parking_lot::{Condvar, Mutex};
25use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
26use smallvec::{SmallVec, smallvec};
27use tokio::time::{Duration, Instant};
28use tracing::{Span, trace_span};
29use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
30use turbo_tasks::{
31    CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
32    ReadOutputOptions, ReadTracking, SharedReference, StackDynTaskInputs, TRANSIENT_TASK_BIT,
33    TaskExecutionReason, TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasksBackendApi,
34    TurboTasksPanic, ValueTypeId,
35    backend::{
36        Backend, CachedTaskType, CellContent, CellHash, TaskExecutionSpec, TransientTaskType,
37        TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
38        TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
39        VerificationMode,
40    },
41    event::{Event, EventDescription, EventListener},
42    macro_helpers::NativeFunction,
43    message_queue::{TimingEvent, TraceEvent},
44    registry::get_value_type,
45    scope::scope_and_block,
46    task_statistics::TaskStatisticsApi,
47    trace::TraceRawVcs,
48    util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
49};
50
51pub use self::{
52    operation::AnyOperation,
53    storage::{SpecificTaskDataCategory, TaskDataCategory},
54};
55#[cfg(feature = "trace_task_dirty")]
56use crate::backend::operation::TaskDirtyCause;
57use crate::{
58    backend::{
59        operation::{
60            AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
61            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
62            LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType,
63            connect_children, get_aggregation_number, get_uppers, is_root_node,
64            make_task_dirty_internal, prepare_new_children,
65        },
66        storage::Storage,
67        storage_schema::{TaskStorage, TaskStorageAccessors},
68    },
69    backing_storage::{BackingStorage, SnapshotItem, compute_task_type_hash},
70    data::{
71        ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
72        InProgressState, InProgressStateInner, OutputValue, TransientTask,
73    },
74    error::TaskError,
75    utils::{
76        dash_map_drop_contents::drop_contents,
77        dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
78        ptr_eq_arc::PtrEqArc,
79        shard_amount::compute_shard_amount,
80    },
81};
82
/// Threshold for parallelizing the work of making dependent tasks dirty.
///
/// If the number of dependent tasks exceeds this threshold, the operation will be
/// parallelized.
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;
87
/// Mask selecting the highest bit of a `usize`.
///
/// `TurboTasksBackendInner::in_progress_operations` packs two things into one atomic: this bit
/// is set when a snapshot is requested, and the remaining bits count the executing operations.
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
89
/// Idle timeout for snapshot persistence.
///
/// Read once from the `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS` environment variable, which
/// is interpreted as a number of milliseconds. Falls back to 2 seconds when the variable is not
/// set or does not parse as a `u64`.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let configured_millis = std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    match configured_millis {
        Some(millis) => Duration::from_millis(millis),
        None => Duration::from_secs(2),
    }
});
99
100struct SnapshotRequest {
101    snapshot_requested: bool,
102    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
103}
104
105impl SnapshotRequest {
106    fn new() -> Self {
107        Self {
108            snapshot_requested: false,
109            suspended_operations: FxHashSet::default(),
110        }
111    }
112}
113
/// How the backend interacts with its backing storage.
///
/// Every mode queries the storage for cache entries that don't exist locally; the modes differ
/// in whether and when local changes are pushed back.
pub enum StorageMode {
    /// Queries the storage for cache entries that don't exist locally.
    ReadOnly,
    /// Queries the storage for cache entries that don't exist locally and regularly pushes
    /// changes to the backing storage.
    ReadWrite,
    /// Queries the storage for cache entries that don't exist locally and pushes all changes to
    /// the backing storage on shutdown.
    ReadWriteOnShutdown,
}

/// Configuration of the backend.
pub struct BackendOptions {
    /// Whether dependency tracking is enabled.
    ///
    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
    /// forever.
    pub dependency_tracking: bool,

    /// Whether active tracking is enabled.
    ///
    /// Automatically disabled when `dependency_tracking` is disabled.
    ///
    /// When disabled: All tasks are considered as active.
    pub active_tracking: bool,

    /// Storage mode of the backing storage; `None` leaves the backing storage disabled.
    pub storage_mode: Option<StorageMode>,

    /// Number of tokio worker threads, used to compute the shard amount of parallel
    /// datastructures. `None` falls back to the available parallelism.
    pub num_workers: Option<usize>,

    /// Avoids big preallocations for faster startup. Should only be used for testing purposes.
    pub small_preallocation: bool,
}

impl Default for BackendOptions {
    /// Dependency and active tracking enabled, read-write storage, worker count derived from
    /// the available parallelism, and regular (large) preallocations.
    fn default() -> Self {
        BackendOptions {
            dependency_tracking: true,
            active_tracking: true,
            storage_mode: Some(StorageMode::ReadWrite),
            num_workers: None,
            small_preallocation: false,
        }
    }
}
161
/// Kinds of backend jobs. Both variants are snapshot related.
///
/// NOTE(review): presumably `InitialSnapshot` is the first snapshot job and `FollowUpSnapshot`
/// every later one — confirm against the code that schedules and runs these jobs.
pub enum TurboTasksBackendJob {
    InitialSnapshot,
    FollowUpSnapshot,
}
166
/// Public handle to the backend: a newtype around an [`Arc`] of the shared
/// `TurboTasksBackendInner` state.
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
168
/// Shared state of the backend, held behind the `Arc` inside [`TurboTasksBackend`].
struct TurboTasksBackendInner<B: BackingStorage> {
    /// Backend configuration. `new` clears `active_tracking` when `dependency_tracking` is
    /// disabled.
    options: BackendOptions,

    /// Instant captured when this state was constructed; reference point for `last_snapshot`.
    start_time: Instant,

    /// Id factory covering the range from the backing storage's next free task id up to
    /// `TRANSIENT_TASK_BIT - 1`.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id factory covering the range from `TRANSIENT_TASK_BIT` up to `TaskId::MAX`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    /// Maps a cached task type to its task id.
    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

    /// Task storage, created with the computed shard amount and the `small_preallocation`
    /// option.
    storage: Storage,

    /// Number of executing operations + Highest bit is set when snapshot is
    /// requested. When that bit is set, operations should pause until the
    /// snapshot is completed. When the bit is set and in progress counter
    /// reaches zero, `operations_suspended` is triggered.
    in_progress_operations: AtomicUsize,

    /// Snapshot request flag and the set of currently suspended operations. This mutex is the
    /// one both condition variables below wait on.
    snapshot_request: Mutex<SnapshotRequest>,
    /// Condition Variable that is triggered when `in_progress_operations`
    /// reaches zero while snapshot is requested. All operations are either
    /// completed or suspended.
    operations_suspended: Condvar,
    /// Condition Variable that is triggered when a snapshot is completed and
    /// operations can continue.
    snapshot_completed: Condvar,
    /// The timestamp of the last started snapshot since [`Self::start_time`].
    last_snapshot: AtomicU64,

    /// Initialised to `false`. NOTE(review): presumably set once the backend starts stopping —
    /// confirm against the code that writes it.
    stopping: AtomicBool,
    /// Event described as "TurboTasksBackend::stopping_event".
    stopping_event: Event,
    /// Event described as "TurboTasksBackend::idle_start_event".
    idle_start_event: Event,
    /// Event described as "TurboTasksBackend::idle_end_event".
    idle_end_event: Event,
    /// Only present with the `verify_aggregation_graph` feature; initialised to `false`.
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    /// Statistics sink used to record per-function cache hits and misses.
    task_statistics: TaskStatisticsApi,

    /// The backing storage passed to `new`; exposed through
    /// `TurboTasksBackend::backing_storage`.
    backing_storage: B,

    /// Only present with the `verify_aggregation_graph` feature; starts out empty.
    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
213
214impl<B: BackingStorage> TurboTasksBackend<B> {
215    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
216        Self(Arc::new(TurboTasksBackendInner::new(
217            options,
218            backing_storage,
219        )))
220    }
221
222    pub fn backing_storage(&self) -> &B {
223        &self.0.backing_storage
224    }
225}
226
227impl<B: BackingStorage> TurboTasksBackendInner<B> {
228    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
229        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
230        if !options.dependency_tracking {
231            options.active_tracking = false;
232        }
233        let small_preallocation = options.small_preallocation;
234        let next_task_id = backing_storage
235            .next_free_task_id()
236            .expect("Failed to get task id");
237        Self {
238            options,
239            start_time: Instant::now(),
240            persisted_task_id_factory: IdFactoryWithReuse::new(
241                next_task_id,
242                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
243            ),
244            transient_task_id_factory: IdFactoryWithReuse::new(
245                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
246                TaskId::MAX,
247            ),
248            task_cache: FxDashMap::default(),
249            storage: Storage::new(shard_amount, small_preallocation),
250            in_progress_operations: AtomicUsize::new(0),
251            snapshot_request: Mutex::new(SnapshotRequest::new()),
252            operations_suspended: Condvar::new(),
253            snapshot_completed: Condvar::new(),
254            last_snapshot: AtomicU64::new(0),
255            stopping: AtomicBool::new(false),
256            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
257            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
258            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
259            #[cfg(feature = "verify_aggregation_graph")]
260            is_idle: AtomicBool::new(false),
261            task_statistics: TaskStatisticsApi::default(),
262            backing_storage,
263            #[cfg(feature = "verify_aggregation_graph")]
264            root_tasks: Default::default(),
265        }
266    }
267
    /// Creates an [`ExecuteContext`] (backed by `ExecuteContextImpl`) bound to this backend and
    /// the given `turbo_tasks` API handle. Both are borrowed for `'a`.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }
274
275    fn suspending_requested(&self) -> bool {
276        self.should_persist()
277            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
278    }
279
280    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
281        #[cold]
282        fn operation_suspend_point_cold<B: BackingStorage>(
283            this: &TurboTasksBackendInner<B>,
284            suspend: impl FnOnce() -> AnyOperation,
285        ) {
286            let operation = Arc::new(suspend());
287            let mut snapshot_request = this.snapshot_request.lock();
288            if snapshot_request.snapshot_requested {
289                snapshot_request
290                    .suspended_operations
291                    .insert(operation.clone().into());
292                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
293                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
294                if value == SNAPSHOT_REQUESTED_BIT {
295                    this.operations_suspended.notify_all();
296                }
297                this.snapshot_completed
298                    .wait_while(&mut snapshot_request, |snapshot_request| {
299                        snapshot_request.snapshot_requested
300                    });
301                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
302                snapshot_request
303                    .suspended_operations
304                    .remove(&operation.into());
305            }
306        }
307
308        if self.suspending_requested() {
309            operation_suspend_point_cold(self, suspend);
310        }
311    }
312
313    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
314        if !self.should_persist() {
315            return OperationGuard { backend: None };
316        }
317        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
318        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
319            let mut snapshot_request = self.snapshot_request.lock();
320            if snapshot_request.snapshot_requested {
321                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
322                if value == SNAPSHOT_REQUESTED_BIT {
323                    self.operations_suspended.notify_all();
324                }
325                self.snapshot_completed
326                    .wait_while(&mut snapshot_request, |snapshot_request| {
327                        snapshot_request.snapshot_requested
328                    });
329                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
330            }
331        }
332        OperationGuard {
333            backend: Some(self),
334        }
335    }
336
337    fn should_persist(&self) -> bool {
338        matches!(
339            self.options.storage_mode,
340            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
341        )
342    }
343
    /// Returns `true` when any storage mode is configured, i.e. `storage_mode` is `Some`.
    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }
347
    /// Returns the `dependency_tracking` option.
    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }
351
    /// Returns the `active_tracking` option. `new` forces it to `false` when dependency tracking
    /// is disabled, so this is never `true` without dependency tracking.
    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }
355
    /// Records a cache hit for `native_fn` in the task statistics.
    ///
    /// NOTE(review): the closure presumably only runs when statistics are enabled — confirm
    /// against `TaskStatisticsApi::map`. The value returned by `map` is discarded.
    fn track_cache_hit_by_fn(&self, native_fn: &'static NativeFunction) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(native_fn));
    }
360
    /// Records a cache miss for `native_fn` in the task statistics.
    ///
    /// NOTE(review): the closure presumably only runs when statistics are enabled — confirm
    /// against `TaskStatisticsApi::map`. The value returned by `map` is discarded.
    fn track_cache_miss_by_fn(&self, native_fn: &'static NativeFunction) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(native_fn));
    }
365
366    /// Reconstructs a full [`TurboTasksExecutionError`] from the compact [`TaskError`] storage
367    /// representation. For [`TaskError::TaskChain`], this looks up the source error from the last
368    /// task's output and rebuilds the nested `TaskContext` wrappers with `TurboTasksCallApi`
369    /// references for lazy name resolution.
370    fn task_error_to_turbo_tasks_execution_error(
371        &self,
372        error: &TaskError,
373        ctx: &mut impl ExecuteContext<'_>,
374    ) -> TurboTasksExecutionError {
375        match error {
376            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
377            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
378                message: item.message.clone(),
379                source: item
380                    .source
381                    .as_ref()
382                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
383            })),
384            TaskError::LocalTaskContext(local_task_context) => {
385                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
386                    name: local_task_context.name.clone(),
387                    source: local_task_context
388                        .source
389                        .as_ref()
390                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
391                }))
392            }
393            TaskError::TaskChain(chain) => {
394                let task_id = chain.last().unwrap();
395                let error = {
396                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
397                    if let Some(OutputValue::Error(error)) = task.get_output() {
398                        Some(error.clone())
399                    } else {
400                        None
401                    }
402                };
403                let error = error.map_or_else(
404                    || {
405                        // Eventual consistency will cause errors to no longer be available
406                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
407                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
408                                "Error no longer available",
409                            )),
410                            location: None,
411                        }))
412                    },
413                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
414                );
415                let mut current_error = error;
416                for &task_id in chain.iter().rev() {
417                    current_error =
418                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
419                            task_id,
420                            source: Some(current_error),
421                            turbo_tasks: ctx.turbo_tasks(),
422                        }));
423                }
424                current_error
425            }
426        }
427    }
428}
429
430pub(crate) struct OperationGuard<'a, B: BackingStorage> {
431    backend: Option<&'a TurboTasksBackendInner<B>>,
432}
433
434impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
435    fn drop(&mut self) {
436        if let Some(backend) = self.backend {
437            let fetch_sub = backend
438                .in_progress_operations
439                .fetch_sub(1, Ordering::AcqRel);
440            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
441                backend.operations_suspended.notify_all();
442            }
443        }
444    }
445}
446
/// Intermediate result of step 1 of task execution completion.
///
/// NOTE(review): the code that fills and consumes these fields is not part of this section;
/// the field notes below restate only what the types and attributes show.
struct TaskExecutionCompletePrepareResult {
    /// Set of child task ids.
    pub new_children: FxHashSet<TaskId>,
    pub is_now_immutable: bool,
    /// Only present with the `verify_determinism` feature.
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    /// `None` presumably means there is no new output to store — TODO confirm.
    pub new_output: Option<OutputValue>,
    /// Task ids stored inline for up to four entries.
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
    pub is_recomputation: bool,
}
457
458// Operations
459impl<B: BackingStorage> TurboTasksBackendInner<B> {
460    fn connect_child(
461        &self,
462        parent_task: Option<TaskId>,
463        child_task: TaskId,
464        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
465    ) {
466        operation::ConnectChildOperation::run(
467            parent_task,
468            child_task,
469            self.execute_context(turbo_tasks),
470        );
471    }
472
473    fn try_read_task_output(
474        self: &Arc<Self>,
475        task_id: TaskId,
476        reader: Option<TaskId>,
477        options: ReadOutputOptions,
478        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
479    ) -> Result<Result<RawVc, EventListener>> {
480        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
481
482        let mut ctx = self.execute_context(turbo_tasks);
483        let need_reader_task = if self.should_track_dependencies()
484            && !matches!(options.tracking, ReadTracking::Untracked)
485            && reader.is_some_and(|reader_id| reader_id != task_id)
486            && let Some(reader_id) = reader
487            && reader_id != task_id
488        {
489            Some(reader_id)
490        } else {
491            None
492        };
493        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
494            // Having a task_pair here is not optimal, but otherwise this would lead to a race
495            // condition. See below.
496            // TODO(sokra): solve that in a more performant way.
497            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
498            (task, Some(reader))
499        } else {
500            (ctx.task(task_id, TaskDataCategory::All), None)
501        };
502
503        fn listen_to_done_event(
504            reader_description: Option<EventDescription>,
505            tracking: ReadTracking,
506            done_event: &Event,
507        ) -> EventListener {
508            done_event.listen_with_note(move || {
509                move || {
510                    if let Some(reader_description) = reader_description.as_ref() {
511                        format!(
512                            "try_read_task_output from {} ({})",
513                            reader_description, tracking
514                        )
515                    } else {
516                        format!("try_read_task_output ({})", tracking)
517                    }
518                }
519            })
520        }
521
522        fn check_in_progress(
523            task: &impl TaskGuard,
524            reader_description: Option<EventDescription>,
525            tracking: ReadTracking,
526        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
527        {
528            match task.get_in_progress() {
529                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
530                    listen_to_done_event(reader_description, tracking, done_event),
531                ))),
532                Some(InProgressState::InProgress(box InProgressStateInner {
533                    done_event, ..
534                })) => Some(Ok(Err(listen_to_done_event(
535                    reader_description,
536                    tracking,
537                    done_event,
538                )))),
539                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
540                    "{} was canceled",
541                    task.get_task_description()
542                ))),
543                None => None,
544            }
545        }
546
547        if matches!(options.consistency, ReadConsistency::Strong) {
            // Ensure it's a root node
549            loop {
550                let aggregation_number = get_aggregation_number(&task);
551                if is_root_node(aggregation_number) {
552                    break;
553                }
554                drop(task);
555                drop(reader_task);
556                {
557                    let _span = tracing::trace_span!(
558                        "make root node for strongly consistent read",
559                        task = self.debug_get_task_description(task_id)
560                    )
561                    .entered();
562                    AggregationUpdateQueue::run(
563                        AggregationUpdateJob::UpdateAggregationNumber {
564                            task_id,
565                            base_aggregation_number: u32::MAX,
566                            distance: None,
567                        },
568                        &mut ctx,
569                    );
570                }
571                (task, reader_task) = if let Some(reader_id) = need_reader_task {
572                    // TODO(sokra): see comment above
573                    let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
574                    (task, Some(reader))
575                } else {
576                    (ctx.task(task_id, TaskDataCategory::All), None)
577                }
578            }
579
580            let is_dirty = task.is_dirty();
581
582            // Check the dirty count of the root node
583            let has_dirty_containers = task.has_dirty_containers();
584            if has_dirty_containers || is_dirty.is_some() {
585                let activeness = task.get_activeness_mut();
586                let mut task_ids_to_schedule: Vec<_> = Vec::new();
                // When there are dirty tasks, subscribe to the all_clean_event
588                let activeness = if let Some(activeness) = activeness {
589                    // This makes sure all tasks stay active and this task won't stale.
590                    // active_until_clean is automatically removed when this
591                    // task is clean.
592                    activeness.set_active_until_clean();
593                    activeness
594                } else {
595                    // If we don't have a root state, add one. This also makes sure all tasks stay
596                    // active and this task won't stale. active_until_clean
597                    // is automatically removed when this task is clean.
598                    if ctx.should_track_activeness() {
599                        // A newly added Activeness need to make sure to schedule the tasks
600                        task_ids_to_schedule = task.dirty_containers().collect();
601                        task_ids_to_schedule.push(task_id);
602                    }
603                    let activeness =
604                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
605                    activeness.set_active_until_clean();
606                    activeness
607                };
608                let listener = activeness.all_clean_event.listen_with_note(move || {
609                    let this = self.clone();
610                    let tt = turbo_tasks.pin();
611                    move || {
612                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
613                        let mut ctx = this.execute_context(tt);
614                        let mut visited = FxHashSet::default();
615                        fn indent(s: &str) -> String {
616                            s.split_inclusive('\n')
617                                .flat_map(|line: &str| ["  ", line].into_iter())
618                                .collect::<String>()
619                        }
620                        fn get_info(
621                            ctx: &mut impl ExecuteContext<'_>,
622                            task_id: TaskId,
623                            parent_and_count: Option<(TaskId, i32)>,
624                            visited: &mut FxHashSet<TaskId>,
625                        ) -> String {
626                            let task = ctx.task(task_id, TaskDataCategory::All);
627                            let is_dirty = task.is_dirty();
628                            let in_progress =
629                                task.get_in_progress()
630                                    .map_or("not in progress", |p| match p {
631                                        InProgressState::InProgress(_) => "in progress",
632                                        InProgressState::Scheduled { .. } => "scheduled",
633                                        InProgressState::Canceled => "canceled",
634                                    });
635                            let activeness = task.get_activeness().map_or_else(
636                                || "not active".to_string(),
637                                |activeness| format!("{activeness:?}"),
638                            );
639                            let aggregation_number = get_aggregation_number(&task);
640                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
641                            {
642                                let uppers = get_uppers(&task);
643                                !uppers.contains(&parent_task_id)
644                            } else {
645                                false
646                            };
647
648                            // Check the dirty count of the root node
649                            let has_dirty_containers = task.has_dirty_containers();
650
651                            let task_description = task.get_task_description();
652                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
653                                format!(", dirty({parent_priority})")
654                            } else {
655                                String::new()
656                            };
657                            let has_dirty_containers_label = if has_dirty_containers {
658                                ", dirty containers"
659                            } else {
660                                ""
661                            };
662                            let count = if let Some((_, count)) = parent_and_count {
663                                format!(" {count}")
664                            } else {
665                                String::new()
666                            };
667                            let mut info = format!(
668                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
669                                 {in_progress}, \
670                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
671                            );
672                            let children: Vec<_> = task.dirty_containers_with_count().collect();
673                            drop(task);
674
675                            if missing_upper {
676                                info.push_str("\n  ERROR: missing upper connection");
677                            }
678
679                            if has_dirty_containers || !children.is_empty() {
680                                writeln!(info, "\n  dirty tasks:").unwrap();
681
682                                for (child_task_id, count) in children {
683                                    let task_description = ctx
684                                        .task(child_task_id, TaskDataCategory::Data)
685                                        .get_task_description();
686                                    if visited.insert(child_task_id) {
687                                        let child_info = get_info(
688                                            ctx,
689                                            child_task_id,
690                                            Some((task_id, count)),
691                                            visited,
692                                        );
693                                        info.push_str(&indent(&child_info));
694                                        if !info.ends_with('\n') {
695                                            info.push('\n');
696                                        }
697                                    } else {
698                                        writeln!(
699                                            info,
700                                            "  {child_task_id} {task_description} {count} \
701                                             (already visited)"
702                                        )
703                                        .unwrap();
704                                    }
705                                }
706                            }
707                            info
708                        }
709                        let info = get_info(&mut ctx, task_id, None, &mut visited);
710                        format!(
711                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
712                        )
713                    }
714                });
715                drop(reader_task);
716                drop(task);
717                if !task_ids_to_schedule.is_empty() {
718                    let mut queue = AggregationUpdateQueue::new();
719                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
720                    queue.execute(&mut ctx);
721                }
722
723                return Ok(Err(listener));
724            }
725        }
726
727        let reader_description = reader_task
728            .as_ref()
729            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
730        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
731        {
732            return value;
733        }
734
735        if let Some(output) = task.get_output() {
736            let result = match output {
737                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
738                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
739                OutputValue::Error(error) => Err(error.clone()),
740            };
741            if let Some(mut reader_task) = reader_task.take()
742                && options.tracking.should_track(result.is_err())
743                && (!task.immutable() || cfg!(feature = "verify_immutable"))
744            {
745                #[cfg(feature = "trace_task_output_dependencies")]
746                let _span = tracing::trace_span!(
747                    "add output dependency",
748                    task = %task_id,
749                    dependent_task = ?reader
750                )
751                .entered();
752                let mut queue = LeafDistanceUpdateQueue::new();
753                let reader = reader.unwrap();
754                if task.add_output_dependent(reader) {
755                    // Ensure that dependent leaf distance is strictly monotonic increasing
756                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
757                    let reader_leaf_distance =
758                        reader_task.get_leaf_distance().copied().unwrap_or_default();
759                    if reader_leaf_distance.distance <= leaf_distance.distance {
760                        queue.push(
761                            reader,
762                            leaf_distance.distance,
763                            leaf_distance.max_distance_in_buffer,
764                        );
765                    }
766                }
767
768                drop(task);
769
770                // Note: We use `task_pair` earlier to lock the task and its reader at the same
771                // time. If we didn't and just locked the reader here, an invalidation could occur
772                // between grabbing the locks. If that happened, and if the task is "outdated" or
773                // doesn't have the dependency edge yet, the invalidation would be lost.
774
775                if !reader_task.remove_outdated_output_dependencies(&task_id) {
776                    let _ = reader_task.add_output_dependencies(task_id);
777                }
778                drop(reader_task);
779
780                queue.execute(&mut ctx);
781            } else {
782                drop(task);
783            }
784
785            return result.map_err(|error| {
786                self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
787                    .with_task_context(task_id, turbo_tasks.pin())
788                    .into()
789            });
790        }
791        drop(reader_task);
792
793        let note = EventDescription::new(|| {
794            move || {
795                if let Some(reader) = reader_description.as_ref() {
796                    format!("try_read_task_output (recompute) from {reader}",)
797                } else {
798                    "try_read_task_output (recompute, untracked)".to_string()
799                }
800            }
801        });
802
803        // Output doesn't exist. We need to schedule the task to compute it.
804        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
805            TaskExecutionReason::OutputNotAvailable,
806            EventDescription::new(|| task.get_task_desc_fn()),
807            note,
808        );
809
810        // It's not possible that the task is InProgress at this point. If it is InProgress {
811        // done: true } it must have Output and would early return.
812        let old = task.set_in_progress(in_progress_state);
813        debug_assert!(old.is_none(), "InProgress already exists");
814        ctx.schedule_task(task, TaskPriority::Initial);
815
816        Ok(Err(listener))
817    }
818
819    fn try_read_task_cell(
820        &self,
821        task_id: TaskId,
822        reader: Option<TaskId>,
823        cell: CellId,
824        options: ReadCellOptions,
825        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
826    ) -> Result<Result<TypedCellContent, EventListener>> {
827        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
828
829        fn add_cell_dependency(
830            task_id: TaskId,
831            mut task: impl TaskGuard,
832            reader: Option<TaskId>,
833            reader_task: Option<impl TaskGuard>,
834            cell: CellId,
835            key: Option<u64>,
836        ) {
837            if let Some(mut reader_task) = reader_task
838                && (!task.immutable() || cfg!(feature = "verify_immutable"))
839            {
840                let reader = reader.unwrap();
841                let _ = task.add_cell_dependents((cell, key, reader));
842                drop(task);
843
844                // Note: We use `task_pair` earlier to lock the task and its reader at the same
845                // time. If we didn't and just locked the reader here, an invalidation could occur
846                // between grabbing the locks. If that happened, and if the task is "outdated" or
847                // doesn't have the dependency edge yet, the invalidation would be lost.
848
849                let target = CellRef {
850                    task: task_id,
851                    cell,
852                };
853                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
854                    let _ = reader_task.add_cell_dependencies((target, key));
855                }
856                drop(reader_task);
857            }
858        }
859
860        let ReadCellOptions {
861            tracking,
862            final_read_hint,
863        } = options;
864
865        let mut ctx = self.execute_context(turbo_tasks);
866        let (mut task, reader_task) = if self.should_track_dependencies()
867            && !matches!(tracking, ReadCellTracking::Untracked)
868            && let Some(reader_id) = reader
869            && reader_id != task_id
870        {
871            // Having a task_pair here is not optimal, but otherwise this would lead to a race
872            // condition. See below.
873            // TODO(sokra): solve that in a more performant way.
874            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
875            (task, Some(reader))
876        } else {
877            (ctx.task(task_id, TaskDataCategory::All), None)
878        };
879
880        let content = if final_read_hint {
881            task.remove_cell_data(&cell)
882        } else {
883            task.get_cell_data(&cell).cloned()
884        };
885        if let Some(content) = content {
886            if tracking.should_track(false) {
887                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
888            }
889            return Ok(Ok(TypedCellContent(
890                cell.type_id,
891                CellContent(Some(content)),
892            )));
893        }
894
895        let in_progress = task.get_in_progress();
896        if matches!(
897            in_progress,
898            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
899        ) {
900            return Ok(Err(self
901                .listen_to_cell(&mut task, task_id, &reader_task, cell)
902                .0));
903        }
904        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
905
906        // Check cell index range (cell might not exist at all)
907        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
908        let Some(max_id) = max_id else {
909            let task_desc = task.get_task_description();
910            if tracking.should_track(true) {
911                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
912            }
913            bail!(
914                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
915            );
916        };
917        if cell.index >= max_id {
918            let task_desc = task.get_task_description();
919            if tracking.should_track(true) {
920                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
921            }
922            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
923        }
924
925        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
926        // task to get the cell content.
927
928        // Bail early if the task was cancelled — no point in registering a listener
929        // on a task that won't execute again.
930        if is_cancelled {
931            bail!("{} was canceled", task.get_task_description());
932        }
933
934        // Listen to the cell and potentially schedule the task
935        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
936        drop(reader_task);
937        if !new_listener {
938            return Ok(Err(listener));
939        }
940
941        let _span = tracing::trace_span!(
942            "recomputation",
943            cell_type = get_value_type(cell.type_id).ty.global_name,
944            cell_index = cell.index
945        )
946        .entered();
947
948        let _ = task.add_scheduled(
949            TaskExecutionReason::CellNotAvailable,
950            EventDescription::new(|| task.get_task_desc_fn()),
951        );
952        ctx.schedule_task(task, TaskPriority::Initial);
953
954        Ok(Err(listener))
955    }
956
957    fn listen_to_cell(
958        &self,
959        task: &mut impl TaskGuard,
960        task_id: TaskId,
961        reader_task: &Option<impl TaskGuard>,
962        cell: CellId,
963    ) -> (EventListener, bool) {
964        let note = || {
965            let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
966            move || {
967                if let Some(reader_desc) = reader_desc.as_ref() {
968                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
969                } else {
970                    "try_read_task_cell (in progress, untracked)".to_string()
971                }
972            }
973        };
974        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
975            // Someone else is already computing the cell
976            let listener = in_progress.event.listen_with_note(note);
977            return (listener, false);
978        }
979        let in_progress = InProgressCellState::new(task_id, cell);
980        let listener = in_progress.event.listen_with_note(note);
981        let old = task.insert_in_progress_cells(cell, in_progress);
982        debug_assert!(old.is_none(), "InProgressCell already exists");
983        (listener, true)
984    }
985
986    fn snapshot_and_persist(
987        &self,
988        parent_span: Option<tracing::Id>,
989        reason: &str,
990        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
991    ) -> Result<(Instant, bool), anyhow::Error> {
992        let snapshot_span =
993            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
994                .entered();
995        let start = Instant::now();
996        // SystemTime for wall-clock timestamps in trace events (milliseconds
997        // since epoch). Instant is monotonic but has no defined epoch, so it
998        // can't be used for cross-process trace correlation.
999        let wall_start = SystemTime::now();
1000        debug_assert!(self.should_persist());
1001
1002        let suspended_operations;
1003        {
1004            let _span = tracing::info_span!("blocking").entered();
1005            let mut snapshot_request = self.snapshot_request.lock();
1006            snapshot_request.snapshot_requested = true;
1007            let active_operations = self
1008                .in_progress_operations
1009                .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1010            if active_operations != 0 {
1011                self.operations_suspended
1012                    .wait_while(&mut snapshot_request, |_| {
1013                        self.in_progress_operations.load(Ordering::Relaxed)
1014                            != SNAPSHOT_REQUESTED_BIT
1015                    });
1016            }
1017            suspended_operations = snapshot_request
1018                .suspended_operations
1019                .iter()
1020                .map(|op| op.arc().clone())
1021                .collect::<Vec<_>>();
1022        }
1023        // Enter snapshot mode, which atomically reads and resets the modified count.
1024        // Checking after start_snapshot ensures no concurrent increments can race.
1025        let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
1026        let mut snapshot_request = self.snapshot_request.lock();
1027        snapshot_request.snapshot_requested = false;
1028        self.in_progress_operations
1029            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1030        self.snapshot_completed.notify_all();
1031        let snapshot_time = Instant::now();
1032        drop(snapshot_request);
1033
1034        if !has_modifications {
1035            // No tasks modified since the last snapshot — drop the guard (which
1036            // calls end_snapshot) and skip the expensive O(N) scan.
1037            drop(snapshot_guard);
1038            return Ok((start, false));
1039        }
1040
1041        #[cfg(feature = "print_cache_item_size")]
1042        #[derive(Default)]
1043        struct TaskCacheStats {
1044            data: usize,
1045            #[cfg(feature = "print_cache_item_size_with_compressed")]
1046            data_compressed: usize,
1047            data_count: usize,
1048            meta: usize,
1049            #[cfg(feature = "print_cache_item_size_with_compressed")]
1050            meta_compressed: usize,
1051            meta_count: usize,
1052            upper_count: usize,
1053            collectibles_count: usize,
1054            aggregated_collectibles_count: usize,
1055            children_count: usize,
1056            followers_count: usize,
1057            collectibles_dependents_count: usize,
1058            aggregated_dirty_containers_count: usize,
1059            output_size: usize,
1060        }
1061        /// Formats a byte size, optionally including the compressed size when the
1062        /// `print_cache_item_size_with_compressed` feature is enabled.
1063        #[cfg(feature = "print_cache_item_size")]
1064        struct FormatSizes {
1065            size: usize,
1066            #[cfg(feature = "print_cache_item_size_with_compressed")]
1067            compressed_size: usize,
1068        }
1069        #[cfg(feature = "print_cache_item_size")]
1070        impl std::fmt::Display for FormatSizes {
1071            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1072                use turbo_tasks::util::FormatBytes;
1073                #[cfg(feature = "print_cache_item_size_with_compressed")]
1074                {
1075                    write!(
1076                        f,
1077                        "{} ({} compressed)",
1078                        FormatBytes(self.size),
1079                        FormatBytes(self.compressed_size)
1080                    )
1081                }
1082                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1083                {
1084                    write!(f, "{}", FormatBytes(self.size))
1085                }
1086            }
1087        }
1088        #[cfg(feature = "print_cache_item_size")]
1089        impl TaskCacheStats {
1090            #[cfg(feature = "print_cache_item_size_with_compressed")]
1091            fn compressed_size(data: &[u8]) -> Result<usize> {
1092                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1093                    data,
1094                    &mut Vec::new(),
1095                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
1096                )?)
1097            }
1098
1099            fn add_data(&mut self, data: &[u8]) {
1100                self.data += data.len();
1101                #[cfg(feature = "print_cache_item_size_with_compressed")]
1102                {
1103                    self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1104                }
1105                self.data_count += 1;
1106            }
1107
1108            fn add_meta(&mut self, data: &[u8]) {
1109                self.meta += data.len();
1110                #[cfg(feature = "print_cache_item_size_with_compressed")]
1111                {
1112                    self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1113                }
1114                self.meta_count += 1;
1115            }
1116
1117            fn add_counts(&mut self, storage: &TaskStorage) {
1118                let counts = storage.meta_counts();
1119                self.upper_count += counts.upper;
1120                self.collectibles_count += counts.collectibles;
1121                self.aggregated_collectibles_count += counts.aggregated_collectibles;
1122                self.children_count += counts.children;
1123                self.followers_count += counts.followers;
1124                self.collectibles_dependents_count += counts.collectibles_dependents;
1125                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
1126                if let Some(output) = storage.get_output() {
1127                    use turbo_bincode::turbo_bincode_encode;
1128
1129                    self.output_size += turbo_bincode_encode(&output)
1130                        .map(|data| data.len())
1131                        .unwrap_or(0);
1132                }
1133            }
1134
1135            /// Returns the task name used as the stats grouping key.
1136            fn task_name(storage: &TaskStorage) -> String {
1137                storage
1138                    .get_persistent_task_type()
1139                    .map(|t| t.to_string())
1140                    .unwrap_or_else(|| "<unknown>".to_string())
1141            }
1142
1143            /// Returns the primary sort key: compressed total when
1144            /// `print_cache_item_size_with_compressed` is enabled, raw total otherwise.
1145            fn sort_key(&self) -> usize {
1146                #[cfg(feature = "print_cache_item_size_with_compressed")]
1147                {
1148                    self.data_compressed + self.meta_compressed
1149                }
1150                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1151                {
1152                    self.data + self.meta
1153                }
1154            }
1155
1156            fn format_total(&self) -> FormatSizes {
1157                FormatSizes {
1158                    size: self.data + self.meta,
1159                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1160                    compressed_size: self.data_compressed + self.meta_compressed,
1161                }
1162            }
1163
1164            fn format_data(&self) -> FormatSizes {
1165                FormatSizes {
1166                    size: self.data,
1167                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1168                    compressed_size: self.data_compressed,
1169                }
1170            }
1171
1172            fn format_avg_data(&self) -> FormatSizes {
1173                FormatSizes {
1174                    size: self.data.checked_div(self.data_count).unwrap_or(0),
1175                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1176                    compressed_size: self
1177                        .data_compressed
1178                        .checked_div(self.data_count)
1179                        .unwrap_or(0),
1180                }
1181            }
1182
1183            fn format_meta(&self) -> FormatSizes {
1184                FormatSizes {
1185                    size: self.meta,
1186                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1187                    compressed_size: self.meta_compressed,
1188                }
1189            }
1190
1191            fn format_avg_meta(&self) -> FormatSizes {
1192                FormatSizes {
1193                    size: self.meta.checked_div(self.meta_count).unwrap_or(0),
1194                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1195                    compressed_size: self
1196                        .meta_compressed
1197                        .checked_div(self.meta_count)
1198                        .unwrap_or(0),
1199                }
1200            }
1201        }
1202        #[cfg(feature = "print_cache_item_size")]
1203        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1204            Mutex::new(FxHashMap::default());
1205
1206        // Encode each task's modified categories. We only encode categories with `modified` set,
1207        // meaning the category was actually dirtied. Categories restored from disk but never
1208        // modified don't need re-persisting since the on-disk version is still valid.
1209        // For tasks accessed during snapshot mode, a frozen copy was made and its `modified`
1210        // flags were copied from the live task at snapshot creation time, reflecting which
1211        // categories were dirtied before the snapshot was taken.
1212        let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
1213            let encode_category = |task_id: TaskId,
1214                                   data: &TaskStorage,
1215                                   category: SpecificTaskDataCategory,
1216                                   buffer: &mut TurboBincodeBuffer|
1217             -> Option<TurboBincodeBuffer> {
1218                match encode_task_data(task_id, data, category, buffer) {
1219                    Ok(encoded) => {
1220                        #[cfg(feature = "print_cache_item_size")]
1221                        {
1222                            let mut stats = task_cache_stats.lock();
1223                            let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
1224                            match category {
1225                                SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1226                                SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1227                            }
1228                        }
1229                        Some(encoded)
1230                    }
1231                    Err(err) => {
1232                        panic!(
1233                            "Serializing task {} failed ({:?}): {:?}",
1234                            self.debug_get_task_description(task_id),
1235                            category,
1236                            err
1237                        );
1238                    }
1239                }
1240            };
1241            if task_id.is_transient() {
1242                unreachable!("transient task_ids should never be enqueued to be persisted");
1243            }
1244
1245            let encode_meta = inner.flags.meta_modified();
1246            let encode_data = inner.flags.data_modified();
1247
1248            #[cfg(feature = "print_cache_item_size")]
1249            if encode_data || encode_meta {
1250                task_cache_stats
1251                    .lock()
1252                    .entry(TaskCacheStats::task_name(inner))
1253                    .or_default()
1254                    .add_counts(inner);
1255            }
1256
1257            let meta = if encode_meta {
1258                encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1259            } else {
1260                None
1261            };
1262
1263            let data = if encode_data {
1264                encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1265            } else {
1266                None
1267            };
1268            let task_type_hash = if inner.flags.new_task() {
1269                let task_type = inner.get_persistent_task_type().expect(
1270                    "It is not possible for a new_task to not have a persistent_task_type.  Task \
1271                     creation for persistent tasks uses a single ExecutionContextImpl for \
1272                     creating the task (which sets new_task) and connect_child (which sets \
1273                     persistent_task_type) and take_snapshot waits for all operations to complete \
1274                     or suspend before we start snapshotting.  So task creation will always set \
1275                     the task_type.",
1276                );
1277                Some(compute_task_type_hash(task_type))
1278            } else {
1279                None
1280            };
1281
1282            SnapshotItem {
1283                task_id,
1284                meta,
1285                data,
1286                task_type_hash,
1287            }
1288        };
1289
1290        let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);
1291
1292        drop(snapshot_span);
1293        let snapshot_duration = start.elapsed();
1294        let task_count = task_snapshots.len();
1295
1296        if task_snapshots.is_empty() {
1297            // This should be impossible — if we got here, modified_count was nonzero, and every
1298            // modification that increments the count also failed during encoding.
1299            std::hint::cold_path();
1300            return Ok((snapshot_time, false));
1301        }
1302
1303        let persist_start = Instant::now();
1304        let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1305        {
1306            // Tasks were already consumed by take_snapshot, so a future snapshot
1307            // would not re-persist them — returning an error signals to the caller
1308            // that further persist attempts would corrupt the task graph in storage.
1309            self.backing_storage
1310                .save_snapshot(suspended_operations, task_snapshots)?;
1311            #[cfg(feature = "print_cache_item_size")]
1312            {
1313                let mut task_cache_stats = task_cache_stats
1314                    .into_inner()
1315                    .into_iter()
1316                    .collect::<Vec<_>>();
1317                if !task_cache_stats.is_empty() {
1318                    use turbo_tasks::util::FormatBytes;
1319
1320                    use crate::utils::markdown_table::print_markdown_table;
1321
1322                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1323                        (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
1324                    });
1325
1326                    println!(
1327                        "Task cache stats: {}",
1328                        FormatSizes {
1329                            size: task_cache_stats
1330                                .iter()
1331                                .map(|(_, s)| s.data + s.meta)
1332                                .sum::<usize>(),
1333                            #[cfg(feature = "print_cache_item_size_with_compressed")]
1334                            compressed_size: task_cache_stats
1335                                .iter()
1336                                .map(|(_, s)| s.data_compressed + s.meta_compressed)
1337                                .sum::<usize>()
1338                        },
1339                    );
1340
1341                    print_markdown_table(
1342                        [
1343                            "Task",
1344                            " Total Size",
1345                            " Data Size",
1346                            " Data Count x Avg",
1347                            " Data Count x Avg",
1348                            " Meta Size",
1349                            " Meta Count x Avg",
1350                            " Meta Count x Avg",
1351                            " Uppers",
1352                            " Coll",
1353                            " Agg Coll",
1354                            " Children",
1355                            " Followers",
1356                            " Coll Deps",
1357                            " Agg Dirty",
1358                            " Output Size",
1359                        ],
1360                        task_cache_stats.iter(),
1361                        |(task_desc, stats)| {
1362                            [
1363                                task_desc.to_string(),
1364                                format!(" {}", stats.format_total()),
1365                                format!(" {}", stats.format_data()),
1366                                format!(" {} x", stats.data_count),
1367                                format!("{}", stats.format_avg_data()),
1368                                format!(" {}", stats.format_meta()),
1369                                format!(" {} x", stats.meta_count),
1370                                format!("{}", stats.format_avg_meta()),
1371                                format!(" {}", stats.upper_count),
1372                                format!(" {}", stats.collectibles_count),
1373                                format!(" {}", stats.aggregated_collectibles_count),
1374                                format!(" {}", stats.children_count),
1375                                format!(" {}", stats.followers_count),
1376                                format!(" {}", stats.collectibles_dependents_count),
1377                                format!(" {}", stats.aggregated_dirty_containers_count),
1378                                format!(" {}", FormatBytes(stats.output_size)),
1379                            ]
1380                        },
1381                    );
1382                }
1383            }
1384        }
1385
1386        let elapsed = start.elapsed();
1387        let persist_duration = persist_start.elapsed();
1388        // avoid spamming the event queue with information about fast operations
1389        if elapsed > Duration::from_secs(10) {
1390            turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1391                "Finished writing to filesystem cache".to_string(),
1392                elapsed,
1393            )));
1394        }
1395
1396        let wall_start_ms = wall_start
1397            .duration_since(SystemTime::UNIX_EPOCH)
1398            .unwrap_or_default()
1399            // as_millis_f64 is not stable yet
1400            .as_secs_f64()
1401            * 1000.0;
1402        let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1403        turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1404            "turbopack-persistence",
1405            wall_start_ms,
1406            wall_end_ms,
1407            vec![
1408                ("reason", serde_json::Value::from(reason)),
1409                (
1410                    "snapshot_duration_ms",
1411                    serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1412                ),
1413                (
1414                    "persist_duration_ms",
1415                    serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1416                ),
1417                ("task_count", serde_json::Value::from(task_count)),
1418            ],
1419        )));
1420
1421        Ok((snapshot_time, true))
1422    }
1423
1424    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1425        if self.should_restore() {
1426            // Continue all uncompleted operations
1427            // They can't be interrupted by a snapshot since the snapshotting job has not been
1428            // scheduled yet.
1429            let uncompleted_operations = self
1430                .backing_storage
1431                .uncompleted_operations()
1432                .expect("Failed to get uncompleted operations");
1433            if !uncompleted_operations.is_empty() {
1434                let mut ctx = self.execute_context(turbo_tasks);
1435                for op in uncompleted_operations {
1436                    op.execute(&mut ctx);
1437                }
1438            }
1439        }
1440
1441        // Only when it should write regularly to the storage, we schedule the initial snapshot
1442        // job.
1443        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1444            // Schedule the snapshot job
1445            let _span = trace_span!("persisting background job").entered();
1446            let _span = tracing::info_span!("thread").entered();
1447            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1448        }
1449    }
1450
1451    fn stopping(&self) {
1452        self.stopping.store(true, Ordering::Release);
1453        self.stopping_event.notify(usize::MAX);
1454    }
1455
1456    #[allow(unused_variables)]
1457    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1458        #[cfg(feature = "verify_aggregation_graph")]
1459        {
1460            self.is_idle.store(false, Ordering::Release);
1461            self.verify_aggregation_graph(turbo_tasks, false);
1462        }
1463        if self.should_persist()
1464            && let Err(err) = self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks)
1465        {
1466            eprintln!("Persisting failed during shutdown: {err:?}");
1467        }
1468        drop_contents(&self.task_cache);
1469        self.storage.drop_contents();
1470        if let Err(err) = self.backing_storage.shutdown() {
1471            println!("Shutting down failed: {err}");
1472        }
1473    }
1474
1475    #[allow(unused_variables)]
1476    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1477        self.idle_start_event.notify(usize::MAX);
1478
1479        #[cfg(feature = "verify_aggregation_graph")]
1480        {
1481            use tokio::select;
1482
1483            self.is_idle.store(true, Ordering::Release);
1484            let this = self.clone();
1485            let turbo_tasks = turbo_tasks.pin();
1486            tokio::task::spawn(async move {
1487                select! {
1488                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1489                        // do nothing
1490                    }
1491                    _ = this.idle_end_event.listen() => {
1492                        return;
1493                    }
1494                }
1495                if !this.is_idle.load(Ordering::Relaxed) {
1496                    return;
1497                }
1498                this.verify_aggregation_graph(&*turbo_tasks, true);
1499            });
1500        }
1501    }
1502
1503    fn idle_end(&self) {
1504        #[cfg(feature = "verify_aggregation_graph")]
1505        self.is_idle.store(false, Ordering::Release);
1506        self.idle_end_event.notify(usize::MAX);
1507    }
1508
    /// Returns the [`TaskId`] of the task identified by `native_fn`, `this` and `arg`, creating
    /// the task when it is not known yet, and connects it as a child of `parent_task`.
    ///
    /// The lookup proceeds in this order:
    /// 1. A read-only lookup in the in-memory `task_cache`.
    /// 2. For non-transient tasks, a lookup via `ctx.task_by_type`; a hit is inserted into the
    ///    in-memory cache.
    /// 3. Otherwise a new task id is taken from the transient or persisted id factory, the
    ///    storage entry is initialized and the task is inserted into the in-memory cache.
    ///
    /// A transient task requested by a non-transient parent is reported through
    /// `panic_persistent_calling_transient`.
    fn get_or_create_task(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        parent_task: Option<TaskId>,
        persistence: TaskPersistence,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let transient = matches!(persistence, TaskPersistence::Transient);

        // A persistent (non-transient) parent must not call a transient task.
        if transient
            && let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            let task_type = CachedTaskType {
                native_fn,
                this,
                arg: arg.take_box(),
            };
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                /* cell_id */ None,
            );
        }

        let is_root = native_fn.is_root;

        // Compute hash and shard index once from borrowed components (no heap allocation).
        let arg_ref = arg.as_ref();
        let hash = CachedTaskType::hash_from_components(
            self.task_cache.hasher(),
            native_fn,
            this,
            arg_ref,
        );
        // Locate the shard once so that the read-only lookup and any
        // write-lock retry below share the same reference (saves a modulo +
        // memory lookup on the miss path).
        let shard = get_shard(&self.task_cache, hash);

        // Step 1: Fast read-only cache lookup (read lock, no allocation).
        // Use a read lock rather than a write lock to avoid contention. connect_child
        // may re-enter task_cache with a write lock, so we must not hold a write lock here.
        if let Some(task_id) =
            raw_get_in_shard(shard, hash, |k| k.eq_components(native_fn, this, arg_ref))
        {
            self.track_cache_hit_by_fn(native_fn);
            self.connect_child(parent_task, task_id, turbo_tasks);
            return task_id;
        }

        // Step 2: Check backing storage using borrowed components (no box needed yet).
        let mut ctx = self.execute_context(turbo_tasks);

        // Set to `true` only when this call created the task (vacant entry in step 3b).
        let mut is_new = false;

        // Task exists in backing storage.
        // We only need to insert it into the in-memory cache.
        let task_id = if !transient
            && let Some((task_id, stored_type)) = ctx.task_by_type(native_fn, this, arg_ref)
        {
            self.track_cache_hit_by_fn(native_fn);
            // Step 3a: Insert into in-memory cache using the pre-located shard.
            // Use the existing Arc from storage to avoid a duplicate allocation.
            match raw_entry_in_shard(shard, self.task_cache.hasher(), hash, |k| {
                k.eq_components(native_fn, this, arg_ref)
            }) {
                // An entry is already present in the cache, nothing to insert.
                RawEntry::Occupied(_) => {}
                RawEntry::Vacant(e) => {
                    e.insert(stored_type, task_id);
                }
            };
            task_id
        } else {
            // Step 3b: The task is transient or was not found by `task_by_type`; look it up
            // again through the entry API and create it when the entry is vacant.
            match raw_entry_in_shard(shard, self.task_cache.hasher(), hash, |k| {
                k.eq_components(native_fn, this, arg_ref)
            }) {
                RawEntry::Occupied(e) => {
                    // Another thread beat us to creating this task — use their task_id.
                    // They will handle logging the new task as modified.
                    let task_id = *e.get();
                    drop(e);
                    self.track_cache_hit_by_fn(native_fn);
                    task_id
                }
                RawEntry::Vacant(e) => {
                    // Only now do we force the allocation.
                    // NOTE: if our caller had to perform resolution, then this will have already
                    // been boxed and take_box just takes it.
                    let task_type = Arc::new(CachedTaskType {
                        native_fn,
                        this,
                        arg: arg.take_box(),
                    });
                    let task_id = if transient {
                        self.transient_task_id_factory.get()
                    } else {
                        self.persisted_task_id_factory.get()
                    };
                    // Initialize storage BEFORE making task_id visible in the cache.
                    // This ensures any thread that reads task_id from the cache sees
                    // the storage entry already initialized (restored flags set).
                    self.storage
                        .initialize_new_task(task_id, Some(task_type.clone()));
                    // insert() consumes e, releasing the shard write lock.
                    e.insert(task_type, task_id);
                    self.track_cache_miss_by_fn(native_fn);
                    is_new = true;
                    task_id
                }
            }
        };

        // Newly created tasks of functions flagged `is_root` get the maximum base aggregation
        // number.
        if is_new && is_root {
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: u32::MAX,
                    distance: None,
                },
                &mut ctx,
            );
        }

        // Connect the task to its parent; this consumes the execute context.
        operation::ConnectChildOperation::run(parent_task, task_id, ctx);

        task_id
    }
1639
1640    /// Generate an object that implements [`fmt::Display`] explaining why the given
1641    /// [`CachedTaskType`] is transient.
1642    fn debug_trace_transient_task(
1643        &self,
1644        task_type: &CachedTaskType,
1645        cell_id: Option<CellId>,
1646    ) -> DebugTraceTransientTask {
1647        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1648        // from tracing the same task many times, so use a visited_set
1649        fn inner_id(
1650            backend: &TurboTasksBackendInner<impl BackingStorage>,
1651            task_id: TaskId,
1652            cell_type_id: Option<ValueTypeId>,
1653            visited_set: &mut FxHashSet<TaskId>,
1654        ) -> DebugTraceTransientTask {
1655            if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1656                if visited_set.contains(&task_id) {
1657                    let task_name = task_type.get_name();
1658                    DebugTraceTransientTask::Collapsed {
1659                        task_name,
1660                        cell_type_id,
1661                    }
1662                } else {
1663                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1664                }
1665            } else {
1666                DebugTraceTransientTask::Uncached { cell_type_id }
1667            }
1668        }
1669        fn inner_cached(
1670            backend: &TurboTasksBackendInner<impl BackingStorage>,
1671            task_type: &CachedTaskType,
1672            cell_type_id: Option<ValueTypeId>,
1673            visited_set: &mut FxHashSet<TaskId>,
1674        ) -> DebugTraceTransientTask {
1675            let task_name = task_type.get_name();
1676
1677            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1678                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1679                    // `task_id` should never be `None` at this point, as that would imply a
1680                    // non-local task is returning a local `Vc`...
1681                    // Just ignore if it happens, as we're likely already panicking.
1682                    return None;
1683                };
1684                if task_id.is_transient() {
1685                    Some(Box::new(inner_id(
1686                        backend,
1687                        task_id,
1688                        cause_self_raw_vc.try_get_type_id(),
1689                        visited_set,
1690                    )))
1691                } else {
1692                    None
1693                }
1694            });
1695            let cause_args = task_type
1696                .arg
1697                .get_raw_vcs()
1698                .into_iter()
1699                .filter_map(|raw_vc| {
1700                    let Some(task_id) = raw_vc.try_get_task_id() else {
1701                        // `task_id` should never be `None` (see comment above)
1702                        return None;
1703                    };
1704                    if !task_id.is_transient() {
1705                        return None;
1706                    }
1707                    Some((task_id, raw_vc.try_get_type_id()))
1708                })
1709                .collect::<IndexSet<_>>() // dedupe
1710                .into_iter()
1711                .map(|(task_id, cell_type_id)| {
1712                    inner_id(backend, task_id, cell_type_id, visited_set)
1713                })
1714                .collect();
1715
1716            DebugTraceTransientTask::Cached {
1717                task_name,
1718                cell_type_id,
1719                cause_self,
1720                cause_args,
1721            }
1722        }
1723        inner_cached(
1724            self,
1725            task_type,
1726            cell_id.map(|c| c.type_id),
1727            &mut FxHashSet::default(),
1728        )
1729    }
1730
1731    fn invalidate_task(
1732        &self,
1733        task_id: TaskId,
1734        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1735    ) {
1736        if !self.should_track_dependencies() {
1737            panic!("Dependency tracking is disabled so invalidation is not allowed");
1738        }
1739        operation::InvalidateOperation::run(
1740            smallvec![task_id],
1741            #[cfg(feature = "trace_task_dirty")]
1742            TaskDirtyCause::Invalidator,
1743            self.execute_context(turbo_tasks),
1744        );
1745    }
1746
1747    fn invalidate_tasks(
1748        &self,
1749        tasks: &[TaskId],
1750        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1751    ) {
1752        if !self.should_track_dependencies() {
1753            panic!("Dependency tracking is disabled so invalidation is not allowed");
1754        }
1755        operation::InvalidateOperation::run(
1756            tasks.iter().copied().collect(),
1757            #[cfg(feature = "trace_task_dirty")]
1758            TaskDirtyCause::Unknown,
1759            self.execute_context(turbo_tasks),
1760        );
1761    }
1762
1763    fn invalidate_tasks_set(
1764        &self,
1765        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1766        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1767    ) {
1768        if !self.should_track_dependencies() {
1769            panic!("Dependency tracking is disabled so invalidation is not allowed");
1770        }
1771        operation::InvalidateOperation::run(
1772            tasks.iter().copied().collect(),
1773            #[cfg(feature = "trace_task_dirty")]
1774            TaskDirtyCause::Unknown,
1775            self.execute_context(turbo_tasks),
1776        );
1777    }
1778
1779    fn invalidate_serialization(
1780        &self,
1781        task_id: TaskId,
1782        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1783    ) {
1784        if task_id.is_transient() {
1785            return;
1786        }
1787        let mut ctx = self.execute_context(turbo_tasks);
1788        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1789        task.invalidate_serialization();
1790    }
1791
1792    fn debug_get_task_description(&self, task_id: TaskId) -> String {
1793        let task = self.storage.access_mut(task_id);
1794        if let Some(value) = task.get_persistent_task_type() {
1795            format!("{task_id:?} {}", value)
1796        } else if let Some(value) = task.get_transient_task_type() {
1797            format!("{task_id:?} {}", value)
1798        } else {
1799            format!("{task_id:?} unknown")
1800        }
1801    }
1802
1803    fn get_task_name(
1804        &self,
1805        task_id: TaskId,
1806        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1807    ) -> String {
1808        let mut ctx = self.execute_context(turbo_tasks);
1809        let task = ctx.task(task_id, TaskDataCategory::Data);
1810        if let Some(value) = task.get_persistent_task_type() {
1811            value.to_string()
1812        } else if let Some(value) = task.get_transient_task_type() {
1813            value.to_string()
1814        } else {
1815            "unknown".to_string()
1816        }
1817    }
1818
1819    fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1820        let task = self.storage.access_mut(task_id);
1821        task.get_persistent_task_type().cloned()
1822    }
1823
1824    fn task_execution_canceled(
1825        &self,
1826        task_id: TaskId,
1827        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1828    ) {
1829        let mut ctx = self.execute_context(turbo_tasks);
1830        let mut task = ctx.task(task_id, TaskDataCategory::All);
1831        if let Some(in_progress) = task.take_in_progress() {
1832            match in_progress {
1833                InProgressState::Scheduled {
1834                    done_event,
1835                    reason: _,
1836                } => done_event.notify(usize::MAX),
1837                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1838                    done_event.notify(usize::MAX)
1839                }
1840                InProgressState::Canceled => {}
1841            }
1842        }
1843        // Notify any readers waiting on in-progress cells so their listeners
1844        // resolve and foreground jobs can finish (prevents stop_and_wait hang).
1845        let in_progress_cells = task.take_in_progress_cells();
1846        if let Some(ref cells) = in_progress_cells {
1847            for state in cells.values() {
1848                state.event.notify(usize::MAX);
1849            }
1850        }
1851
1852        // Mark the cancelled task as session-dependent dirty so it will be re-executed
1853        // in the next session. Without this, any reader that encounters the cancelled task
1854        // records an error in its output. That error is persisted and would poison
1855        // subsequent builds. By marking the task session-dependent dirty, the next build
1856        // re-executes it, which invalidates dependents and corrects the stale errors.
1857        let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
1858            task.update_dirty_state(Some(Dirtyness::SessionDependent))
1859        } else {
1860            None
1861        };
1862
1863        let old = task.set_in_progress(InProgressState::Canceled);
1864        debug_assert!(old.is_none(), "InProgress already exists");
1865        drop(task);
1866
1867        if let Some(data_update) = data_update {
1868            AggregationUpdateQueue::run(data_update, &mut ctx);
1869        }
1870
1871        drop(in_progress_cells);
1872    }
1873
    /// Tries to move the task from the `Scheduled` state into the `InProgress` state and
    /// returns the span and future needed to execute it.
    ///
    /// Returns `None` when the task has no in-progress state, when that state is not
    /// `Scheduled` (the state is put back unchanged in that case), or when the future of a
    /// `TransientTask::Once` task has already been taken.
    ///
    /// Before execution starts, the current collectibles are recorded as outdated and, when
    /// dependency tracking is enabled, the current cell and output dependencies are stored as
    /// outdated dependencies.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        // The execute context and the task guard only live inside this block; the future is
        // created after they have been dropped.
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            // When the task reports tasks to prefetch, release the task guard while they are
            // prepared and re-acquire it afterwards.
            if let Some(tasks) = task.prefetch() {
                drop(task);
                ctx.prepare_tasks(tasks, "prefetch");
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            let in_progress = task.take_in_progress()?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                // Not scheduled: restore the state that was taken and do not start.
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Make all current collectibles outdated (remove left-over outdated collectibles)
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            // Collect first, since the task is mutated while processing the entries below.
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        // Remove the outdated entry only when there is no current collectible
                        // with the same key.
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Make all dependencies outdated
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        // Build the tracing span and the future for the execution.
        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason, priority),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // The future of a once task is taken out of its mutex; `?` returns `None`
                    // when it is no longer there.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1973
    /// Processes the completion of a task execution with the given `result`.
    ///
    /// Returns `true` when one of the steps found the task to be stale (per the inline
    /// comments it has been rescheduled in that case) and `false` when the completion was
    /// processed to the end.
    ///
    /// `cell_counters` is passed on to the prepare and cleanup steps; `has_invalidator` (and
    /// `stateful` with the `verify_determinism` feature) are passed on to the prepare step.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        // Task completion is a 4 step process:
        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
        //    aggregation number of the task and the new children.
        // 2. Connect the new children to the task (and do the relevant aggregation updates).
        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
        // 4. Shrink the task memory to reduce footprint of the task.

        // Due to persistence it is possible that the process is cancelled after any step. This is
        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
        // in-memory representation.

        // The task might be invalidated during this process, so we need to check the stale flag
        // at the start of every step.

        // The span records fewer fields unless the `trace_task_details` feature is enabled.
        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        // Remember this before `result` is moved into the prepare step.
        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        let Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
            is_recomputation,
        }) = self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        )
        else {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "prepare");
            return true;
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // When restoring from filesystem cache the following might not be executed (since we can
        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
        // would be executed again.

        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        // Connecting is only needed when there are new children.
        if has_new_children
            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return true;
        }

        let (stale, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
        );
        if stale {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return true;
        }

        let removed_data = self.task_execution_completed_cleanup(
            &mut ctx,
            task_id,
            cell_counters,
            is_error,
            is_recomputation,
        );

        // Drop data outside of critical sections
        drop(removed_data);
        drop(in_progress_cells);

        false
    }
2111
2112    fn task_execution_completed_prepare(
2113        &self,
2114        ctx: &mut impl ExecuteContext<'_>,
2115        #[cfg(feature = "trace_task_details")] span: &Span,
2116        task_id: TaskId,
2117        result: Result<RawVc, TurboTasksExecutionError>,
2118        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2119        #[cfg(feature = "verify_determinism")] stateful: bool,
2120        has_invalidator: bool,
2121    ) -> Option<TaskExecutionCompletePrepareResult> {
2122        let mut task = ctx.task(task_id, TaskDataCategory::All);
2123        let is_recomputation = task.is_dirty().is_none();
2124        let Some(in_progress) = task.get_in_progress_mut() else {
2125            panic!("Task execution completed, but task is not in progress: {task:#?}");
2126        };
2127        if matches!(in_progress, InProgressState::Canceled) {
2128            return Some(TaskExecutionCompletePrepareResult {
2129                new_children: Default::default(),
2130                is_now_immutable: false,
2131                #[cfg(feature = "verify_determinism")]
2132                no_output_set: false,
2133                new_output: None,
2134                output_dependent_tasks: Default::default(),
2135                is_recomputation,
2136            });
2137        }
2138        let &mut InProgressState::InProgress(box InProgressStateInner {
2139            stale,
2140            ref mut new_children,
2141            session_dependent,
2142            once_task: is_once_task,
2143            ..
2144        }) = in_progress
2145        else {
2146            panic!("Task execution completed, but task is not in progress: {task:#?}");
2147        };
2148
2149        // If the task is stale, reschedule it
2150        #[cfg(not(feature = "no_fast_stale"))]
2151        if stale && !is_once_task {
2152            let Some(InProgressState::InProgress(box InProgressStateInner {
2153                done_event,
2154                mut new_children,
2155                ..
2156            })) = task.take_in_progress()
2157            else {
2158                unreachable!();
2159            };
2160            let old = task.set_in_progress(InProgressState::Scheduled {
2161                done_event,
2162                reason: TaskExecutionReason::Stale,
2163            });
2164            debug_assert!(old.is_none(), "InProgress already exists");
2165            // Remove old children from new_children to leave only the children that had their
2166            // active count increased
2167            for task in task.iter_children() {
2168                new_children.remove(&task);
2169            }
2170            drop(task);
2171
2172            // We need to undo the active count increase for the children since we throw away the
2173            // new_children list now.
2174            AggregationUpdateQueue::run(
2175                AggregationUpdateJob::DecreaseActiveCounts {
2176                    task_ids: new_children.into_iter().collect(),
2177                },
2178                ctx,
2179            );
2180            return None;
2181        }
2182
2183        // take the children from the task to process them
2184        let mut new_children = take(new_children);
2185
2186        // handle stateful (only tracked when verify_determinism is enabled)
2187        #[cfg(feature = "verify_determinism")]
2188        if stateful {
2189            task.set_stateful(true);
2190        }
2191
2192        // handle has_invalidator
2193        if has_invalidator {
2194            task.set_invalidator(true);
2195        }
2196
2197        // handle cell counters: update max index and remove cells that are no longer used
2198        // On error, skip this update: the task may have failed before creating all cells it
2199        // normally creates, so cell_counters is incomplete. Clearing cell_type_max_index entries
2200        // based on partial counters would cause "cell no longer exists" errors for tasks that
2201        // still hold dependencies on those cells. The old cell data is preserved on error
2202        // (see task_execution_completed_cleanup), so keeping cell_type_max_index consistent with
2203        // that data is correct.
2204        // NOTE: This must stay in sync with task_execution_completed_cleanup, which similarly
2205        // skips cell data removal on error.
2206        if result.is_ok() || is_recomputation {
2207            let old_counters: FxHashMap<_, _> = task
2208                .iter_cell_type_max_index()
2209                .map(|(&k, &v)| (k, v))
2210                .collect();
2211            let mut counters_to_remove = old_counters.clone();
2212
2213            for (&cell_type, &max_index) in cell_counters.iter() {
2214                if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2215                    if old_max_index != max_index {
2216                        task.insert_cell_type_max_index(cell_type, max_index);
2217                    }
2218                } else {
2219                    task.insert_cell_type_max_index(cell_type, max_index);
2220                }
2221            }
2222            for (cell_type, _) in counters_to_remove {
2223                task.remove_cell_type_max_index(&cell_type);
2224            }
2225        }
2226
2227        let mut queue = AggregationUpdateQueue::new();
2228
2229        let mut old_edges = Vec::new();
2230
2231        let has_children = !new_children.is_empty();
2232        let is_immutable = task.immutable();
2233        let task_dependencies_for_immutable =
2234            // Task was previously marked as immutable
2235            if !is_immutable
2236            // Task is not session dependent (session dependent tasks can change between sessions)
2237            && !session_dependent
2238            // Task has no invalidator
2239            && !task.invalidator()
2240            // Task has no dependencies on collectibles
2241            && task.is_collectibles_dependencies_empty()
2242        {
2243            Some(
2244                // Collect all dependencies on tasks to check if all dependencies are immutable
2245                task.iter_output_dependencies()
2246                    .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
2247                    .collect::<FxHashSet<_>>(),
2248            )
2249        } else {
2250            None
2251        };
2252
2253        if has_children {
2254            // Prepare all new children
2255            prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2256
2257            // Filter actual new children
2258            old_edges.extend(
2259                task.iter_children()
2260                    .filter(|task| !new_children.remove(task))
2261                    .map(OutdatedEdge::Child),
2262            );
2263        } else {
2264            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2265        }
2266
2267        old_edges.extend(
2268            task.iter_outdated_collectibles()
2269                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2270        );
2271
2272        if self.should_track_dependencies() {
2273            // IMPORTANT: Use iter_outdated_* here, NOT iter_* (active dependencies).
2274            // At execution start, active deps are copied to outdated as a "before" snapshot.
2275            // During execution, new deps are added to active.
2276            // Here at completion, we clean up only the OUTDATED deps (the "before" snapshot).
2277            // Using iter_* (active) instead would incorrectly clean up deps that are still valid,
2278            // breaking dependency tracking.
2279            old_edges.extend(
2280                task.iter_outdated_cell_dependencies()
2281                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
2282            );
2283            old_edges.extend(
2284                task.iter_outdated_output_dependencies()
2285                    .map(OutdatedEdge::OutputDependency),
2286            );
2287        }
2288
2289        // Check if output need to be updated
2290        let current_output = task.get_output();
2291        #[cfg(feature = "verify_determinism")]
2292        let no_output_set = current_output.is_none();
2293        let new_output = match result {
2294            Ok(RawVc::TaskOutput(output_task_id)) => {
2295                if let Some(OutputValue::Output(current_task_id)) = current_output
2296                    && *current_task_id == output_task_id
2297                {
2298                    None
2299                } else {
2300                    Some(OutputValue::Output(output_task_id))
2301                }
2302            }
2303            Ok(RawVc::TaskCell(output_task_id, cell)) => {
2304                if let Some(OutputValue::Cell(CellRef {
2305                    task: current_task_id,
2306                    cell: current_cell,
2307                })) = current_output
2308                    && *current_task_id == output_task_id
2309                    && *current_cell == cell
2310                {
2311                    None
2312                } else {
2313                    Some(OutputValue::Cell(CellRef {
2314                        task: output_task_id,
2315                        cell,
2316                    }))
2317                }
2318            }
2319            Ok(RawVc::LocalOutput(..)) => {
2320                panic!("Non-local tasks must not return a local Vc");
2321            }
2322            Err(err) => {
2323                if let Some(OutputValue::Error(old_error)) = current_output
2324                    && **old_error == err
2325                {
2326                    None
2327                } else {
2328                    Some(OutputValue::Error(Arc::new((&err).into())))
2329                }
2330            }
2331        };
2332        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2333        // When output has changed, grab the dependent tasks
2334        if new_output.is_some() && ctx.should_track_dependencies() {
2335            output_dependent_tasks = task.iter_output_dependent().collect();
2336        }
2337
2338        drop(task);
2339
2340        // Check if the task can be marked as immutable
2341        let mut is_now_immutable = false;
2342        if let Some(dependencies) = task_dependencies_for_immutable
2343            && dependencies
2344                .iter()
2345                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2346        {
2347            is_now_immutable = true;
2348        }
2349        #[cfg(feature = "trace_task_details")]
2350        span.record("immutable", is_immutable || is_now_immutable);
2351
2352        if !queue.is_empty() || !old_edges.is_empty() {
2353            #[cfg(any(
2354                feature = "trace_task_completion",
2355                feature = "trace_aggregation_update_stats"
2356            ))]
2357            let _span =
2358                tracing::trace_span!("remove old edges and prepare new children", stats = Empty)
2359                    .entered();
2360            // Remove outdated edges first, before removing in_progress+dirty flag.
2361            // We need to make sure all outdated edges are removed before the task can potentially
2362            // be scheduled and executed again
2363            #[cfg(feature = "trace_aggregation_update_stats")]
2364            {
2365                let stats = CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2366                _span.record("stats", tracing::field::debug(stats));
2367            }
2368            #[cfg(not(feature = "trace_aggregation_update_stats"))]
2369            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2370        }
2371
2372        Some(TaskExecutionCompletePrepareResult {
2373            new_children,
2374            is_now_immutable,
2375            #[cfg(feature = "verify_determinism")]
2376            no_output_set,
2377            new_output,
2378            output_dependent_tasks,
2379            is_recomputation,
2380        })
2381    }
2382
2383    fn task_execution_completed_invalidate_output_dependent(
2384        &self,
2385        ctx: &mut impl ExecuteContext<'_>,
2386        task_id: TaskId,
2387        output_dependent_tasks: SmallVec<[TaskId; 4]>,
2388    ) {
2389        debug_assert!(!output_dependent_tasks.is_empty());
2390
2391        if output_dependent_tasks.len() > 1 {
2392            ctx.prepare_tasks(
2393                output_dependent_tasks
2394                    .iter()
2395                    .map(|&id| (id, TaskDataCategory::All)),
2396                "invalidate output dependents",
2397            );
2398        }
2399
2400        fn process_output_dependents(
2401            ctx: &mut impl ExecuteContext<'_>,
2402            task_id: TaskId,
2403            dependent_task_id: TaskId,
2404            queue: &mut AggregationUpdateQueue,
2405        ) {
2406            #[cfg(feature = "trace_task_output_dependencies")]
2407            let span = tracing::trace_span!(
2408                "invalidate output dependency",
2409                task = %task_id,
2410                dependent_task = %dependent_task_id,
2411                result = tracing::field::Empty,
2412            )
2413            .entered();
2414            let mut make_stale = true;
2415            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2416            let transient_task_type = dependent.get_transient_task_type();
2417            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
2418                // once tasks are never invalidated
2419                #[cfg(feature = "trace_task_output_dependencies")]
2420                span.record("result", "once task");
2421                return;
2422            }
2423            if dependent.outdated_output_dependencies_contains(&task_id) {
2424                #[cfg(feature = "trace_task_output_dependencies")]
2425                span.record("result", "outdated dependency");
2426                // output dependency is outdated, so it hasn't read the output yet
2427                // and doesn't need to be invalidated
2428                // But importantly we still need to make the task dirty as it should no longer
2429                // be considered as "recomputation".
2430                make_stale = false;
2431            } else if !dependent.output_dependencies_contains(&task_id) {
2432                // output dependency has been removed, so the task doesn't depend on the
2433                // output anymore and doesn't need to be invalidated
2434                #[cfg(feature = "trace_task_output_dependencies")]
2435                span.record("result", "no backward dependency");
2436                return;
2437            }
2438            make_task_dirty_internal(
2439                dependent,
2440                dependent_task_id,
2441                make_stale,
2442                #[cfg(feature = "trace_task_dirty")]
2443                TaskDirtyCause::OutputChange { task_id },
2444                queue,
2445                ctx,
2446            );
2447            #[cfg(feature = "trace_task_output_dependencies")]
2448            span.record("result", "marked dirty");
2449        }
2450
2451        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
2452            let chunk_size = good_chunk_size(output_dependent_tasks.len());
2453            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2454            let _ = scope_and_block(chunks.len(), |scope| {
2455                for chunk in chunks {
2456                    let child_ctx = ctx.child_context();
2457                    scope.spawn(move || {
2458                        let mut ctx = child_ctx.create();
2459                        let mut queue = AggregationUpdateQueue::new();
2460                        for dependent_task_id in chunk {
2461                            process_output_dependents(
2462                                &mut ctx,
2463                                task_id,
2464                                dependent_task_id,
2465                                &mut queue,
2466                            )
2467                        }
2468                        queue.execute(&mut ctx);
2469                    });
2470                }
2471            });
2472        } else {
2473            let mut queue = AggregationUpdateQueue::new();
2474            for dependent_task_id in output_dependent_tasks {
2475                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
2476            }
2477            queue.execute(ctx);
2478        }
2479    }
2480
2481    fn task_execution_completed_unfinished_children_dirty(
2482        &self,
2483        ctx: &mut impl ExecuteContext<'_>,
2484        new_children: &FxHashSet<TaskId>,
2485    ) {
2486        debug_assert!(!new_children.is_empty());
2487
2488        let mut queue = AggregationUpdateQueue::new();
2489        ctx.for_each_task_all(
2490            new_children.iter().copied(),
2491            "unfinished children dirty",
2492            |child_task, ctx| {
2493                if !child_task.has_output() {
2494                    let child_id = child_task.id();
2495                    make_task_dirty_internal(
2496                        child_task,
2497                        child_id,
2498                        false,
2499                        #[cfg(feature = "trace_task_dirty")]
2500                        TaskDirtyCause::InitialDirty,
2501                        &mut queue,
2502                        ctx,
2503                    );
2504                }
2505            },
2506        );
2507
2508        queue.execute(ctx);
2509    }
2510
2511    fn task_execution_completed_connect(
2512        &self,
2513        ctx: &mut impl ExecuteContext<'_>,
2514        task_id: TaskId,
2515        new_children: FxHashSet<TaskId>,
2516    ) -> bool {
2517        debug_assert!(!new_children.is_empty());
2518
2519        let mut task = ctx.task(task_id, TaskDataCategory::All);
2520        let Some(in_progress) = task.get_in_progress() else {
2521            panic!("Task execution completed, but task is not in progress: {task:#?}");
2522        };
2523        if matches!(in_progress, InProgressState::Canceled) {
2524            // Task was canceled in the meantime, so we don't connect the children
2525            return false;
2526        }
2527        let InProgressState::InProgress(box InProgressStateInner {
2528            #[cfg(not(feature = "no_fast_stale"))]
2529            stale,
2530            once_task: is_once_task,
2531            ..
2532        }) = in_progress
2533        else {
2534            panic!("Task execution completed, but task is not in progress: {task:#?}");
2535        };
2536
2537        // If the task is stale, reschedule it
2538        #[cfg(not(feature = "no_fast_stale"))]
2539        if *stale && !is_once_task {
2540            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2541                task.take_in_progress()
2542            else {
2543                unreachable!();
2544            };
2545            let old = task.set_in_progress(InProgressState::Scheduled {
2546                done_event,
2547                reason: TaskExecutionReason::Stale,
2548            });
2549            debug_assert!(old.is_none(), "InProgress already exists");
2550            drop(task);
2551
2552            // All `new_children` are currently hold active with an active count and we need to undo
2553            // that. (We already filtered out the old children from that list)
2554            AggregationUpdateQueue::run(
2555                AggregationUpdateJob::DecreaseActiveCounts {
2556                    task_ids: new_children.into_iter().collect(),
2557                },
2558                ctx,
2559            );
2560            return true;
2561        }
2562
2563        let has_active_count = ctx.should_track_activeness()
2564            && task
2565                .get_activeness()
2566                .is_some_and(|activeness| activeness.active_counter > 0);
2567        connect_children(
2568            ctx,
2569            task_id,
2570            task,
2571            new_children,
2572            has_active_count,
2573            ctx.should_track_activeness(),
2574        );
2575
2576        false
2577    }
2578
2579    fn task_execution_completed_finish(
2580        &self,
2581        ctx: &mut impl ExecuteContext<'_>,
2582        task_id: TaskId,
2583        #[cfg(feature = "verify_determinism")] no_output_set: bool,
2584        new_output: Option<OutputValue>,
2585        is_now_immutable: bool,
2586    ) -> (
2587        bool,
2588        Option<
2589            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2590        >,
2591    ) {
2592        let mut task = ctx.task(task_id, TaskDataCategory::All);
2593        let Some(in_progress) = task.take_in_progress() else {
2594            panic!("Task execution completed, but task is not in progress: {task:#?}");
2595        };
2596        if matches!(in_progress, InProgressState::Canceled) {
2597            // Task was canceled in the meantime, so we don't finish it
2598            return (false, None);
2599        }
2600        let InProgressState::InProgress(box InProgressStateInner {
2601            done_event,
2602            once_task: is_once_task,
2603            stale,
2604            session_dependent,
2605            marked_as_completed: _,
2606            new_children,
2607        }) = in_progress
2608        else {
2609            panic!("Task execution completed, but task is not in progress: {task:#?}");
2610        };
2611        debug_assert!(new_children.is_empty());
2612
2613        // If the task is stale, reschedule it
2614        if stale && !is_once_task {
2615            let old = task.set_in_progress(InProgressState::Scheduled {
2616                done_event,
2617                reason: TaskExecutionReason::Stale,
2618            });
2619            debug_assert!(old.is_none(), "InProgress already exists");
2620            return (true, None);
2621        }
2622
2623        // Set the output if it has changed
2624        let mut old_content = None;
2625        if let Some(value) = new_output {
2626            old_content = task.set_output(value);
2627        }
2628
2629        // If the task has no invalidator and has no mutable dependencies, it does not have a way
2630        // to be invalidated and we can mark it as immutable.
2631        if is_now_immutable {
2632            task.set_immutable(true);
2633        }
2634
2635        // Notify in progress cells and remove all of them
2636        let in_progress_cells = task.take_in_progress_cells();
2637        if let Some(ref cells) = in_progress_cells {
2638            for state in cells.values() {
2639                state.event.notify(usize::MAX);
2640            }
2641        }
2642
2643        // Compute the new dirty state
2644        let new_dirtyness = if session_dependent {
2645            Some(Dirtyness::SessionDependent)
2646        } else {
2647            None
2648        };
2649        #[cfg(feature = "verify_determinism")]
2650        let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
2651        let data_update = task.update_dirty_state(new_dirtyness);
2652
2653        #[cfg(feature = "verify_determinism")]
2654        let reschedule =
2655            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
2656        #[cfg(not(feature = "verify_determinism"))]
2657        let reschedule = false;
2658        if reschedule {
2659            let old = task.set_in_progress(InProgressState::Scheduled {
2660                done_event,
2661                reason: TaskExecutionReason::Stale,
2662            });
2663            debug_assert!(old.is_none(), "InProgress already exists");
2664            drop(task);
2665        } else {
2666            drop(task);
2667
2668            // Notify dependent tasks that are waiting for this task to finish
2669            done_event.notify(usize::MAX);
2670        }
2671
2672        drop(old_content);
2673
2674        if let Some(data_update) = data_update {
2675            AggregationUpdateQueue::run(data_update, ctx);
2676        }
2677
2678        // We return so the data can be dropped outside of critical sections
2679        (reschedule, in_progress_cells)
2680    }
2681
2682    fn task_execution_completed_cleanup(
2683        &self,
2684        ctx: &mut impl ExecuteContext<'_>,
2685        task_id: TaskId,
2686        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2687        is_error: bool,
2688        is_recomputation: bool,
2689    ) -> Vec<SharedReference> {
2690        let mut task = ctx.task(task_id, TaskDataCategory::All);
2691        let mut removed_cell_data = Vec::new();
2692        // An error is potentially caused by a eventual consistency, so we avoid updating cells
2693        // after an error as it is likely transient and we want to keep the dependent tasks
2694        // clean to avoid re-executions.
2695        // NOTE: This must stay in sync with task_execution_completed_prepare, which similarly
2696        // skips cell_type_max_index updates on error.
2697        if !is_error || is_recomputation {
2698            // Remove no longer existing cells and
2699            // find all outdated data items (removed cells, outdated edges)
2700            // Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
2701            // anyway and we want to avoid needless re-executions. When the cells become
2702            // used again, they are invalidated from the update cell operation.
2703            let to_remove: Vec<_> = task
2704                .iter_cell_data()
2705                .filter_map(|(cell, _)| {
2706                    cell_counters
2707                        .get(&cell.type_id)
2708                        .is_none_or(|start_index| cell.index >= *start_index)
2709                        .then_some(*cell)
2710                })
2711                .collect();
2712            removed_cell_data.reserve_exact(to_remove.len());
2713            for cell in to_remove {
2714                if let Some(data) = task.remove_cell_data(&cell) {
2715                    removed_cell_data.push(data);
2716                }
2717            }
2718            // Remove cell_data_hash entries for cells that no longer exist
2719            let to_remove_hash: Vec<_> = task
2720                .iter_cell_data_hash()
2721                .filter_map(|(cell, _)| {
2722                    cell_counters
2723                        .get(&cell.type_id)
2724                        .is_none_or(|start_index| cell.index >= *start_index)
2725                        .then_some(*cell)
2726                })
2727                .collect();
2728            for cell in to_remove_hash {
2729                task.remove_cell_data_hash(&cell);
2730            }
2731        }
2732
2733        // Clean up task storage after execution:
2734        // - Shrink collections marked with shrink_on_completion
2735        // - Drop dependency fields for immutable tasks (they'll never re-execute)
2736        task.cleanup_after_execution();
2737
2738        drop(task);
2739
2740        // Return so we can drop outside of critical sections
2741        removed_cell_data
2742    }
2743
2744    /// Prints the standard message emitted when the background persisting process stops due to an
2745    /// unrecoverable write error. The caller is responsible for returning from the background job.
2746    fn log_unrecoverable_persist_error() {
2747        eprintln!(
2748            "Persisting is disabled for this session due to an unrecoverable error. Stopping the \
2749             background persisting process."
2750        );
2751    }
2752
2753    fn run_backend_job<'a>(
2754        self: &'a Arc<Self>,
2755        job: TurboTasksBackendJob,
2756        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2757    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2758        Box::pin(async move {
2759            match job {
2760                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
2761                    debug_assert!(self.should_persist());
2762
2763                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
2764                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
2765                    let mut idle_start_listener = self.idle_start_event.listen();
2766                    let mut idle_end_listener = self.idle_end_event.listen();
2767                    let mut fresh_idle = true;
2768                    loop {
2769                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2770                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2771                        let idle_timeout = *IDLE_TIMEOUT;
2772                        let (time, mut reason) =
2773                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
2774                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2775                            } else {
2776                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
2777                            };
2778
2779                        let until = last_snapshot + time;
2780                        if until > Instant::now() {
2781                            let mut stop_listener = self.stopping_event.listen();
2782                            if self.stopping.load(Ordering::Acquire) {
2783                                return;
2784                            }
2785                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2786                                Instant::now() + idle_timeout
2787                            } else {
2788                                far_future()
2789                            };
2790                            loop {
2791                                tokio::select! {
2792                                    _ = &mut stop_listener => {
2793                                        return;
2794                                    },
2795                                    _ = &mut idle_start_listener => {
2796                                        idle_time = Instant::now() + idle_timeout;
2797                                        idle_start_listener = self.idle_start_event.listen()
2798                                    },
2799                                    _ = &mut idle_end_listener => {
2800                                        idle_time = until + idle_timeout;
2801                                        idle_end_listener = self.idle_end_event.listen()
2802                                    },
2803                                    _ = tokio::time::sleep_until(until) => {
2804                                        break;
2805                                    },
2806                                    _ = tokio::time::sleep_until(idle_time) => {
2807                                        if turbo_tasks.is_idle() {
2808                                            reason = "idle timeout";
2809                                            break;
2810                                        }
2811                                    },
2812                                }
2813                            }
2814                        }
2815
2816                        let this = self.clone();
2817                        // Create a root span shared by both the snapshot/persist
2818                        // work and the subsequent compaction so they appear
2819                        // grouped together in trace viewers.
2820                        let background_span =
2821                            tracing::info_span!(parent: None, "background snapshot");
2822                        match this.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
2823                            Err(err) => {
2824                                // save_snapshot consumed persisted_task_cache_log entries;
2825                                // further snapshots would corrupt the task graph.
2826                                eprintln!("Persisting failed: {err:?}");
2827                                Self::log_unrecoverable_persist_error();
2828                                return;
2829                            }
2830                            Ok((snapshot_start, new_data)) => {
2831                                last_snapshot = snapshot_start;
2832
2833                                // Compact while idle (up to limit), regardless of
2834                                // whether the snapshot had new data.
2835                                // `background_span` is not entered here because
2836                                // `EnteredSpan` is `!Send` and would prevent the
2837                                // future from being sent across threads when it
2838                                // suspends at the `select!` await below.
2839                                const MAX_IDLE_COMPACTION_PASSES: usize = 10;
2840                                for _ in 0..MAX_IDLE_COMPACTION_PASSES {
2841                                    let idle_ended = tokio::select! {
2842                                        biased;
2843                                        _ = &mut idle_end_listener => {
2844                                            idle_end_listener = self.idle_end_event.listen();
2845                                            true
2846                                        },
2847                                        _ = std::future::ready(()) => false,
2848                                    };
2849                                    if idle_ended {
2850                                        break;
2851                                    }
2852                                    // Enter the span only around the synchronous
2853                                    // compact() call so we never hold an
2854                                    // `EnteredSpan` across an await point.
2855                                    let _compact_span = tracing::info_span!(
2856                                        parent: background_span.id(),
2857                                        "compact database"
2858                                    )
2859                                    .entered();
2860                                    match self.backing_storage.compact() {
2861                                        Ok(true) => {}
2862                                        Ok(false) => break,
2863                                        Err(err) => {
2864                                            eprintln!("Compaction failed: {err:?}");
2865                                            if self.backing_storage.has_unrecoverable_write_error()
2866                                            {
2867                                                Self::log_unrecoverable_persist_error();
2868                                                return;
2869                                            }
2870                                            break;
2871                                        }
2872                                    }
2873                                }
2874
2875                                if !new_data {
2876                                    fresh_idle = false;
2877                                    continue;
2878                                }
2879                                let last_snapshot = last_snapshot.duration_since(self.start_time);
2880                                self.last_snapshot.store(
2881                                    last_snapshot.as_millis().try_into().unwrap(),
2882                                    Ordering::Relaxed,
2883                                );
2884
2885                                turbo_tasks.schedule_backend_background_job(
2886                                    TurboTasksBackendJob::FollowUpSnapshot,
2887                                );
2888                                return;
2889                            }
2890                        }
2891                    }
2892                }
2893            }
2894        })
2895    }
2896
2897    fn try_read_own_task_cell(
2898        &self,
2899        task_id: TaskId,
2900        cell: CellId,
2901        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2902    ) -> Result<TypedCellContent> {
2903        let mut ctx = self.execute_context(turbo_tasks);
2904        let task = ctx.task(task_id, TaskDataCategory::Data);
2905        if let Some(content) = task.get_cell_data(&cell).cloned() {
2906            Ok(CellContent(Some(content)).into_typed(cell.type_id))
2907        } else {
2908            Ok(CellContent(None).into_typed(cell.type_id))
2909        }
2910    }
2911
2912    fn read_task_collectibles(
2913        &self,
2914        task_id: TaskId,
2915        collectible_type: TraitTypeId,
2916        reader_id: Option<TaskId>,
2917        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2918    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2919        let mut ctx = self.execute_context(turbo_tasks);
2920        let mut collectibles = AutoMap::default();
2921        {
2922            let mut task = ctx.task(task_id, TaskDataCategory::All);
2923            // Ensure it's an root node
2924            loop {
2925                let aggregation_number = get_aggregation_number(&task);
2926                if is_root_node(aggregation_number) {
2927                    break;
2928                }
2929                drop(task);
2930                AggregationUpdateQueue::run(
2931                    AggregationUpdateJob::UpdateAggregationNumber {
2932                        task_id,
2933                        base_aggregation_number: u32::MAX,
2934                        distance: None,
2935                    },
2936                    &mut ctx,
2937                );
2938                task = ctx.task(task_id, TaskDataCategory::All);
2939            }
2940            for (collectible, count) in task.iter_aggregated_collectibles() {
2941                if *count > 0 && collectible.collectible_type == collectible_type {
2942                    *collectibles
2943                        .entry(RawVc::TaskCell(
2944                            collectible.cell.task,
2945                            collectible.cell.cell,
2946                        ))
2947                        .or_insert(0) += 1;
2948                }
2949            }
2950            for (&collectible, &count) in task.iter_collectibles() {
2951                if collectible.collectible_type == collectible_type {
2952                    *collectibles
2953                        .entry(RawVc::TaskCell(
2954                            collectible.cell.task,
2955                            collectible.cell.cell,
2956                        ))
2957                        .or_insert(0) += count;
2958                }
2959            }
2960            if let Some(reader_id) = reader_id {
2961                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
2962            }
2963        }
2964        if let Some(reader_id) = reader_id {
2965            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
2966            let target = CollectiblesRef {
2967                task: task_id,
2968                collectible_type,
2969            };
2970            if !reader.remove_outdated_collectibles_dependencies(&target) {
2971                let _ = reader.add_collectibles_dependencies(target);
2972            }
2973        }
2974        collectibles
2975    }
2976
2977    fn emit_collectible(
2978        &self,
2979        collectible_type: TraitTypeId,
2980        collectible: RawVc,
2981        task_id: TaskId,
2982        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2983    ) {
2984        self.assert_valid_collectible(task_id, collectible);
2985
2986        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2987            panic!("Collectibles need to be resolved");
2988        };
2989        let cell = CellRef {
2990            task: collectible_task,
2991            cell,
2992        };
2993        operation::UpdateCollectibleOperation::run(
2994            task_id,
2995            CollectibleRef {
2996                collectible_type,
2997                cell,
2998            },
2999            1,
3000            self.execute_context(turbo_tasks),
3001        );
3002    }
3003
3004    fn unemit_collectible(
3005        &self,
3006        collectible_type: TraitTypeId,
3007        collectible: RawVc,
3008        count: u32,
3009        task_id: TaskId,
3010        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3011    ) {
3012        self.assert_valid_collectible(task_id, collectible);
3013
3014        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3015            panic!("Collectibles need to be resolved");
3016        };
3017        let cell = CellRef {
3018            task: collectible_task,
3019            cell,
3020        };
3021        operation::UpdateCollectibleOperation::run(
3022            task_id,
3023            CollectibleRef {
3024                collectible_type,
3025                cell,
3026            },
3027            -(i32::try_from(count).unwrap()),
3028            self.execute_context(turbo_tasks),
3029        );
3030    }
3031
3032    fn update_task_cell(
3033        &self,
3034        task_id: TaskId,
3035        cell: CellId,
3036        content: CellContent,
3037        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3038        content_hash: Option<CellHash>,
3039        verification_mode: VerificationMode,
3040        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3041    ) {
3042        operation::UpdateCellOperation::run(
3043            task_id,
3044            cell,
3045            content,
3046            updated_key_hashes,
3047            content_hash,
3048            verification_mode,
3049            self.execute_context(turbo_tasks),
3050        );
3051    }
3052
3053    fn mark_own_task_as_session_dependent(
3054        &self,
3055        task_id: TaskId,
3056        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3057    ) {
3058        if !self.should_track_dependencies() {
3059            // Without dependency tracking we don't need session dependent tasks
3060            return;
3061        }
3062        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
3063        let mut ctx = self.execute_context(turbo_tasks);
3064        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
3065        let aggregation_number = get_aggregation_number(&task);
3066        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
3067            drop(task);
3068            // We want to use a high aggregation number to avoid large aggregation chains for
3069            // session dependent tasks (which change on every run)
3070            AggregationUpdateQueue::run(
3071                AggregationUpdateJob::UpdateAggregationNumber {
3072                    task_id,
3073                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
3074                    distance: None,
3075                },
3076                &mut ctx,
3077            );
3078            task = ctx.task(task_id, TaskDataCategory::Meta);
3079        }
3080        if let Some(InProgressState::InProgress(box InProgressStateInner {
3081            session_dependent,
3082            ..
3083        })) = task.get_in_progress_mut()
3084        {
3085            *session_dependent = true;
3086        }
3087    }
3088
3089    fn mark_own_task_as_finished(
3090        &self,
3091        task: TaskId,
3092        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3093    ) {
3094        let mut ctx = self.execute_context(turbo_tasks);
3095        let mut task = ctx.task(task, TaskDataCategory::Data);
3096        if let Some(InProgressState::InProgress(box InProgressStateInner {
3097            marked_as_completed,
3098            ..
3099        })) = task.get_in_progress_mut()
3100        {
3101            *marked_as_completed = true;
3102            // TODO this should remove the dirty state (also check session_dependent)
3103            // but this would break some assumptions for strongly consistent reads.
3104            // Client tasks are not connected yet, so we wouldn't wait for them.
3105            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
3106        }
3107    }
3108
3109    fn connect_task(
3110        &self,
3111        task: TaskId,
3112        parent_task: Option<TaskId>,
3113        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3114    ) {
3115        self.assert_not_persistent_calling_transient(parent_task, task, None);
3116        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
3117    }
3118
3119    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3120        let task_id = self.transient_task_id_factory.get();
3121        {
3122            let mut task = self.storage.access_mut(task_id);
3123            task.init_transient_task(task_id, task_type, self.should_track_activeness());
3124        }
3125        #[cfg(feature = "verify_aggregation_graph")]
3126        self.root_tasks.lock().insert(task_id);
3127        task_id
3128    }
3129
3130    fn dispose_root_task(
3131        &self,
3132        task_id: TaskId,
3133        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3134    ) {
3135        #[cfg(feature = "verify_aggregation_graph")]
3136        self.root_tasks.lock().remove(&task_id);
3137
3138        let mut ctx = self.execute_context(turbo_tasks);
3139        let mut task = ctx.task(task_id, TaskDataCategory::All);
3140        let is_dirty = task.is_dirty();
3141        let has_dirty_containers = task.has_dirty_containers();
3142        if is_dirty.is_some() || has_dirty_containers {
3143            if let Some(activeness_state) = task.get_activeness_mut() {
3144                // We will finish the task, but it would be removed after the task is done
3145                activeness_state.unset_root_type();
3146                activeness_state.set_active_until_clean();
3147            };
3148        } else if let Some(activeness_state) = task.take_activeness() {
3149            // Technically nobody should be listening to this event, but just in case
3150            // we notify it anyway
3151            activeness_state.all_clean_event.notify(usize::MAX);
3152        }
3153    }
3154
3155    #[cfg(feature = "verify_aggregation_graph")]
3156    fn verify_aggregation_graph(
3157        &self,
3158        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3159        idle: bool,
3160    ) {
3161        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3162            return;
3163        }
3164        use std::{collections::VecDeque, env, io::stdout};
3165
3166        use crate::backend::operation::{get_uppers, is_aggregating_node};
3167
3168        let mut ctx = self.execute_context(turbo_tasks);
3169        let root_tasks = self.root_tasks.lock().clone();
3170
3171        for task_id in root_tasks.into_iter() {
3172            let mut queue = VecDeque::new();
3173            let mut visited = FxHashSet::default();
3174            let mut aggregated_nodes = FxHashSet::default();
3175            let mut collectibles = FxHashMap::default();
3176            let root_task_id = task_id;
3177            visited.insert(task_id);
3178            aggregated_nodes.insert(task_id);
3179            queue.push_back(task_id);
3180            let mut counter = 0;
3181            while let Some(task_id) = queue.pop_front() {
3182                counter += 1;
3183                if counter % 100000 == 0 {
3184                    println!(
3185                        "queue={}, visited={}, aggregated_nodes={}",
3186                        queue.len(),
3187                        visited.len(),
3188                        aggregated_nodes.len()
3189                    );
3190                }
3191                let task = ctx.task(task_id, TaskDataCategory::All);
3192                if idle && !self.is_idle.load(Ordering::Relaxed) {
3193                    return;
3194                }
3195
3196                let uppers = get_uppers(&task);
3197                if task_id != root_task_id
3198                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3199                {
3200                    panic!(
3201                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3202                         {:?})",
3203                        task_id,
3204                        task.get_task_description(),
3205                        uppers
3206                    );
3207                }
3208
3209                for (collectible, _) in task.iter_aggregated_collectibles() {
3210                    collectibles
3211                        .entry(*collectible)
3212                        .or_insert_with(|| (false, Vec::new()))
3213                        .1
3214                        .push(task_id);
3215                }
3216
3217                for (&collectible, &value) in task.iter_collectibles() {
3218                    if value > 0 {
3219                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3220                            *flag = true
3221                        } else {
3222                            panic!(
3223                                "Task {} has a collectible {:?} that is not in any upper task",
3224                                task_id, collectible
3225                            );
3226                        }
3227                    }
3228                }
3229
3230                let is_dirty = task.has_dirty();
3231                let has_dirty_container = task.has_dirty_containers();
3232                let should_be_in_upper = is_dirty || has_dirty_container;
3233
3234                let aggregation_number = get_aggregation_number(&task);
3235                if is_aggregating_node(aggregation_number) {
3236                    aggregated_nodes.insert(task_id);
3237                }
3238                // println!(
3239                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
3240                //     ctx.get_task_description(task_id),
3241                //     uppers
3242                // );
3243
3244                for child_id in task.iter_children() {
3245                    // println!("{task_id}: child -> {child_id}");
3246                    if visited.insert(child_id) {
3247                        queue.push_back(child_id);
3248                    }
3249                }
3250                drop(task);
3251
3252                if should_be_in_upper {
3253                    for upper_id in uppers {
3254                        let upper = ctx.task(upper_id, TaskDataCategory::All);
3255                        let in_upper = upper
3256                            .get_aggregated_dirty_containers(&task_id)
3257                            .is_some_and(|&dirty| dirty > 0);
3258                        if !in_upper {
3259                            let containers: Vec<_> = upper
3260                                .iter_aggregated_dirty_containers()
3261                                .map(|(&k, &v)| (k, v))
3262                                .collect();
3263                            let upper_task_desc = upper.get_task_description();
3264                            drop(upper);
3265                            panic!(
3266                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
3267                                 ({})\nThese dirty containers are present:\n{:#?}",
3268                                task_id,
3269                                ctx.task(task_id, TaskDataCategory::Data)
3270                                    .get_task_description(),
3271                                upper_id,
3272                                upper_task_desc,
3273                                containers,
3274                            );
3275                        }
3276                    }
3277                }
3278            }
3279
3280            for (collectible, (flag, task_ids)) in collectibles {
3281                if !flag {
3282                    use std::io::Write;
3283                    let mut stdout = stdout().lock();
3284                    writeln!(
3285                        stdout,
3286                        "{:?} that is not emitted in any child task but in these aggregated \
3287                         tasks: {:#?}",
3288                        collectible,
3289                        task_ids
3290                            .iter()
3291                            .map(|t| format!(
3292                                "{t} {}",
3293                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
3294                            ))
3295                            .collect::<Vec<_>>()
3296                    )
3297                    .unwrap();
3298
3299                    let task_id = collectible.cell.task;
3300                    let mut queue = {
3301                        let task = ctx.task(task_id, TaskDataCategory::All);
3302                        get_uppers(&task)
3303                    };
3304                    let mut visited = FxHashSet::default();
3305                    for &upper_id in queue.iter() {
3306                        visited.insert(upper_id);
3307                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3308                    }
3309                    while let Some(task_id) = queue.pop() {
3310                        let task = ctx.task(task_id, TaskDataCategory::All);
3311                        let desc = task.get_task_description();
3312                        let aggregated_collectible = task
3313                            .get_aggregated_collectibles(&collectible)
3314                            .copied()
3315                            .unwrap_or_default();
3316                        let uppers = get_uppers(&task);
3317                        drop(task);
3318                        writeln!(
3319                            stdout,
3320                            "upper {task_id} {desc} collectible={aggregated_collectible}"
3321                        )
3322                        .unwrap();
3323                        if task_ids.contains(&task_id) {
3324                            writeln!(
3325                                stdout,
3326                                "Task has an upper connection to an aggregated task that doesn't \
3327                                 reference it. Upper connection is invalid!"
3328                            )
3329                            .unwrap();
3330                        }
3331                        for upper_id in uppers {
3332                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3333                            if !visited.contains(&upper_id) {
3334                                queue.push(upper_id);
3335                            }
3336                        }
3337                    }
3338                    panic!("See stdout for more details");
3339                }
3340            }
3341        }
3342    }
3343
3344    fn assert_not_persistent_calling_transient(
3345        &self,
3346        parent_id: Option<TaskId>,
3347        child_id: TaskId,
3348        cell_id: Option<CellId>,
3349    ) {
3350        if let Some(parent_id) = parent_id
3351            && !parent_id.is_transient()
3352            && child_id.is_transient()
3353        {
3354            self.panic_persistent_calling_transient(
3355                self.debug_get_task_description(parent_id),
3356                self.debug_get_cached_task_type(child_id).as_deref(),
3357                cell_id,
3358            );
3359        }
3360    }
3361
3362    fn panic_persistent_calling_transient(
3363        &self,
3364        parent: String,
3365        child: Option<&CachedTaskType>,
3366        cell_id: Option<CellId>,
3367    ) -> ! {
3368        let transient_reason = if let Some(child) = child {
3369            Cow::Owned(format!(
3370                " The callee is transient because it depends on:\n{}",
3371                self.debug_trace_transient_task(child, cell_id),
3372            ))
3373        } else {
3374            Cow::Borrowed("")
3375        };
3376        panic!(
3377            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3378            parent,
3379            child.map_or("unknown", |t| t.get_name()),
3380            transient_reason,
3381        );
3382    }
3383
3384    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3385        // these checks occur in a potentially hot codepath, but they're cheap
3386        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3387            // This should never happen: The collectible APIs use ResolvedVc
3388            let task_info = if let Some(col_task_ty) = collectible
3389                .try_get_task_id()
3390                .map(|t| self.debug_get_task_description(t))
3391            {
3392                Cow::Owned(format!(" (return type of {col_task_ty})"))
3393            } else {
3394                Cow::Borrowed("")
3395            };
3396            panic!("Collectible{task_info} must be a ResolvedVc")
3397        };
3398        if col_task_id.is_transient() && !task_id.is_transient() {
3399            let transient_reason =
3400                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3401                    Cow::Owned(format!(
3402                        ". The collectible is transient because it depends on:\n{}",
3403                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3404                    ))
3405                } else {
3406                    Cow::Borrowed("")
3407                };
3408            // this should never happen: How would a persistent function get a transient Vc?
3409            panic!(
3410                "Collectible is transient, transient collectibles cannot be emitted from \
3411                 persistent tasks{transient_reason}",
3412            )
3413        }
3414    }
3415}
3416
// `Backend` implementation for the public wrapper type. Every method forwards to the
// same-named method on the inner value `self.0`, except `task_statistics` and
// `is_tracking_dependencies`, which read fields of `self.0` directly. Where the inner method
// takes no `turbo_tasks` argument, the parameter is unused here (prefixed with `_`).
impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }

    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }

    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }

    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }

    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }

    fn get_or_create_task(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        parent_task: Option<TaskId>,
        persistence: TaskPersistence,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_task(native_fn, this, arg, parent_task, persistence, turbo_tasks)
    }

    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }

    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }

    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }

    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0
            .try_start_task_execution(task_id, priority, turbo_tasks)
    }

    // The `stateful` parameter only exists when the `verify_determinism` feature is enabled;
    // the same `cfg` gate is applied to the forwarded argument.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
            turbo_tasks,
        )
    }

    type BackendJob = TurboTasksBackendJob;

    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(job, turbo_tasks)
    }

    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, reader, options, turbo_tasks)
    }

    // Note: the arguments are forwarded in a different order than they are declared here
    // (`reader` is passed before `cell`).
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }

    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0.try_read_own_task_cell(task_id, cell, turbo_tasks)
    }

    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }

    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }

    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }

    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(
            task_id,
            cell,
            content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            turbo_tasks,
        );
    }

    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }

    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }

    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }

    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }

    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }

    // Returns a reference to the statistics object stored on the inner backend.
    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }

    // Reports the `dependency_tracking` option of the inner backend.
    fn is_tracking_dependencies(&self) -> bool {
        self.0.options.dependency_tracking
    }

    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
        self.0.get_task_name(task, turbo_tasks)
    }
}
3655
3656enum DebugTraceTransientTask {
3657    Cached {
3658        task_name: &'static str,
3659        cell_type_id: Option<ValueTypeId>,
3660        cause_self: Option<Box<DebugTraceTransientTask>>,
3661        cause_args: Vec<DebugTraceTransientTask>,
3662    },
3663    /// This representation is used when this task is a duplicate of one previously shown
3664    Collapsed {
3665        task_name: &'static str,
3666        cell_type_id: Option<ValueTypeId>,
3667    },
3668    Uncached {
3669        cell_type_id: Option<ValueTypeId>,
3670    },
3671}
3672
3673impl DebugTraceTransientTask {
3674    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3675        let indent = "    ".repeat(level);
3676        f.write_str(&indent)?;
3677
3678        fn fmt_cell_type_id(
3679            f: &mut fmt::Formatter<'_>,
3680            cell_type_id: Option<ValueTypeId>,
3681        ) -> fmt::Result {
3682            if let Some(ty) = cell_type_id {
3683                write!(
3684                    f,
3685                    " (read cell of type {})",
3686                    get_value_type(ty).ty.global_name
3687                )
3688            } else {
3689                Ok(())
3690            }
3691        }
3692
3693        // write the name and type
3694        match self {
3695            Self::Cached {
3696                task_name,
3697                cell_type_id,
3698                ..
3699            }
3700            | Self::Collapsed {
3701                task_name,
3702                cell_type_id,
3703                ..
3704            } => {
3705                f.write_str(task_name)?;
3706                fmt_cell_type_id(f, *cell_type_id)?;
3707                if matches!(self, Self::Collapsed { .. }) {
3708                    f.write_str(" (collapsed)")?;
3709                }
3710            }
3711            Self::Uncached { cell_type_id } => {
3712                f.write_str("unknown transient task")?;
3713                fmt_cell_type_id(f, *cell_type_id)?;
3714            }
3715        }
3716        f.write_char('\n')?;
3717
3718        // write any extra "cause" information we might have
3719        if let Self::Cached {
3720            cause_self,
3721            cause_args,
3722            ..
3723        } = self
3724        {
3725            if let Some(c) = cause_self {
3726                writeln!(f, "{indent}  self:")?;
3727                c.fmt_indented(f, level + 1)?;
3728            }
3729            if !cause_args.is_empty() {
3730                writeln!(f, "{indent}  args:")?;
3731                for c in cause_args {
3732                    c.fmt_indented(f, level + 1)?;
3733                }
3734            }
3735        }
3736        Ok(())
3737    }
3738}
3739
3740impl fmt::Display for DebugTraceTransientTask {
3741    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3742        self.fmt_indented(f, 0)
3743    }
3744}
3745
// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
/// Returns an `Instant` roughly 30 years after the moment of the call, for use as a
/// stand-in for "never".
fn far_future() -> Instant {
    // The API offers neither a maximum `Instant` nor a conversion from a calendar date, so a
    // large offset from "now" is used instead. The horizon is deliberately limited:
    // 1000 years overflows on macOS, 100 years overflows on FreeBSD.
    const SECONDS_PER_DAY: u64 = 86400;
    const DAYS_PER_YEAR: u64 = 365;
    const HORIZON_YEARS: u64 = 30;

    let horizon = Duration::from_secs(SECONDS_PER_DAY * DAYS_PER_YEAR * HORIZON_YEARS);
    Instant::now() + horizon
}
3754
3755/// Encodes task data, using the provided buffer as a scratch space.  Returns a new exactly sized
3756/// buffer.
3757/// This allows reusing the buffer across multiple encode calls to optimize allocations and
3758/// resulting buffer sizes.
3759///
3760/// TODO: The `Result` return type is an artifact of the bincode `Encode` trait requiring
3761/// fallible encoding. In practice, encoding to a `SmallVec` is infallible (no I/O), and the only
3762/// real failure mode — a `TypedSharedReference` whose value type has no bincode impl — is a
3763/// programmer error caught by the panic in the caller. Consider making the bincode encoding trait
3764/// infallible (i.e. returning `()` instead of `Result<(), EncodeError>`) to eliminate the
3765/// spurious `Result` threading throughout the encode path.
3766fn encode_task_data(
3767    task: TaskId,
3768    data: &TaskStorage,
3769    category: SpecificTaskDataCategory,
3770    scratch_buffer: &mut TurboBincodeBuffer,
3771) -> Result<TurboBincodeBuffer> {
3772    scratch_buffer.clear();
3773    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3774    data.encode(category, &mut encoder)?;
3775
3776    if cfg!(feature = "verify_serialization") {
3777        TaskStorage::new()
3778            .decode(
3779                category,
3780                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3781            )
3782            .with_context(|| {
3783                format!(
3784                    "expected to be able to decode serialized data for '{category:?}' information \
3785                     for {task}"
3786                )
3787            })?;
3788    }
3789    Ok(SmallVec::from_slice(scratch_buffer))
3790}