//! Backend implementation (`turbo_tasks_backend/backend/mod.rs`).

1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7    borrow::Cow,
8    fmt::{self, Write},
9    future::Future,
10    hash::BuildHasherDefault,
11    mem::take,
12    pin::Pin,
13    sync::{
14        Arc, LazyLock,
15        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16    },
17    time::SystemTime,
18};
19
20use anyhow::{Context, Result, bail};
21use auto_hash_map::{AutoMap, AutoSet};
22use indexmap::IndexSet;
23use parking_lot::{Condvar, Mutex};
24use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
25use smallvec::{SmallVec, smallvec};
26use tokio::time::{Duration, Instant};
27use tracing::{Span, trace_span};
28use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
29use turbo_tasks::{
30    CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
31    ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
32    TaskId, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic, ValueTypeId,
33    backend::{
34        Backend, CachedTaskType, CellContent, CellHash, TaskExecutionSpec, TransientTaskType,
35        TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
36        TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
37        VerificationMode,
38    },
39    event::{Event, EventDescription, EventListener},
40    message_queue::{TimingEvent, TraceEvent},
41    registry::get_value_type,
42    scope::scope_and_block,
43    task_statistics::TaskStatisticsApi,
44    trace::TraceRawVcs,
45    util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
46};
47
48pub use self::{
49    operation::AnyOperation,
50    storage::{SpecificTaskDataCategory, TaskDataCategory},
51};
52#[cfg(feature = "trace_task_dirty")]
53use crate::backend::operation::TaskDirtyCause;
54use crate::{
55    backend::{
56        operation::{
57            AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
58            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
59            LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType,
60            connect_children, get_aggregation_number, get_uppers, is_root_node,
61            make_task_dirty_internal, prepare_new_children,
62        },
63        storage::Storage,
64        storage_schema::{TaskStorage, TaskStorageAccessors},
65    },
66    backing_storage::{BackingStorage, SnapshotItem, compute_task_type_hash},
67    data::{
68        ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
69        InProgressState, InProgressStateInner, OutputValue, TransientTask,
70    },
71    error::TaskError,
72    utils::{
73        arc_or_owned::ArcOrOwned,
74        dash_map_drop_contents::drop_contents,
75        dash_map_raw_entry::{RawEntry, raw_entry},
76        ptr_eq_arc::PtrEqArc,
77        shard_amount::compute_shard_amount,
78    },
79};
80
/// Threshold for parallelizing making dependent tasks dirty.
/// If the number of dependent tasks exceeds this threshold,
/// the operation will be parallelized.
// NOTE(review): "PARALLIZATION" is a typo for "PARALLELIZATION"; renaming would
// touch all usages outside this chunk, so the name is left as-is.
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;

/// Highest bit of `in_progress_operations`. While set, a snapshot has been
/// requested and operations must suspend; see `operation_suspend_point` and
/// `start_operation`.
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
87
/// Configurable idle timeout for snapshot persistence.
///
/// Read once from `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS`; defaults to
/// 2 seconds if the variable is unset or does not parse as a `u64`.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let fallback = Duration::from_secs(2);
    match std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS") {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(millis) => Duration::from_millis(millis),
            Err(_) => fallback,
        },
        Err(_) => fallback,
    }
});
97
/// Coordination state between a requested snapshot and in-flight operations.
struct SnapshotRequest {
    // True while a snapshot is being taken; operations wait until it clears.
    snapshot_requested: bool,
    // Operations that parked themselves while the snapshot runs; inserted and
    // removed in `operation_suspend_point`. Presumably the snapshot reads these
    // to persist in-flight work — confirm against the snapshot code.
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}
102
103impl SnapshotRequest {
104    fn new() -> Self {
105        Self {
106            snapshot_requested: false,
107            suspended_operations: FxHashSet::default(),
108        }
109    }
110}
111
/// How the backing storage participates in reads and writes.
pub enum StorageMode {
    /// Queries the storage for cache entries that don't exist locally.
    /// Never writes changes back to the backing storage.
    ReadOnly,
    /// Queries the storage for cache entries that don't exist locally.
    /// Regularly pushes changes to the backing storage.
    ReadWrite,
    /// Queries the storage for cache entries that don't exist locally.
    /// On shutdown, pushes all changes to the backing storage.
    ReadWriteOnShutdown,
}
122
/// Configuration for [`TurboTasksBackend`].
pub struct BackendOptions {
    /// Enables dependency tracking.
    ///
    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
    /// forever.
    pub dependency_tracking: bool,

    /// Enables active tracking.
    ///
    /// Automatically disabled when `dependency_tracking` is disabled.
    ///
    /// When disabled: All tasks are considered as active.
    pub active_tracking: bool,

    /// Enables the backing storage. `None` disables restore and persist entirely.
    pub storage_mode: Option<StorageMode>,

    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
    /// datastructures. If `None`, it will use the available parallelism.
    pub num_workers: Option<usize>,

    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
    pub small_preallocation: bool,
}
147
148impl Default for BackendOptions {
149    fn default() -> Self {
150        Self {
151            dependency_tracking: true,
152            active_tracking: true,
153            storage_mode: Some(StorageMode::ReadWrite),
154            num_workers: None,
155            small_preallocation: false,
156        }
157    }
158}
159
/// Background job kinds the backend schedules for snapshot persistence.
// NOTE(review): variant semantics inferred from names (first vs. follow-up
// snapshot) — the scheduling code is not visible in this chunk; confirm there.
pub enum TurboTasksBackendJob {
    InitialSnapshot,
    FollowUpSnapshot,
}
164
/// Public backend handle; a thin wrapper sharing [`TurboTasksBackendInner`] via `Arc`.
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
166
/// Shared state behind [`TurboTasksBackend`].
struct TurboTasksBackendInner<B: BackingStorage> {
    // Configuration flags; see [`BackendOptions`].
    options: BackendOptions,

    // Reference point for [`Self::last_snapshot`] timestamps.
    start_time: Instant,

    // Allocates ids below `TRANSIENT_TASK_BIT` for persisted tasks; the range
    // continues from `BackingStorage::next_free_task_id`.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    // Allocates ids at or above `TRANSIENT_TASK_BIT` for transient tasks.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    // Maps a cached task type to its assigned task id.
    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

    // In-memory task data storage, sharded for parallelism.
    storage: Storage,

    /// Number of executing operations + Highest bit is set when snapshot is
    /// requested. When that bit is set, operations should pause until the
    /// snapshot is completed. When the bit is set and in progress counter
    /// reaches zero, `operations_completed_when_snapshot_requested` is
    /// triggered.
    in_progress_operations: AtomicUsize,

    snapshot_request: Mutex<SnapshotRequest>,
    /// Condition Variable that is triggered when `in_progress_operations`
    /// reaches zero while snapshot is requested. All operations are either
    /// completed or suspended.
    operations_suspended: Condvar,
    /// Condition Variable that is triggered when a snapshot is completed and
    /// operations can continue.
    snapshot_completed: Condvar,
    /// The timestamp of the last started snapshot since [`Self::start_time`].
    last_snapshot: AtomicU64,

    // Shutdown flag and related events. NOTE(review): the writers of these
    // fields are not visible in this chunk — confirm semantics against the
    // stop/idle handling code.
    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    // Optional per-native-function cache hit/miss statistics.
    task_statistics: TaskStatisticsApi,

    // The persistent storage implementation.
    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
211
212impl<B: BackingStorage> TurboTasksBackend<B> {
213    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
214        Self(Arc::new(TurboTasksBackendInner::new(
215            options,
216            backing_storage,
217        )))
218    }
219
220    pub fn backing_storage(&self) -> &B {
221        &self.0.backing_storage
222    }
223}
224
impl<B: BackingStorage> TurboTasksBackendInner<B> {
    /// Creates the inner backend state.
    ///
    /// Forces `active_tracking` off when `dependency_tracking` is disabled,
    /// sizes the storage shards from the worker count, and continues persisted
    /// task ids from where the backing storage left off.
    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
        // Active tracking is meaningless without dependency tracking.
        if !options.dependency_tracking {
            options.active_tracking = false;
        }
        let small_preallocation = options.small_preallocation;
        let next_task_id = backing_storage
            .next_free_task_id()
            .expect("Failed to get task id");
        Self {
            options,
            start_time: Instant::now(),
            // Persistent ids live below TRANSIENT_TASK_BIT, transient ids at or
            // above it, so the bit alone distinguishes the two kinds.
            persisted_task_id_factory: IdFactoryWithReuse::new(
                next_task_id,
                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
            ),
            transient_task_id_factory: IdFactoryWithReuse::new(
                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                TaskId::MAX,
            ),
            task_cache: FxDashMap::default(),
            storage: Storage::new(shard_amount, small_preallocation),
            in_progress_operations: AtomicUsize::new(0),
            snapshot_request: Mutex::new(SnapshotRequest::new()),
            operations_suspended: Condvar::new(),
            snapshot_completed: Condvar::new(),
            last_snapshot: AtomicU64::new(0),
            stopping: AtomicBool::new(false),
            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
            #[cfg(feature = "verify_aggregation_graph")]
            is_idle: AtomicBool::new(false),
            task_statistics: TaskStatisticsApi::default(),
            backing_storage,
            #[cfg(feature = "verify_aggregation_graph")]
            root_tasks: Default::default(),
        }
    }

    /// Creates an execution context tying this backend to the given turbo-tasks API.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }

    /// Returns true when persistence is enabled and a snapshot has been
    /// requested (the high bit of `in_progress_operations` is set), i.e.
    /// operations should suspend at their next suspend point.
    fn suspending_requested(&self) -> bool {
        self.should_persist()
            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
    }

    /// Suspend point called from within running operations. When a snapshot is
    /// requested, captures the operation state via `suspend`, parks it in
    /// `suspended_operations`, and blocks until the snapshot completes.
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        // The snapshot path is rare; keep it out of the hot inline path.
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            // Re-check under the lock: the request may have completed between
            // the relaxed load in `suspending_requested` and acquiring the lock.
            if snapshot_request.snapshot_requested {
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                // Decrement the operation count; `value` is the post-decrement count.
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                // Only the request bit left => all operations are suspended or
                // done; wake the snapshot waiting in `operations_suspended`.
                if value == SNAPSHOT_REQUESTED_BIT {
                    this.operations_suspended.notify_all();
                }
                // Block (releasing the lock) until the snapshot clears the flag.
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                // Resume: count ourselves as running again and unpark.
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }

    /// Registers the start of an operation and returns a guard whose drop
    /// decrements the counter. If a snapshot is currently requested, blocks
    /// until it completes before the operation may run.
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        // Without persistence there is never a snapshot; no tracking needed.
        if !self.should_persist() {
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        // Snapshot requested: undo the increment and wait for completion.
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            let mut snapshot_request = self.snapshot_request.lock();
            // Re-check under the lock (the request may already be done).
            if snapshot_request.snapshot_requested {
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                // Only the request bit left => notify the waiting snapshot.
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }

    /// True when changes are pushed to the backing storage (regularly or on shutdown).
    fn should_persist(&self) -> bool {
        matches!(
            self.options.storage_mode,
            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
        )
    }

    /// True when cache entries may be restored from the backing storage.
    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }

    /// True when dependency tracking is enabled; see [`BackendOptions::dependency_tracking`].
    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }

    /// True when activeness tracking is enabled; see [`BackendOptions::active_tracking`].
    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }

    /// Records a task-cache hit for the task's native function (no-op when
    /// statistics are disabled).
    fn track_cache_hit(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
    }

    /// Records a task-cache miss for the task's native function (no-op when
    /// statistics are disabled).
    fn track_cache_miss(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
    }

    /// Reconstructs a full [`TurboTasksExecutionError`] from the compact [`TaskError`] storage
    /// representation. For [`TaskError::TaskChain`], this looks up the source error from the last
    /// task's output and rebuilds the nested `TaskContext` wrappers with `TurboTasksCallApi`
    /// references for lazy name resolution.
    fn task_error_to_turbo_tasks_execution_error(
        &self,
        error: &TaskError,
        ctx: &mut impl ExecuteContext<'_>,
    ) -> TurboTasksExecutionError {
        match error {
            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
            // Recursively rebuild the error's source chain.
            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
                message: item.message.clone(),
                source: item
                    .source
                    .as_ref()
                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
            })),
            TaskError::LocalTaskContext(local_task_context) => {
                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
                    name: local_task_context.name.clone(),
                    source: local_task_context
                        .source
                        .as_ref()
                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
                }))
            }
            TaskError::TaskChain(chain) => {
                // The last task in the chain holds the root-cause error output.
                let task_id = chain.last().unwrap();
                let error = {
                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
                    if let Some(OutputValue::Error(error)) = task.get_output() {
                        Some(error.clone())
                    } else {
                        None
                    }
                };
                let error = error.map_or_else(
                    || {
                        // Eventual consistency will cause errors to no longer be available
                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
                                "Error no longer available",
                            )),
                            location: None,
                        }))
                    },
                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
                );
                // Wrap the root cause in one TaskContext layer per chain entry,
                // innermost-last (hence the reverse iteration).
                let mut current_error = error;
                for &task_id in chain.iter().rev() {
                    current_error =
                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
                            task_id,
                            source: Some(current_error),
                            turbo_tasks: ctx.turbo_tasks(),
                        }));
                }
                current_error
            }
        }
    }
}
427
/// RAII guard for a running operation; its `Drop` decrements
/// `in_progress_operations` and wakes a waiting snapshot if this was the last
/// running operation. Created by `TurboTasksBackendInner::start_operation`.
pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    // `None` when persistence is disabled, in which case nothing is tracked.
    backend: Option<&'a TurboTasksBackendInner<B>>,
}
431
432impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
433    fn drop(&mut self) {
434        if let Some(backend) = self.backend {
435            let fetch_sub = backend
436                .in_progress_operations
437                .fetch_sub(1, Ordering::AcqRel);
438            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
439                backend.operations_suspended.notify_all();
440            }
441        }
442    }
443}
444
/// Intermediate result of step 1 of task execution completion.
// NOTE(review): field semantics below are inferred from the names; the
// consuming step is not visible in this chunk — confirm against it.
struct TaskExecutionCompletePrepareResult {
    pub new_children: FxHashSet<TaskId>,
    pub is_now_immutable: bool,
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    pub new_output: Option<OutputValue>,
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
    pub is_recomputation: bool,
}
455
456// Operations
457impl<B: BackingStorage> TurboTasksBackendInner<B> {
458    fn connect_child(
459        &self,
460        parent_task: Option<TaskId>,
461        child_task: TaskId,
462        task_type: Option<ArcOrOwned<CachedTaskType>>,
463        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
464    ) {
465        operation::ConnectChildOperation::run(
466            parent_task,
467            child_task,
468            task_type,
469            self.execute_context(turbo_tasks),
470        );
471    }
472
473    fn try_read_task_output(
474        self: &Arc<Self>,
475        task_id: TaskId,
476        reader: Option<TaskId>,
477        options: ReadOutputOptions,
478        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
479    ) -> Result<Result<RawVc, EventListener>> {
480        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
481
482        let mut ctx = self.execute_context(turbo_tasks);
483        let need_reader_task = if self.should_track_dependencies()
484            && !matches!(options.tracking, ReadTracking::Untracked)
485            && reader.is_some_and(|reader_id| reader_id != task_id)
486            && let Some(reader_id) = reader
487            && reader_id != task_id
488        {
489            Some(reader_id)
490        } else {
491            None
492        };
493        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
494            // Having a task_pair here is not optimal, but otherwise this would lead to a race
495            // condition. See below.
496            // TODO(sokra): solve that in a more performant way.
497            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
498            (task, Some(reader))
499        } else {
500            (ctx.task(task_id, TaskDataCategory::All), None)
501        };
502
503        fn listen_to_done_event(
504            reader_description: Option<EventDescription>,
505            tracking: ReadTracking,
506            done_event: &Event,
507        ) -> EventListener {
508            done_event.listen_with_note(move || {
509                move || {
510                    if let Some(reader_description) = reader_description.as_ref() {
511                        format!(
512                            "try_read_task_output from {} ({})",
513                            reader_description, tracking
514                        )
515                    } else {
516                        format!("try_read_task_output ({})", tracking)
517                    }
518                }
519            })
520        }
521
522        fn check_in_progress(
523            task: &impl TaskGuard,
524            reader_description: Option<EventDescription>,
525            tracking: ReadTracking,
526        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
527        {
528            match task.get_in_progress() {
529                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
530                    listen_to_done_event(reader_description, tracking, done_event),
531                ))),
532                Some(InProgressState::InProgress(box InProgressStateInner {
533                    done_event, ..
534                })) => Some(Ok(Err(listen_to_done_event(
535                    reader_description,
536                    tracking,
537                    done_event,
538                )))),
539                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
540                    "{} was canceled",
541                    task.get_task_description()
542                ))),
543                None => None,
544            }
545        }
546
547        if matches!(options.consistency, ReadConsistency::Strong) {
548            // Ensure it's an root node
549            loop {
550                let aggregation_number = get_aggregation_number(&task);
551                if is_root_node(aggregation_number) {
552                    break;
553                }
554                drop(task);
555                drop(reader_task);
556                {
557                    let _span = tracing::trace_span!(
558                        "make root node for strongly consistent read",
559                        %task_id
560                    )
561                    .entered();
562                    AggregationUpdateQueue::run(
563                        AggregationUpdateJob::UpdateAggregationNumber {
564                            task_id,
565                            base_aggregation_number: u32::MAX,
566                            distance: None,
567                        },
568                        &mut ctx,
569                    );
570                }
571                (task, reader_task) = if let Some(reader_id) = need_reader_task {
572                    // TODO(sokra): see comment above
573                    let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
574                    (task, Some(reader))
575                } else {
576                    (ctx.task(task_id, TaskDataCategory::All), None)
577                }
578            }
579
580            let is_dirty = task.is_dirty();
581
582            // Check the dirty count of the root node
583            let has_dirty_containers = task.has_dirty_containers();
584            if has_dirty_containers || is_dirty.is_some() {
585                let activeness = task.get_activeness_mut();
586                let mut task_ids_to_schedule: Vec<_> = Vec::new();
587                // When there are dirty task, subscribe to the all_clean_event
588                let activeness = if let Some(activeness) = activeness {
589                    // This makes sure all tasks stay active and this task won't stale.
590                    // active_until_clean is automatically removed when this
591                    // task is clean.
592                    activeness.set_active_until_clean();
593                    activeness
594                } else {
595                    // If we don't have a root state, add one. This also makes sure all tasks stay
596                    // active and this task won't stale. active_until_clean
597                    // is automatically removed when this task is clean.
598                    if ctx.should_track_activeness() {
599                        // A newly added Activeness need to make sure to schedule the tasks
600                        task_ids_to_schedule = task.dirty_containers().collect();
601                        task_ids_to_schedule.push(task_id);
602                    }
603                    let activeness =
604                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
605                    activeness.set_active_until_clean();
606                    activeness
607                };
608                let listener = activeness.all_clean_event.listen_with_note(move || {
609                    let this = self.clone();
610                    let tt = turbo_tasks.pin();
611                    move || {
612                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
613                        let mut ctx = this.execute_context(tt);
614                        let mut visited = FxHashSet::default();
615                        fn indent(s: &str) -> String {
616                            s.split_inclusive('\n')
617                                .flat_map(|line: &str| ["  ", line].into_iter())
618                                .collect::<String>()
619                        }
620                        fn get_info(
621                            ctx: &mut impl ExecuteContext<'_>,
622                            task_id: TaskId,
623                            parent_and_count: Option<(TaskId, i32)>,
624                            visited: &mut FxHashSet<TaskId>,
625                        ) -> String {
626                            let task = ctx.task(task_id, TaskDataCategory::All);
627                            let is_dirty = task.is_dirty();
628                            let in_progress =
629                                task.get_in_progress()
630                                    .map_or("not in progress", |p| match p {
631                                        InProgressState::InProgress(_) => "in progress",
632                                        InProgressState::Scheduled { .. } => "scheduled",
633                                        InProgressState::Canceled => "canceled",
634                                    });
635                            let activeness = task.get_activeness().map_or_else(
636                                || "not active".to_string(),
637                                |activeness| format!("{activeness:?}"),
638                            );
639                            let aggregation_number = get_aggregation_number(&task);
640                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
641                            {
642                                let uppers = get_uppers(&task);
643                                !uppers.contains(&parent_task_id)
644                            } else {
645                                false
646                            };
647
648                            // Check the dirty count of the root node
649                            let has_dirty_containers = task.has_dirty_containers();
650
651                            let task_description = task.get_task_description();
652                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
653                                format!(", dirty({parent_priority})")
654                            } else {
655                                String::new()
656                            };
657                            let has_dirty_containers_label = if has_dirty_containers {
658                                ", dirty containers"
659                            } else {
660                                ""
661                            };
662                            let count = if let Some((_, count)) = parent_and_count {
663                                format!(" {count}")
664                            } else {
665                                String::new()
666                            };
667                            let mut info = format!(
668                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
669                                 {in_progress}, \
670                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
671                            );
672                            let children: Vec<_> = task.dirty_containers_with_count().collect();
673                            drop(task);
674
675                            if missing_upper {
676                                info.push_str("\n  ERROR: missing upper connection");
677                            }
678
679                            if has_dirty_containers || !children.is_empty() {
680                                writeln!(info, "\n  dirty tasks:").unwrap();
681
682                                for (child_task_id, count) in children {
683                                    let task_description = ctx
684                                        .task(child_task_id, TaskDataCategory::Data)
685                                        .get_task_description();
686                                    if visited.insert(child_task_id) {
687                                        let child_info = get_info(
688                                            ctx,
689                                            child_task_id,
690                                            Some((task_id, count)),
691                                            visited,
692                                        );
693                                        info.push_str(&indent(&child_info));
694                                        if !info.ends_with('\n') {
695                                            info.push('\n');
696                                        }
697                                    } else {
698                                        writeln!(
699                                            info,
700                                            "  {child_task_id} {task_description} {count} \
701                                             (already visited)"
702                                        )
703                                        .unwrap();
704                                    }
705                                }
706                            }
707                            info
708                        }
709                        let info = get_info(&mut ctx, task_id, None, &mut visited);
710                        format!(
711                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
712                        )
713                    }
714                });
715                drop(reader_task);
716                drop(task);
717                if !task_ids_to_schedule.is_empty() {
718                    let mut queue = AggregationUpdateQueue::new();
719                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
720                    queue.execute(&mut ctx);
721                }
722
723                return Ok(Err(listener));
724            }
725        }
726
727        let reader_description = reader_task
728            .as_ref()
729            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
730        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
731        {
732            return value;
733        }
734
735        if let Some(output) = task.get_output() {
736            let result = match output {
737                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
738                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
739                OutputValue::Error(error) => Err(error.clone()),
740            };
741            if let Some(mut reader_task) = reader_task.take()
742                && options.tracking.should_track(result.is_err())
743                && (!task.immutable() || cfg!(feature = "verify_immutable"))
744            {
745                #[cfg(feature = "trace_task_output_dependencies")]
746                let _span = tracing::trace_span!(
747                    "add output dependency",
748                    task = %task_id,
749                    dependent_task = ?reader
750                )
751                .entered();
752                let mut queue = LeafDistanceUpdateQueue::new();
753                let reader = reader.unwrap();
754                if task.add_output_dependent(reader) {
755                    // Ensure that dependent leaf distance is strictly monotonic increasing
756                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
757                    let reader_leaf_distance =
758                        reader_task.get_leaf_distance().copied().unwrap_or_default();
759                    if reader_leaf_distance.distance <= leaf_distance.distance {
760                        queue.push(
761                            reader,
762                            leaf_distance.distance,
763                            leaf_distance.max_distance_in_buffer,
764                        );
765                    }
766                }
767
768                drop(task);
769
770                // Note: We use `task_pair` earlier to lock the task and its reader at the same
771                // time. If we didn't and just locked the reader here, an invalidation could occur
772                // between grabbing the locks. If that happened, and if the task is "outdated" or
773                // doesn't have the dependency edge yet, the invalidation would be lost.
774
775                if !reader_task.remove_outdated_output_dependencies(&task_id) {
776                    let _ = reader_task.add_output_dependencies(task_id);
777                }
778                drop(reader_task);
779
780                queue.execute(&mut ctx);
781            } else {
782                drop(task);
783            }
784
785            return result.map_err(|error| {
786                self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
787                    .with_task_context(task_id, turbo_tasks.pin())
788                    .into()
789            });
790        }
791        drop(reader_task);
792
793        let note = EventDescription::new(|| {
794            move || {
795                if let Some(reader) = reader_description.as_ref() {
796                    format!("try_read_task_output (recompute) from {reader}",)
797                } else {
798                    "try_read_task_output (recompute, untracked)".to_string()
799                }
800            }
801        });
802
803        // Output doesn't exist. We need to schedule the task to compute it.
804        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
805            TaskExecutionReason::OutputNotAvailable,
806            EventDescription::new(|| task.get_task_desc_fn()),
807            note,
808        );
809
810        // It's not possible that the task is InProgress at this point. If it is InProgress {
811        // done: true } it must have Output and would early return.
812        let old = task.set_in_progress(in_progress_state);
813        debug_assert!(old.is_none(), "InProgress already exists");
814        ctx.schedule_task(task, TaskPriority::Initial);
815
816        Ok(Err(listener))
817    }
818
    /// Tries to read the content of `cell` on task `task_id`.
    ///
    /// Returns `Ok(Ok(content))` when the cell data is available right now,
    /// `Ok(Err(listener))` when the caller must wait for the task to
    /// (re)compute the cell, and `Err(..)` when the cell can no longer exist
    /// for this task or the task was canceled.
    ///
    /// When dependency tracking is enabled and a distinct `reader` is given,
    /// this also records the dependency edge so the reader gets invalidated
    /// when the cell changes.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        /// Records the dependency edge in both directions: the cell's
        /// dependents on `task` and the cell dependencies on `reader_task`.
        /// Consumes and drops both guards. Requires that both guards were
        /// acquired together via `task_pair` (see the note below on why).
        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
            key: Option<u64>,
        ) {
            // Immutable tasks never invalidate, so tracking is skipped unless
            // the `verify_immutable` feature forces it for verification.
            if let Some(mut reader_task) = reader_task
                && (!task.immutable() || cfg!(feature = "verify_immutable"))
            {
                let reader = reader.unwrap();
                let _ = task.add_cell_dependents((cell, key, reader));
                drop(task);

                // Note: We use `task_pair` earlier to lock the task and its reader at the same
                // time. If we didn't and just locked the reader here, an invalidation could occur
                // between grabbing the locks. If that happened, and if the task is "outdated" or
                // doesn't have the dependency edge yet, the invalidation would be lost.

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
                    let _ = reader_task.add_cell_dependencies((target, key));
                }
                drop(reader_task);
            }
        }

        let ReadCellOptions {
            is_serializable_cell_content,
            tracking,
            final_read_hint,
        } = options;

        let mut ctx = self.execute_context(turbo_tasks);
        // Lock only the target task when the read is untracked, tracking is
        // disabled, or the task reads its own cell; otherwise lock the task
        // and the reader together.
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(tracking, ReadCellTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            // Having a task_pair here is not optimal, but otherwise this would lead to a race
            // condition. See below.
            // TODO(sokra): solve that in a more performant way.
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        // A final read may take the data out of the task instead of cloning it.
        let content = if final_read_hint {
            task.remove_cell_data(is_serializable_cell_content, cell)
        } else {
            task.get_cell_data(is_serializable_cell_content, cell)
        };
        if let Some(content) = content {
            if tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.reference)),
            )));
        }

        // No data yet. If the task is already scheduled or running, just wait
        // for the cell to be filled.
        let in_progress = task.get_in_progress();
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self
                .listen_to_cell(&mut task, task_id, &reader_task, cell)
                .0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        // Check cell index range (cell might not exist at all)
        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
        let Some(max_id) = max_id else {
            let task_desc = task.get_task_description();
            // Track even the failed read so the reader is re-run if the cell
            // comes back into existence.
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!(
                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
            );
        };
        if cell.index >= max_id {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
        }

        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
        // task to get the cell content.

        // Bail early if the task was cancelled — no point in registering a listener
        // on a task that won't execute again.
        if is_cancelled {
            bail!("{} was canceled", task.get_task_description());
        }

        // Listen to the cell and potentially schedule the task
        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
        drop(reader_task);
        // If another reader already registered this in-progress cell, the task
        // was already scheduled; don't schedule it again.
        if !new_listener {
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).ty.global_name,
            cell_index = cell.index
        )
        .entered();

        let _ = task.add_scheduled(
            TaskExecutionReason::CellNotAvailable,
            EventDescription::new(|| task.get_task_desc_fn()),
        );
        ctx.schedule_task(task, TaskPriority::Initial);

        Ok(Err(listener))
    }
957
958    fn listen_to_cell(
959        &self,
960        task: &mut impl TaskGuard,
961        task_id: TaskId,
962        reader_task: &Option<impl TaskGuard>,
963        cell: CellId,
964    ) -> (EventListener, bool) {
965        let note = || {
966            let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
967            move || {
968                if let Some(reader_desc) = reader_desc.as_ref() {
969                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
970                } else {
971                    "try_read_task_cell (in progress, untracked)".to_string()
972                }
973            }
974        };
975        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
976            // Someone else is already computing the cell
977            let listener = in_progress.event.listen_with_note(note);
978            return (listener, false);
979        }
980        let in_progress = InProgressCellState::new(task_id, cell);
981        let listener = in_progress.event.listen_with_note(note);
982        let old = task.insert_in_progress_cells(cell, in_progress);
983        debug_assert!(old.is_none(), "InProgressCell already exists");
984        (listener, true)
985    }
986
    /// Takes a snapshot of all task data modified since the previous snapshot
    /// and persists it via the backing storage.
    ///
    /// Returns `(timestamp, persisted)`: the `Instant` marks when the snapshot
    /// state was captured (or when this call started, for the no-modification
    /// early return) and the `bool` tells whether anything was written.
    ///
    /// # Errors
    ///
    /// Propagates errors from `save_snapshot`. After that point the snapshot
    /// data has already been consumed, so the caller must treat a failure as
    /// fatal for further persisting (see the comment at the save site).
    fn snapshot_and_persist(
        &self,
        parent_span: Option<tracing::Id>,
        reason: &str,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<(Instant, bool), anyhow::Error> {
        let snapshot_span =
            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
                .entered();
        let start = Instant::now();
        // SystemTime for wall-clock timestamps in trace events (milliseconds
        // since epoch). Instant is monotonic but has no defined epoch, so it
        // can't be used for cross-process trace correlation.
        let wall_start = SystemTime::now();
        debug_assert!(self.should_persist());

        // Phase 1: block new operations and wait for in-flight ones to
        // suspend. Setting SNAPSHOT_REQUESTED_BIT on `in_progress_operations`
        // signals operations to suspend; the condvar wait completes once only
        // the bit itself remains in the counter (i.e. no active operations).
        let suspended_operations;
        {
            let _span = tracing::info_span!("blocking").entered();
            let mut snapshot_request = self.snapshot_request.lock();
            snapshot_request.snapshot_requested = true;
            let active_operations = self
                .in_progress_operations
                .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
            if active_operations != 0 {
                self.operations_suspended
                    .wait_while(&mut snapshot_request, |_| {
                        self.in_progress_operations.load(Ordering::Relaxed)
                            != SNAPSHOT_REQUESTED_BIT
                    });
            }
            // Clone the suspended operations so they can be persisted as
            // uncompleted and resumed on the next startup if needed.
            suspended_operations = snapshot_request
                .suspended_operations
                .iter()
                .map(|op| op.arc().clone())
                .collect::<Vec<_>>();
        }
        // Enter snapshot mode, which atomically reads and resets the modified count.
        // Checking after start_snapshot ensures no concurrent increments can race.
        let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
        // Phase 2: release suspended operations again; the storage-level
        // snapshot mode now protects consistency instead of the global block.
        let mut snapshot_request = self.snapshot_request.lock();
        snapshot_request.snapshot_requested = false;
        self.in_progress_operations
            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        self.snapshot_completed.notify_all();
        let snapshot_time = Instant::now();
        drop(snapshot_request);

        if !has_modifications {
            // No tasks modified since the last snapshot — drop the guard (which
            // calls end_snapshot) and skip the expensive O(N) scan.
            drop(snapshot_guard);
            return Ok((start, false));
        }

        // Per-task-type cache size statistics, only collected when the
        // `print_cache_item_size` feature is enabled.
        #[cfg(feature = "print_cache_item_size")]
        #[derive(Default)]
        struct TaskCacheStats {
            data: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            data_compressed: usize,
            data_count: usize,
            meta: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            meta_compressed: usize,
            meta_count: usize,
            upper_count: usize,
            collectibles_count: usize,
            aggregated_collectibles_count: usize,
            children_count: usize,
            followers_count: usize,
            collectibles_dependents_count: usize,
            aggregated_dirty_containers_count: usize,
            output_size: usize,
        }
        /// Formats a byte size, optionally including the compressed size when the
        /// `print_cache_item_size_with_compressed` feature is enabled.
        #[cfg(feature = "print_cache_item_size")]
        struct FormatSizes {
            size: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: usize,
        }
        #[cfg(feature = "print_cache_item_size")]
        impl std::fmt::Display for FormatSizes {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                use turbo_tasks::util::FormatBytes;
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    write!(
                        f,
                        "{} ({} compressed)",
                        FormatBytes(self.size),
                        FormatBytes(self.compressed_size)
                    )
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    write!(f, "{}", FormatBytes(self.size))
                }
            }
        }
        #[cfg(feature = "print_cache_item_size")]
        impl TaskCacheStats {
            /// LZ4-compresses `data` into a scratch buffer and returns the
            /// compressed length (used only for stats, the result is discarded).
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            fn compressed_size(data: &[u8]) -> Result<usize> {
                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
                    data,
                    &mut Vec::new(),
                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
                )?)
            }

            fn add_data(&mut self, data: &[u8]) {
                self.data += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.data_count += 1;
            }

            fn add_meta(&mut self, data: &[u8]) {
                self.meta += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.meta_count += 1;
            }

            /// Accumulates the meta item counts and the encoded output size of
            /// one task's storage into this stats bucket.
            fn add_counts(&mut self, storage: &TaskStorage) {
                let counts = storage.meta_counts();
                self.upper_count += counts.upper;
                self.collectibles_count += counts.collectibles;
                self.aggregated_collectibles_count += counts.aggregated_collectibles;
                self.children_count += counts.children;
                self.followers_count += counts.followers;
                self.collectibles_dependents_count += counts.collectibles_dependents;
                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
                if let Some(output) = storage.get_output() {
                    use turbo_bincode::turbo_bincode_encode;

                    self.output_size += turbo_bincode_encode(&output)
                        .map(|data| data.len())
                        .unwrap_or(0);
                }
            }

            /// Returns the task name used as the stats grouping key.
            fn task_name(storage: &TaskStorage) -> String {
                storage
                    .get_persistent_task_type()
                    .map(|t| t.to_string())
                    .unwrap_or_else(|| "<unknown>".to_string())
            }

            /// Returns the primary sort key: compressed total when
            /// `print_cache_item_size_with_compressed` is enabled, raw total otherwise.
            fn sort_key(&self) -> usize {
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed + self.meta_compressed
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    self.data + self.meta
                }
            }

            fn format_total(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data + self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed + self.meta_compressed,
                }
            }

            fn format_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed,
                }
            }

            // checked_div: yields 0 instead of panicking when no items were counted.
            fn format_avg_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data.checked_div(self.data_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .data_compressed
                        .checked_div(self.data_count)
                        .unwrap_or(0),
                }
            }

            fn format_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.meta_compressed,
                }
            }

            fn format_avg_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta.checked_div(self.meta_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .meta_compressed
                        .checked_div(self.meta_count)
                        .unwrap_or(0),
                }
            }
        }
        // Mutex because `process` may be invoked from multiple threads by
        // `take_snapshot` — TODO confirm against storage implementation.
        #[cfg(feature = "print_cache_item_size")]
        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
            Mutex::new(FxHashMap::default());

        // Encode each task's modified categories. We only encode categories with `modified` set,
        // meaning the category was actually dirtied. Categories restored from disk but never
        // modified don't need re-persisting since the on-disk version is still valid.
        // For tasks accessed during snapshot mode, a frozen copy was made and its `modified`
        // flags were copied from the live task at snapshot creation time, reflecting which
        // categories were dirtied before the snapshot was taken.
        let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
            // Encodes a single category (meta or data); panics on serialization
            // failure since a partially-persisted task graph would be corrupt.
            let encode_category = |task_id: TaskId,
                                   data: &TaskStorage,
                                   category: SpecificTaskDataCategory,
                                   buffer: &mut TurboBincodeBuffer|
             -> Option<TurboBincodeBuffer> {
                match encode_task_data(task_id, data, category, buffer) {
                    Ok(encoded) => {
                        #[cfg(feature = "print_cache_item_size")]
                        {
                            let mut stats = task_cache_stats.lock();
                            let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
                            match category {
                                SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
                                SpecificTaskDataCategory::Data => entry.add_data(&encoded),
                            }
                        }
                        Some(encoded)
                    }
                    Err(err) => {
                        panic!(
                            "Serializing task {} failed ({:?}): {:?}",
                            self.debug_get_task_description(task_id),
                            category,
                            err
                        );
                    }
                }
            };
            if task_id.is_transient() {
                unreachable!("transient task_ids should never be enqueued to be persisted");
            }

            let encode_meta = inner.flags.meta_modified();
            let encode_data = inner.flags.data_modified();

            #[cfg(feature = "print_cache_item_size")]
            if encode_data || encode_meta {
                task_cache_stats
                    .lock()
                    .entry(TaskCacheStats::task_name(inner))
                    .or_default()
                    .add_counts(inner);
            }

            let meta = if encode_meta {
                encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
            } else {
                None
            };

            let data = if encode_data {
                encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
            } else {
                None
            };
            // Newly created tasks additionally persist a hash of their task
            // type so they can be matched in the cache on restore.
            let task_type_hash = if inner.flags.new_task() {
                let task_type = inner.get_persistent_task_type().expect(
                    "It is not possible for a new_task to not have a persistent_task_type.  Task \
                     creation for persistent tasks uses a single ExecutionContextImpl for \
                     creating the task (which sets new_task) and connect_child (which sets \
                     persistent_task_type) and take_snapshot waits for all operations to complete \
                     or suspend before we start snapshotting.  So task creation will always set \
                     the task_type.",
                );
                Some(compute_task_type_hash(task_type))
            } else {
                None
            };

            SnapshotItem {
                task_id,
                meta,
                data,
                task_type_hash,
            }
        };

        // Encode every modified task into SnapshotItems; this consumes the
        // snapshot guard (ending snapshot mode when done).
        let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);

        drop(snapshot_span);
        let snapshot_duration = start.elapsed();
        let task_count = task_snapshots.len();

        if task_snapshots.is_empty() {
            // This should be impossible — if we got here, modified_count was nonzero, and every
            // modification that increments the count also failed during encoding.
            std::hint::cold_path();
            return Ok((snapshot_time, false));
        }

        let persist_start = Instant::now();
        let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
        {
            // Tasks were already consumed by take_snapshot, so a future snapshot
            // would not re-persist them — returning an error signals to the caller
            // that further persist attempts would corrupt the task graph in storage.
            self.backing_storage
                .save_snapshot(suspended_operations, task_snapshots)?;
            #[cfg(feature = "print_cache_item_size")]
            {
                let mut task_cache_stats = task_cache_stats
                    .into_inner()
                    .into_iter()
                    .collect::<Vec<_>>();
                if !task_cache_stats.is_empty() {
                    use turbo_tasks::util::FormatBytes;

                    use crate::utils::markdown_table::print_markdown_table;

                    // Sort descending by size (and name as tie-breaker).
                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
                        (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
                    });

                    println!(
                        "Task cache stats: {}",
                        FormatSizes {
                            size: task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data + s.meta)
                                .sum::<usize>(),
                            #[cfg(feature = "print_cache_item_size_with_compressed")]
                            compressed_size: task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data_compressed + s.meta_compressed)
                                .sum::<usize>()
                        },
                    );

                    // The "Count x Avg" headers appear twice because each row
                    // splits them over two cells: "{count} x" and "{avg}".
                    print_markdown_table(
                        [
                            "Task",
                            " Total Size",
                            " Data Size",
                            " Data Count x Avg",
                            " Data Count x Avg",
                            " Meta Size",
                            " Meta Count x Avg",
                            " Meta Count x Avg",
                            " Uppers",
                            " Coll",
                            " Agg Coll",
                            " Children",
                            " Followers",
                            " Coll Deps",
                            " Agg Dirty",
                            " Output Size",
                        ],
                        task_cache_stats.iter(),
                        |(task_desc, stats)| {
                            [
                                task_desc.to_string(),
                                format!(" {}", stats.format_total()),
                                format!(" {}", stats.format_data()),
                                format!(" {} x", stats.data_count),
                                format!("{}", stats.format_avg_data()),
                                format!(" {}", stats.format_meta()),
                                format!(" {} x", stats.meta_count),
                                format!("{}", stats.format_avg_meta()),
                                format!(" {}", stats.upper_count),
                                format!(" {}", stats.collectibles_count),
                                format!(" {}", stats.aggregated_collectibles_count),
                                format!(" {}", stats.children_count),
                                format!(" {}", stats.followers_count),
                                format!(" {}", stats.collectibles_dependents_count),
                                format!(" {}", stats.aggregated_dirty_containers_count),
                                format!(" {}", FormatBytes(stats.output_size)),
                            ]
                        },
                    );
                }
            }
        }

        let elapsed = start.elapsed();
        let persist_duration = persist_start.elapsed();
        // avoid spamming the event queue with information about fast operations
        if elapsed > Duration::from_secs(10) {
            turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
                "Finished writing to filesystem cache".to_string(),
                elapsed,
            )));
        }

        let wall_start_ms = wall_start
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            // as_millis_f64 is not stable yet
            .as_secs_f64()
            * 1000.0;
        let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
        turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
            "turbopack-persistence",
            wall_start_ms,
            wall_end_ms,
            vec![
                ("reason", serde_json::Value::from(reason)),
                (
                    "snapshot_duration_ms",
                    serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
                ),
                (
                    "persist_duration_ms",
                    serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
                ),
                ("task_count", serde_json::Value::from(task_count)),
            ],
        )));

        Ok((snapshot_time, true))
    }
1424
1425    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1426        if self.should_restore() {
1427            // Continue all uncompleted operations
1428            // They can't be interrupted by a snapshot since the snapshotting job has not been
1429            // scheduled yet.
1430            let uncompleted_operations = self
1431                .backing_storage
1432                .uncompleted_operations()
1433                .expect("Failed to get uncompleted operations");
1434            if !uncompleted_operations.is_empty() {
1435                let mut ctx = self.execute_context(turbo_tasks);
1436                for op in uncompleted_operations {
1437                    op.execute(&mut ctx);
1438                }
1439            }
1440        }
1441
1442        // Only when it should write regularly to the storage, we schedule the initial snapshot
1443        // job.
1444        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1445            // Schedule the snapshot job
1446            let _span = trace_span!("persisting background job").entered();
1447            let _span = tracing::info_span!("thread").entered();
1448            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1449        }
1450    }
1451
    fn stopping(&self) {
        // Flag that shutdown has begun; the Release store must happen before the
        // notification so waiters woken below observe the flag as set.
        self.stopping.store(true, Ordering::Release);
        // Wake every listener waiting on the stopping event.
        self.stopping_event.notify(usize::MAX);
    }
1456
1457    #[allow(unused_variables)]
1458    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1459        #[cfg(feature = "verify_aggregation_graph")]
1460        {
1461            self.is_idle.store(false, Ordering::Release);
1462            self.verify_aggregation_graph(turbo_tasks, false);
1463        }
1464        if self.should_persist()
1465            && let Err(err) = self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks)
1466        {
1467            eprintln!("Persisting failed during shutdown: {err:?}");
1468        }
1469        drop_contents(&self.task_cache);
1470        self.storage.drop_contents();
1471        if let Err(err) = self.backing_storage.shutdown() {
1472            println!("Shutting down failed: {err}");
1473        }
1474    }
1475
    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        // Wake anything waiting for the system to become idle.
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            // Debounced verification: only verify the aggregation graph if we
            // remain idle for 5 seconds. `idle_end_event` cancels the wait.
            tokio::task::spawn(async move {
                select! {
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                        // do nothing
                    }
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                // Re-check the flag: idle may have ended between the sleep
                // completing and this task being polled again.
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }
1503
    fn idle_end(&self) {
        // Clear the idle flag (only tracked with the verification feature).
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        // Wake listeners; this also cancels the debounced verification task
        // spawned while idle, which listens on this event.
        self.idle_end_event.notify(usize::MAX);
    }
1509
    /// Looks up or creates the persistent task for `task_type`, connects it as a child of
    /// `parent_task` (when given), and returns its [`TaskId`].
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;

        // First check if the task exists in the cache which only uses a read lock
        // .map(|r| *r) copies the TaskId and drops the DashMap Ref (releasing the read lock)
        // before ConnectChildOperation::run, which may re-enter task_cache with a write lock.
        if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }

        // Create a single ExecuteContext for both lookup and connect_child
        let mut ctx = self.execute_context(turbo_tasks);

        let mut is_new = false;
        let (task_id, task_type) = if let Some(task_id) = ctx.task_by_type(&task_type) {
            // Task exists in backing storage
            // So we only need to insert it into the in-memory cache
            self.track_cache_hit(&task_type);
            let task_type = match raw_entry(&self.task_cache, &task_type) {
                // Another thread inserted the cache entry in the meantime; keep theirs.
                RawEntry::Occupied(_) => ArcOrOwned::Owned(task_type),
                RawEntry::Vacant(e) => {
                    let task_type = Arc::new(task_type);
                    e.insert(task_type.clone(), task_id);
                    ArcOrOwned::Arc(task_type)
                }
            };
            (task_id, task_type)
        } else {
            // Task doesn't exist in memory cache or backing storage
            // So we might need to create a new task
            let (task_id, task_type) = match raw_entry(&self.task_cache, &task_type) {
                RawEntry::Occupied(e) => {
                    // Another thread beat us to creating this task - use their task_id.
                    // They will handle logging the new task as modified
                    let task_id = *e.get();
                    drop(e);
                    self.track_cache_hit(&task_type);
                    (task_id, ArcOrOwned::Owned(task_type))
                }
                RawEntry::Vacant(e) => {
                    // We're creating a new task.
                    let task_type = Arc::new(task_type);
                    let task_id = self.persisted_task_id_factory.get();
                    // Initialize storage BEFORE making task_id visible in the cache.
                    // This ensures any thread that reads task_id from the cache sees
                    // the storage entry already initialized (restored flags set).
                    self.storage.initialize_new_task(task_id);
                    e.insert(task_type.clone(), task_id);
                    // insert() consumes e, releasing the lock
                    self.track_cache_miss(&task_type);
                    is_new = true;
                    (task_id, ArcOrOwned::Arc(task_type))
                }
            };
            (task_id, task_type)
        };
        // Newly-created root tasks get the maximum aggregation number so they act as
        // aggregation roots.
        if is_new && is_root {
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: u32::MAX,
                    distance: None,
                },
                &mut ctx,
            );
        }
        // Reuse the same ExecuteContext for connect_child
        operation::ConnectChildOperation::run(parent_task, task_id, Some(task_type), ctx);

        task_id
    }
1593
    /// Looks up or creates the transient task for `task_type`, connects it as a child of
    /// `parent_task` (when given), and returns its [`TaskId`]. Panics (via
    /// `panic_persistent_calling_transient`) when a persistent parent tries to call a
    /// transient task.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;

        // Transient tasks may only be called from transient parents.
        if let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                /* cell_id */ None,
            );
        }
        // First check if the task exists in the cache which only uses a read lock.
        // .map(|r| *r) copies the TaskId and drops the DashMap Ref (releasing the read lock)
        // before ConnectChildOperation::run, which may re-enter task_cache with a write lock.
        if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }
        // If not, acquire a write lock and double check / insert
        match raw_entry(&self.task_cache, &task_type) {
            RawEntry::Occupied(e) => {
                // Another thread created the task in the meantime; use their id.
                let task_id = *e.get();
                drop(e);
                self.track_cache_hit(&task_type);
                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Owned(task_type)),
                    turbo_tasks,
                );
                task_id
            }
            RawEntry::Vacant(e) => {
                let task_type = Arc::new(task_type);
                let task_id = self.transient_task_id_factory.get();
                // Initialize storage BEFORE making task_id visible in the cache.
                self.storage.initialize_new_task(task_id);
                e.insert(task_type.clone(), task_id);
                self.track_cache_miss(&task_type);

                // Root tasks get the maximum aggregation number so they act as
                // aggregation roots.
                if is_root {
                    let mut ctx = self.execute_context(turbo_tasks);
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: u32::MAX,
                            distance: None,
                        },
                        &mut ctx,
                    );
                }

                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Arc(task_type)),
                    turbo_tasks,
                );

                task_id
            }
        }
    }
1669
1670    /// Generate an object that implements [`fmt::Display`] explaining why the given
1671    /// [`CachedTaskType`] is transient.
1672    fn debug_trace_transient_task(
1673        &self,
1674        task_type: &CachedTaskType,
1675        cell_id: Option<CellId>,
1676    ) -> DebugTraceTransientTask {
1677        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1678        // from tracing the same task many times, so use a visited_set
1679        fn inner_id(
1680            backend: &TurboTasksBackendInner<impl BackingStorage>,
1681            task_id: TaskId,
1682            cell_type_id: Option<ValueTypeId>,
1683            visited_set: &mut FxHashSet<TaskId>,
1684        ) -> DebugTraceTransientTask {
1685            if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1686                if visited_set.contains(&task_id) {
1687                    let task_name = task_type.get_name();
1688                    DebugTraceTransientTask::Collapsed {
1689                        task_name,
1690                        cell_type_id,
1691                    }
1692                } else {
1693                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1694                }
1695            } else {
1696                DebugTraceTransientTask::Uncached { cell_type_id }
1697            }
1698        }
1699        fn inner_cached(
1700            backend: &TurboTasksBackendInner<impl BackingStorage>,
1701            task_type: &CachedTaskType,
1702            cell_type_id: Option<ValueTypeId>,
1703            visited_set: &mut FxHashSet<TaskId>,
1704        ) -> DebugTraceTransientTask {
1705            let task_name = task_type.get_name();
1706
1707            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1708                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1709                    // `task_id` should never be `None` at this point, as that would imply a
1710                    // non-local task is returning a local `Vc`...
1711                    // Just ignore if it happens, as we're likely already panicking.
1712                    return None;
1713                };
1714                if task_id.is_transient() {
1715                    Some(Box::new(inner_id(
1716                        backend,
1717                        task_id,
1718                        cause_self_raw_vc.try_get_type_id(),
1719                        visited_set,
1720                    )))
1721                } else {
1722                    None
1723                }
1724            });
1725            let cause_args = task_type
1726                .arg
1727                .get_raw_vcs()
1728                .into_iter()
1729                .filter_map(|raw_vc| {
1730                    let Some(task_id) = raw_vc.try_get_task_id() else {
1731                        // `task_id` should never be `None` (see comment above)
1732                        return None;
1733                    };
1734                    if !task_id.is_transient() {
1735                        return None;
1736                    }
1737                    Some((task_id, raw_vc.try_get_type_id()))
1738                })
1739                .collect::<IndexSet<_>>() // dedupe
1740                .into_iter()
1741                .map(|(task_id, cell_type_id)| {
1742                    inner_id(backend, task_id, cell_type_id, visited_set)
1743                })
1744                .collect();
1745
1746            DebugTraceTransientTask::Cached {
1747                task_name,
1748                cell_type_id,
1749                cause_self,
1750                cause_args,
1751            }
1752        }
1753        inner_cached(
1754            self,
1755            task_type,
1756            cell_id.map(|c| c.type_id),
1757            &mut FxHashSet::default(),
1758        )
1759    }
1760
1761    fn invalidate_task(
1762        &self,
1763        task_id: TaskId,
1764        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1765    ) {
1766        if !self.should_track_dependencies() {
1767            panic!("Dependency tracking is disabled so invalidation is not allowed");
1768        }
1769        operation::InvalidateOperation::run(
1770            smallvec![task_id],
1771            #[cfg(feature = "trace_task_dirty")]
1772            TaskDirtyCause::Invalidator,
1773            self.execute_context(turbo_tasks),
1774        );
1775    }
1776
1777    fn invalidate_tasks(
1778        &self,
1779        tasks: &[TaskId],
1780        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1781    ) {
1782        if !self.should_track_dependencies() {
1783            panic!("Dependency tracking is disabled so invalidation is not allowed");
1784        }
1785        operation::InvalidateOperation::run(
1786            tasks.iter().copied().collect(),
1787            #[cfg(feature = "trace_task_dirty")]
1788            TaskDirtyCause::Unknown,
1789            self.execute_context(turbo_tasks),
1790        );
1791    }
1792
1793    fn invalidate_tasks_set(
1794        &self,
1795        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1796        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1797    ) {
1798        if !self.should_track_dependencies() {
1799            panic!("Dependency tracking is disabled so invalidation is not allowed");
1800        }
1801        operation::InvalidateOperation::run(
1802            tasks.iter().copied().collect(),
1803            #[cfg(feature = "trace_task_dirty")]
1804            TaskDirtyCause::Unknown,
1805            self.execute_context(turbo_tasks),
1806        );
1807    }
1808
1809    fn invalidate_serialization(
1810        &self,
1811        task_id: TaskId,
1812        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1813    ) {
1814        if task_id.is_transient() {
1815            return;
1816        }
1817        let mut ctx = self.execute_context(turbo_tasks);
1818        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1819        task.invalidate_serialization();
1820    }
1821
1822    fn debug_get_task_description(&self, task_id: TaskId) -> String {
1823        let task = self.storage.access_mut(task_id);
1824        if let Some(value) = task.get_persistent_task_type() {
1825            format!("{task_id:?} {}", value)
1826        } else if let Some(value) = task.get_transient_task_type() {
1827            format!("{task_id:?} {}", value)
1828        } else {
1829            format!("{task_id:?} unknown")
1830        }
1831    }
1832
1833    fn get_task_name(
1834        &self,
1835        task_id: TaskId,
1836        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1837    ) -> String {
1838        let mut ctx = self.execute_context(turbo_tasks);
1839        let task = ctx.task(task_id, TaskDataCategory::Data);
1840        if let Some(value) = task.get_persistent_task_type() {
1841            value.to_string()
1842        } else if let Some(value) = task.get_transient_task_type() {
1843            value.to_string()
1844        } else {
1845            "unknown".to_string()
1846        }
1847    }
1848
    /// Returns a clone of the task's persistent [`CachedTaskType`], or `None` when
    /// no persistent task type is stored for this id. Reads storage directly
    /// (no `ExecuteContext`).
    fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
        let task = self.storage.access_mut(task_id);
        task.get_persistent_task_type().cloned()
    }
1853
    /// Handles cancellation of a task's execution: wakes all waiters, marks the
    /// task as `Canceled`, and (for persistent tasks with dependency tracking)
    /// marks it session-dependent dirty so the next session re-executes it.
    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        // Wake anyone waiting for the task's completion via its done event.
        if let Some(in_progress) = task.take_in_progress() {
            match in_progress {
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        // Notify any readers waiting on in-progress cells so their listeners
        // resolve and foreground jobs can finish (prevents stop_and_wait hang).
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Mark the cancelled task as session-dependent dirty so it will be re-executed
        // in the next session. Without this, any reader that encounters the cancelled task
        // records an error in its output. That error is persisted and would poison
        // subsequent builds. By marking the task session-dependent dirty, the next build
        // re-executes it, which invalidates dependents and corrects the stale errors.
        let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
            task.update_dirty_state(Some(Dirtyness::SessionDependent))
        } else {
            None
        };

        let old = task.set_in_progress(InProgressState::Canceled);
        debug_assert!(old.is_none(), "InProgress already exists");
        // Release the task lock before running aggregation updates.
        drop(task);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, &mut ctx);
        }

        // Drop the cell states outside of the task lock's critical section.
        drop(in_progress_cells);
    }
1903
    /// Transitions a `Scheduled` task into `InProgress` and builds the execution
    /// spec (future + tracing span) for it. Returns `None` when the task is not
    /// in a startable state (no in-progress state, or not `Scheduled`), or when a
    /// once-task's future has already been taken.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        // Scope so ctx/task locks are released before the future is constructed.
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            // Prefetching re-acquires the task afterwards since prepare_tasks
            // needs the task lock released.
            if let Some(tasks) = task.prefetch() {
                drop(task);
                ctx.prepare_tasks(tasks, "prefetch");
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            let in_progress = task.take_in_progress()?;
            // Only a Scheduled task may start; anything else is restored unchanged.
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Make all current collectibles outdated (remove left-over outdated collectibles)
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            // Snapshot first to avoid mutating the collectible maps while iterating them.
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        // Drop stale outdated entries with no matching current collectible.
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Make all dependencies outdated
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        // Build the execution future outside of the locks acquired above.
        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason, priority),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // A Once task's future can only be taken a single time;
                    // returns None if it was already consumed.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
2003
    /// Finalizes a task execution. Returns `true` when the task was stale and has
    /// been rescheduled (so the result was discarded), `false` when completion was
    /// committed.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        // Task completion is a 4 step process:
        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
        //    aggregation number of the task and the new children.
        // 2. Connect the new children to the task (and do the relevant aggregation updates).
        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
        // 4. Shrink the task memory to reduce footprint of the task.

        // Due to persistence it is possible that the process is cancelled after any step. This is
        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
        // in-memory representation.

        // The task might be invalidated during this process, so we need to check the stale flag
        // at the start of every step.

        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        // Step 1: prepare (remove old edges, compute new output / dependents).
        let Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
            is_recomputation,
        }) = self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        )
        else {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "prepare");
            return true;
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // When restoring from filesystem cache the following might not be executed (since we can
        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
        // would be executed again.

        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        // Step 2: connect new children; may detect staleness and reschedule.
        if has_new_children
            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return true;
        }

        // Step 3: clear dirty / in-progress state; may detect staleness and reschedule.
        let (stale, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
        );
        if stale {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return true;
        }

        // Step 4: shrink the in-memory representation.
        let removed_data = self.task_execution_completed_cleanup(
            &mut ctx,
            task_id,
            cell_counters,
            is_error,
            is_recomputation,
        );

        // Drop data outside of critical sections
        drop(removed_data);
        drop(in_progress_cells);

        false
    }
2141
2142    fn task_execution_completed_prepare(
2143        &self,
2144        ctx: &mut impl ExecuteContext<'_>,
2145        #[cfg(feature = "trace_task_details")] span: &Span,
2146        task_id: TaskId,
2147        result: Result<RawVc, TurboTasksExecutionError>,
2148        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2149        #[cfg(feature = "verify_determinism")] stateful: bool,
2150        has_invalidator: bool,
2151    ) -> Option<TaskExecutionCompletePrepareResult> {
2152        let mut task = ctx.task(task_id, TaskDataCategory::All);
2153        let is_recomputation = task.is_dirty().is_none();
2154        let Some(in_progress) = task.get_in_progress_mut() else {
2155            panic!("Task execution completed, but task is not in progress: {task:#?}");
2156        };
2157        if matches!(in_progress, InProgressState::Canceled) {
2158            return Some(TaskExecutionCompletePrepareResult {
2159                new_children: Default::default(),
2160                is_now_immutable: false,
2161                #[cfg(feature = "verify_determinism")]
2162                no_output_set: false,
2163                new_output: None,
2164                output_dependent_tasks: Default::default(),
2165                is_recomputation,
2166            });
2167        }
2168        let &mut InProgressState::InProgress(box InProgressStateInner {
2169            stale,
2170            ref mut new_children,
2171            session_dependent,
2172            once_task: is_once_task,
2173            ..
2174        }) = in_progress
2175        else {
2176            panic!("Task execution completed, but task is not in progress: {task:#?}");
2177        };
2178
2179        // If the task is stale, reschedule it
2180        #[cfg(not(feature = "no_fast_stale"))]
2181        if stale && !is_once_task {
2182            let Some(InProgressState::InProgress(box InProgressStateInner {
2183                done_event,
2184                mut new_children,
2185                ..
2186            })) = task.take_in_progress()
2187            else {
2188                unreachable!();
2189            };
2190            let old = task.set_in_progress(InProgressState::Scheduled {
2191                done_event,
2192                reason: TaskExecutionReason::Stale,
2193            });
2194            debug_assert!(old.is_none(), "InProgress already exists");
2195            // Remove old children from new_children to leave only the children that had their
2196            // active count increased
2197            for task in task.iter_children() {
2198                new_children.remove(&task);
2199            }
2200            drop(task);
2201
2202            // We need to undo the active count increase for the children since we throw away the
2203            // new_children list now.
2204            AggregationUpdateQueue::run(
2205                AggregationUpdateJob::DecreaseActiveCounts {
2206                    task_ids: new_children.into_iter().collect(),
2207                },
2208                ctx,
2209            );
2210            return None;
2211        }
2212
2213        // take the children from the task to process them
2214        let mut new_children = take(new_children);
2215
2216        // handle stateful (only tracked when verify_determinism is enabled)
2217        #[cfg(feature = "verify_determinism")]
2218        if stateful {
2219            task.set_stateful(true);
2220        }
2221
2222        // handle has_invalidator
2223        if has_invalidator {
2224            task.set_invalidator(true);
2225        }
2226
2227        // handle cell counters: update max index and remove cells that are no longer used
2228        // On error, skip this update: the task may have failed before creating all cells it
2229        // normally creates, so cell_counters is incomplete. Clearing cell_type_max_index entries
2230        // based on partial counters would cause "cell no longer exists" errors for tasks that
2231        // still hold dependencies on those cells. The old cell data is preserved on error
2232        // (see task_execution_completed_cleanup), so keeping cell_type_max_index consistent with
2233        // that data is correct.
2234        // NOTE: This must stay in sync with task_execution_completed_cleanup, which similarly
2235        // skips cell data removal on error.
2236        if result.is_ok() || is_recomputation {
2237            let old_counters: FxHashMap<_, _> = task
2238                .iter_cell_type_max_index()
2239                .map(|(&k, &v)| (k, v))
2240                .collect();
2241            let mut counters_to_remove = old_counters.clone();
2242
2243            for (&cell_type, &max_index) in cell_counters.iter() {
2244                if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2245                    if old_max_index != max_index {
2246                        task.insert_cell_type_max_index(cell_type, max_index);
2247                    }
2248                } else {
2249                    task.insert_cell_type_max_index(cell_type, max_index);
2250                }
2251            }
2252            for (cell_type, _) in counters_to_remove {
2253                task.remove_cell_type_max_index(&cell_type);
2254            }
2255        }
2256
2257        let mut queue = AggregationUpdateQueue::new();
2258
2259        let mut old_edges = Vec::new();
2260
2261        let has_children = !new_children.is_empty();
2262        let is_immutable = task.immutable();
2263        let task_dependencies_for_immutable =
2264            // Task was previously marked as immutable
2265            if !is_immutable
2266            // Task is not session dependent (session dependent tasks can change between sessions)
2267            && !session_dependent
2268            // Task has no invalidator
2269            && !task.invalidator()
2270            // Task has no dependencies on collectibles
2271            && task.is_collectibles_dependencies_empty()
2272        {
2273            Some(
2274                // Collect all dependencies on tasks to check if all dependencies are immutable
2275                task.iter_output_dependencies()
2276                    .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
2277                    .collect::<FxHashSet<_>>(),
2278            )
2279        } else {
2280            None
2281        };
2282
2283        if has_children {
2284            // Prepare all new children
2285            prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2286
2287            // Filter actual new children
2288            old_edges.extend(
2289                task.iter_children()
2290                    .filter(|task| !new_children.remove(task))
2291                    .map(OutdatedEdge::Child),
2292            );
2293        } else {
2294            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2295        }
2296
2297        old_edges.extend(
2298            task.iter_outdated_collectibles()
2299                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2300        );
2301
2302        if self.should_track_dependencies() {
2303            // IMPORTANT: Use iter_outdated_* here, NOT iter_* (active dependencies).
2304            // At execution start, active deps are copied to outdated as a "before" snapshot.
2305            // During execution, new deps are added to active.
2306            // Here at completion, we clean up only the OUTDATED deps (the "before" snapshot).
2307            // Using iter_* (active) instead would incorrectly clean up deps that are still valid,
2308            // breaking dependency tracking.
2309            old_edges.extend(
2310                task.iter_outdated_cell_dependencies()
2311                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
2312            );
2313            old_edges.extend(
2314                task.iter_outdated_output_dependencies()
2315                    .map(OutdatedEdge::OutputDependency),
2316            );
2317        }
2318
2319        // Check if output need to be updated
2320        let current_output = task.get_output();
2321        #[cfg(feature = "verify_determinism")]
2322        let no_output_set = current_output.is_none();
2323        let new_output = match result {
2324            Ok(RawVc::TaskOutput(output_task_id)) => {
2325                if let Some(OutputValue::Output(current_task_id)) = current_output
2326                    && *current_task_id == output_task_id
2327                {
2328                    None
2329                } else {
2330                    Some(OutputValue::Output(output_task_id))
2331                }
2332            }
2333            Ok(RawVc::TaskCell(output_task_id, cell)) => {
2334                if let Some(OutputValue::Cell(CellRef {
2335                    task: current_task_id,
2336                    cell: current_cell,
2337                })) = current_output
2338                    && *current_task_id == output_task_id
2339                    && *current_cell == cell
2340                {
2341                    None
2342                } else {
2343                    Some(OutputValue::Cell(CellRef {
2344                        task: output_task_id,
2345                        cell,
2346                    }))
2347                }
2348            }
2349            Ok(RawVc::LocalOutput(..)) => {
2350                panic!("Non-local tasks must not return a local Vc");
2351            }
2352            Err(err) => {
2353                if let Some(OutputValue::Error(old_error)) = current_output
2354                    && **old_error == err
2355                {
2356                    None
2357                } else {
2358                    Some(OutputValue::Error(Arc::new((&err).into())))
2359                }
2360            }
2361        };
2362        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2363        // When output has changed, grab the dependent tasks
2364        if new_output.is_some() && ctx.should_track_dependencies() {
2365            output_dependent_tasks = task.iter_output_dependent().collect();
2366        }
2367
2368        drop(task);
2369
2370        // Check if the task can be marked as immutable
2371        let mut is_now_immutable = false;
2372        if let Some(dependencies) = task_dependencies_for_immutable
2373            && dependencies
2374                .iter()
2375                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2376        {
2377            is_now_immutable = true;
2378        }
2379        #[cfg(feature = "trace_task_details")]
2380        span.record("immutable", is_immutable || is_now_immutable);
2381
2382        if !queue.is_empty() || !old_edges.is_empty() {
2383            #[cfg(feature = "trace_task_completion")]
2384            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
2385            // Remove outdated edges first, before removing in_progress+dirty flag.
2386            // We need to make sure all outdated edges are removed before the task can potentially
2387            // be scheduled and executed again
2388            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2389        }
2390
2391        Some(TaskExecutionCompletePrepareResult {
2392            new_children,
2393            is_now_immutable,
2394            #[cfg(feature = "verify_determinism")]
2395            no_output_set,
2396            new_output,
2397            output_dependent_tasks,
2398            is_recomputation,
2399        })
2400    }
2401
    /// Marks every task that depends on this task's output as dirty after the
    /// output value changed.
    ///
    /// Small dependent sets are processed inline on `ctx`; sets larger than
    /// `DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD` are split into chunks
    /// that run on parallel scope threads, each with its own child context and
    /// `AggregationUpdateQueue`.
    fn task_execution_completed_invalidate_output_dependent(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        output_dependent_tasks: SmallVec<[TaskId; 4]>,
    ) {
        debug_assert!(!output_dependent_tasks.is_empty());

        if output_dependent_tasks.len() > 1 {
            // Prefetch the data of all dependent tasks in one batch before
            // processing them individually.
            ctx.prepare_tasks(
                output_dependent_tasks
                    .iter()
                    .map(|&id| (id, TaskDataCategory::All)),
                "invalidate output dependents",
            );
        }

        /// Handles a single dependent task: marks it dirty unless it is a once
        /// task or it no longer holds a dependency on `task_id`'s output.
        fn process_output_dependents(
            ctx: &mut impl ExecuteContext<'_>,
            task_id: TaskId,
            dependent_task_id: TaskId,
            queue: &mut AggregationUpdateQueue,
        ) {
            #[cfg(feature = "trace_task_output_dependencies")]
            let span = tracing::trace_span!(
                "invalidate output dependency",
                task = %task_id,
                dependent_task = %dependent_task_id,
                result = tracing::field::Empty,
            )
            .entered();
            let mut make_stale = true;
            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
            let transient_task_type = dependent.get_transient_task_type();
            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
                // once tasks are never invalidated
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "once task");
                return;
            }
            if dependent.outdated_output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "outdated dependency");
                // output dependency is outdated, so it hasn't read the output yet
                // and doesn't need to be invalidated
                // But importantly we still need to make the task dirty as it should no longer
                // be considered as "recomputation".
                make_stale = false;
            } else if !dependent.output_dependencies_contains(&task_id) {
                // output dependency has been removed, so the task doesn't depend on the
                // output anymore and doesn't need to be invalidated
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "no backward dependency");
                return;
            }
            make_task_dirty_internal(
                dependent,
                dependent_task_id,
                make_stale,
                #[cfg(feature = "trace_task_dirty")]
                TaskDirtyCause::OutputChange { task_id },
                queue,
                ctx,
            );
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "marked dirty");
        }

        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
            // Large dependent set: process in parallel chunks. Each worker
            // builds and executes its own aggregation queue.
            let chunk_size = good_chunk_size(output_dependent_tasks.len());
            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
            let _ = scope_and_block(chunks.len(), |scope| {
                for chunk in chunks {
                    let child_ctx = ctx.child_context();
                    scope.spawn(move || {
                        let mut ctx = child_ctx.create();
                        let mut queue = AggregationUpdateQueue::new();
                        for dependent_task_id in chunk {
                            process_output_dependents(
                                &mut ctx,
                                task_id,
                                dependent_task_id,
                                &mut queue,
                            )
                        }
                        queue.execute(&mut ctx);
                    });
                }
            });
        } else {
            // Small dependent set: process inline on the current context.
            let mut queue = AggregationUpdateQueue::new();
            for dependent_task_id in output_dependent_tasks {
                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
            }
            queue.execute(ctx);
        }
    }
2499
2500    fn task_execution_completed_unfinished_children_dirty(
2501        &self,
2502        ctx: &mut impl ExecuteContext<'_>,
2503        new_children: &FxHashSet<TaskId>,
2504    ) {
2505        debug_assert!(!new_children.is_empty());
2506
2507        let mut queue = AggregationUpdateQueue::new();
2508        ctx.for_each_task_all(
2509            new_children.iter().copied(),
2510            "unfinished children dirty",
2511            |child_task, ctx| {
2512                if !child_task.has_output() {
2513                    let child_id = child_task.id();
2514                    make_task_dirty_internal(
2515                        child_task,
2516                        child_id,
2517                        false,
2518                        #[cfg(feature = "trace_task_dirty")]
2519                        TaskDirtyCause::InitialDirty,
2520                        &mut queue,
2521                        ctx,
2522                    );
2523                }
2524            },
2525        );
2526
2527        queue.execute(ctx);
2528    }
2529
    /// Connects the `new_children` discovered during execution to the task.
    ///
    /// Returns `true` when the task turned stale in the meantime and was
    /// rescheduled instead (the children's previously increased active counts
    /// are rolled back in that case). Returns `false` when the children were
    /// connected or the task was canceled.
    fn task_execution_completed_connect(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        new_children: FxHashSet<TaskId>,
    ) -> bool {
        debug_assert!(!new_children.is_empty());

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // Task was canceled in the meantime, so we don't connect the children
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            #[cfg(not(feature = "no_fast_stale"))]
            stale,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // If the task is stale, reschedule it
        #[cfg(not(feature = "no_fast_stale"))]
        if *stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                task.take_in_progress()
            else {
                unreachable!();
            };
            // Move the task back into the scheduled state, keeping the same
            // done_event so waiters stay subscribed across the re-execution.
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);

            // All `new_children` are currently hold active with an active count and we need to undo
            // that. (We already filtered out the old children from that list)
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return true;
        }

        // Connect the new children, passing along whether this task currently
        // holds an active count (only relevant when activeness is tracked).
        let has_active_count = ctx.should_track_activeness()
            && task
                .get_activeness()
                .is_some_and(|activeness| activeness.active_counter > 0);
        connect_children(
            ctx,
            task_id,
            task,
            new_children,
            has_active_count,
            ctx.should_track_activeness(),
        );

        false
    }
2597
    /// Final phase of task completion: takes the task out of the in-progress
    /// state, stores the new output (if it changed), applies the immutable
    /// flag, notifies in-progress cells and waiting tasks, and updates the
    /// dirty state.
    ///
    /// Returns `(reschedule, in_progress_cells)`: `reschedule` is `true` when
    /// the task was put back on the schedule (stale, or — with
    /// `verify_determinism` — a verification re-run); `in_progress_cells` is
    /// returned so the caller can drop it outside of critical sections.
    fn task_execution_completed_finish(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "verify_determinism")] no_output_set: bool,
        new_output: Option<OutputValue>,
        is_now_immutable: bool,
    ) -> (
        bool,
        Option<
            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
        >,
    ) {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.take_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // Task was canceled in the meantime, so we don't finish it
            return (false, None);
        }
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: is_once_task,
            stale,
            session_dependent,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // Children must have been drained by the earlier completion phases.
        debug_assert!(new_children.is_empty());

        // If the task is stale, reschedule it
        if stale && !is_once_task {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            return (true, None);
        }

        // Set the output if it has changed
        let mut old_content = None;
        if let Some(value) = new_output {
            old_content = task.set_output(value);
        }

        // If the task has no invalidator and has no mutable dependencies, it does not have a way
        // to be invalidated and we can mark it as immutable.
        if is_now_immutable {
            task.set_immutable(true);
        }

        // Notify in progress cells and remove all of them
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Compute the new dirty state
        let new_dirtyness = if session_dependent {
            Some(Dirtyness::SessionDependent)
        } else {
            None
        };
        #[cfg(feature = "verify_determinism")]
        let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
        let data_update = task.update_dirty_state(new_dirtyness);

        // With verify_determinism, re-run persistent tasks whose dirty state
        // changed or that never set an output, to check for nondeterminism.
        #[cfg(feature = "verify_determinism")]
        let reschedule =
            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
        #[cfg(not(feature = "verify_determinism"))]
        let reschedule = false;
        if reschedule {
            // Keep the done_event attached to the new scheduled state; waiters
            // are only notified once the re-execution finishes.
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);
        } else {
            drop(task);

            // Notify dependent tasks that are waiting for this task to finish
            done_event.notify(usize::MAX);
        }

        drop(old_content);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, ctx);
        }

        // We return so the data can be dropped outside of critical sections
        (reschedule, in_progress_cells)
    }
2700
2701    fn task_execution_completed_cleanup(
2702        &self,
2703        ctx: &mut impl ExecuteContext<'_>,
2704        task_id: TaskId,
2705        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2706        is_error: bool,
2707        is_recomputation: bool,
2708    ) -> Vec<SharedReference> {
2709        let mut task = ctx.task(task_id, TaskDataCategory::All);
2710        let mut removed_cell_data = Vec::new();
2711        // An error is potentially caused by a eventual consistency, so we avoid updating cells
2712        // after an error as it is likely transient and we want to keep the dependent tasks
2713        // clean to avoid re-executions.
2714        // NOTE: This must stay in sync with task_execution_completed_prepare, which similarly
2715        // skips cell_type_max_index updates on error.
2716        if !is_error || is_recomputation {
2717            // Remove no longer existing cells and
2718            // find all outdated data items (removed cells, outdated edges)
2719            // Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
2720            // anyway and we want to avoid needless re-executions. When the cells become
2721            // used again, they are invalidated from the update cell operation.
2722            // Remove cell data for cells that no longer exist
2723            let to_remove_persistent: Vec<_> = task
2724                .iter_persistent_cell_data()
2725                .filter_map(|(cell, _)| {
2726                    cell_counters
2727                        .get(&cell.type_id)
2728                        .is_none_or(|start_index| cell.index >= *start_index)
2729                        .then_some(*cell)
2730                })
2731                .collect();
2732
2733            // Remove transient cell data for cells that no longer exist
2734            let to_remove_transient: Vec<_> = task
2735                .iter_transient_cell_data()
2736                .filter_map(|(cell, _)| {
2737                    cell_counters
2738                        .get(&cell.type_id)
2739                        .is_none_or(|start_index| cell.index >= *start_index)
2740                        .then_some(*cell)
2741                })
2742                .collect();
2743            removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
2744            for cell in to_remove_persistent {
2745                if let Some(data) = task.remove_persistent_cell_data(&cell) {
2746                    removed_cell_data.push(data.into_untyped());
2747                }
2748            }
2749            for cell in to_remove_transient {
2750                if let Some(data) = task.remove_transient_cell_data(&cell) {
2751                    removed_cell_data.push(data);
2752                }
2753            }
2754            // Remove cell_data_hash entries for cells that no longer exist
2755            let to_remove_hash: Vec<_> = task
2756                .iter_cell_data_hash()
2757                .filter_map(|(cell, _)| {
2758                    cell_counters
2759                        .get(&cell.type_id)
2760                        .is_none_or(|start_index| cell.index >= *start_index)
2761                        .then_some(*cell)
2762                })
2763                .collect();
2764            for cell in to_remove_hash {
2765                task.remove_cell_data_hash(&cell);
2766            }
2767        }
2768
2769        // Clean up task storage after execution:
2770        // - Shrink collections marked with shrink_on_completion
2771        // - Drop dependency fields for immutable tasks (they'll never re-execute)
2772        task.cleanup_after_execution();
2773
2774        drop(task);
2775
2776        // Return so we can drop outside of critical sections
2777        removed_cell_data
2778    }
2779
2780    /// Prints the standard message emitted when the background persisting process stops due to an
2781    /// unrecoverable write error. The caller is responsible for returning from the background job.
2782    fn log_unrecoverable_persist_error() {
2783        eprintln!(
2784            "Persisting is disabled for this session due to an unrecoverable error. Stopping the \
2785             background persisting process."
2786        );
2787    }
2788
2789    fn run_backend_job<'a>(
2790        self: &'a Arc<Self>,
2791        job: TurboTasksBackendJob,
2792        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2793    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2794        Box::pin(async move {
2795            match job {
2796                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
2797                    debug_assert!(self.should_persist());
2798
2799                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
2800                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
2801                    let mut idle_start_listener = self.idle_start_event.listen();
2802                    let mut idle_end_listener = self.idle_end_event.listen();
2803                    let mut fresh_idle = true;
2804                    loop {
2805                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2806                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2807                        let idle_timeout = *IDLE_TIMEOUT;
2808                        let (time, mut reason) =
2809                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
2810                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2811                            } else {
2812                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
2813                            };
2814
2815                        let until = last_snapshot + time;
2816                        if until > Instant::now() {
2817                            let mut stop_listener = self.stopping_event.listen();
2818                            if self.stopping.load(Ordering::Acquire) {
2819                                return;
2820                            }
2821                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2822                                Instant::now() + idle_timeout
2823                            } else {
2824                                far_future()
2825                            };
2826                            loop {
2827                                tokio::select! {
2828                                    _ = &mut stop_listener => {
2829                                        return;
2830                                    },
2831                                    _ = &mut idle_start_listener => {
2832                                        idle_time = Instant::now() + idle_timeout;
2833                                        idle_start_listener = self.idle_start_event.listen()
2834                                    },
2835                                    _ = &mut idle_end_listener => {
2836                                        idle_time = until + idle_timeout;
2837                                        idle_end_listener = self.idle_end_event.listen()
2838                                    },
2839                                    _ = tokio::time::sleep_until(until) => {
2840                                        break;
2841                                    },
2842                                    _ = tokio::time::sleep_until(idle_time) => {
2843                                        if turbo_tasks.is_idle() {
2844                                            reason = "idle timeout";
2845                                            break;
2846                                        }
2847                                    },
2848                                }
2849                            }
2850                        }
2851
2852                        let this = self.clone();
2853                        // Create a root span shared by both the snapshot/persist
2854                        // work and the subsequent compaction so they appear
2855                        // grouped together in trace viewers.
2856                        let background_span =
2857                            tracing::info_span!(parent: None, "background snapshot");
2858                        match this.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
2859                            Err(err) => {
2860                                // save_snapshot consumed persisted_task_cache_log entries;
2861                                // further snapshots would corrupt the task graph.
2862                                eprintln!("Persisting failed: {err:?}");
2863                                Self::log_unrecoverable_persist_error();
2864                                return;
2865                            }
2866                            Ok((snapshot_start, new_data)) => {
2867                                last_snapshot = snapshot_start;
2868
2869                                // Compact while idle (up to limit), regardless of
2870                                // whether the snapshot had new data.
2871                                // `background_span` is not entered here because
2872                                // `EnteredSpan` is `!Send` and would prevent the
2873                                // future from being sent across threads when it
2874                                // suspends at the `select!` await below.
2875                                const MAX_IDLE_COMPACTION_PASSES: usize = 10;
2876                                for _ in 0..MAX_IDLE_COMPACTION_PASSES {
2877                                    let idle_ended = tokio::select! {
2878                                        biased;
2879                                        _ = &mut idle_end_listener => {
2880                                            idle_end_listener = self.idle_end_event.listen();
2881                                            true
2882                                        },
2883                                        _ = std::future::ready(()) => false,
2884                                    };
2885                                    if idle_ended {
2886                                        break;
2887                                    }
2888                                    // Enter the span only around the synchronous
2889                                    // compact() call so we never hold an
2890                                    // `EnteredSpan` across an await point.
2891                                    let _compact_span = tracing::info_span!(
2892                                        parent: background_span.id(),
2893                                        "compact database"
2894                                    )
2895                                    .entered();
2896                                    match self.backing_storage.compact() {
2897                                        Ok(true) => {}
2898                                        Ok(false) => break,
2899                                        Err(err) => {
2900                                            eprintln!("Compaction failed: {err:?}");
2901                                            if self.backing_storage.has_unrecoverable_write_error()
2902                                            {
2903                                                Self::log_unrecoverable_persist_error();
2904                                                return;
2905                                            }
2906                                            break;
2907                                        }
2908                                    }
2909                                }
2910
2911                                if !new_data {
2912                                    fresh_idle = false;
2913                                    continue;
2914                                }
2915                                let last_snapshot = last_snapshot.duration_since(self.start_time);
2916                                self.last_snapshot.store(
2917                                    last_snapshot.as_millis().try_into().unwrap(),
2918                                    Ordering::Relaxed,
2919                                );
2920
2921                                turbo_tasks.schedule_backend_background_job(
2922                                    TurboTasksBackendJob::FollowUpSnapshot,
2923                                );
2924                                return;
2925                            }
2926                        }
2927                    }
2928                }
2929            }
2930        })
2931    }
2932
2933    fn try_read_own_task_cell(
2934        &self,
2935        task_id: TaskId,
2936        cell: CellId,
2937        options: ReadCellOptions,
2938        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2939    ) -> Result<TypedCellContent> {
2940        let mut ctx = self.execute_context(turbo_tasks);
2941        let task = ctx.task(task_id, TaskDataCategory::Data);
2942        if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2943            debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2944            Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2945        } else {
2946            Ok(CellContent(None).into_typed(cell.type_id))
2947        }
2948    }
2949
    /// Collects all collectibles of `collectible_type` visible from `task_id`
    /// (aggregated from descendants plus the task's own), and registers
    /// `reader_id` (if any) as a dependent so it is invalidated when the set of
    /// collectibles changes.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            // Ensure it's a root node; keep raising the aggregation number until
            // it is, re-acquiring the task after each aggregation update.
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                // Release the task lock before running the aggregation update.
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            // Collectibles aggregated from descendant tasks.
            // NOTE(review): each present aggregated entry contributes exactly 1,
            // while the own-collectibles loop below adds the full `count` —
            // confirm this asymmetry is intentional.
            for (collectible, count) in task.iter_aggregated_collectibles() {
                if *count > 0 && collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += 1;
                }
            }
            // Collectibles emitted (or unemitted, i.e. negative) by this task itself.
            for (&collectible, &count) in task.iter_collectibles() {
                if collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += count;
                }
            }
            // Track the reader as a dependent of this task's collectibles.
            if let Some(reader_id) = reader_id {
                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
            }
        }
        // Record the reverse edge on the reader so the dependency can be
        // cleaned up / refreshed on re-execution.
        if let Some(reader_id) = reader_id {
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            if !reader.remove_outdated_collectibles_dependencies(&target) {
                let _ = reader.add_collectibles_dependencies(target);
            }
        }
        collectibles
    }
3014
3015    fn emit_collectible(
3016        &self,
3017        collectible_type: TraitTypeId,
3018        collectible: RawVc,
3019        task_id: TaskId,
3020        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3021    ) {
3022        self.assert_valid_collectible(task_id, collectible);
3023
3024        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3025            panic!("Collectibles need to be resolved");
3026        };
3027        let cell = CellRef {
3028            task: collectible_task,
3029            cell,
3030        };
3031        operation::UpdateCollectibleOperation::run(
3032            task_id,
3033            CollectibleRef {
3034                collectible_type,
3035                cell,
3036            },
3037            1,
3038            self.execute_context(turbo_tasks),
3039        );
3040    }
3041
3042    fn unemit_collectible(
3043        &self,
3044        collectible_type: TraitTypeId,
3045        collectible: RawVc,
3046        count: u32,
3047        task_id: TaskId,
3048        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3049    ) {
3050        self.assert_valid_collectible(task_id, collectible);
3051
3052        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3053            panic!("Collectibles need to be resolved");
3054        };
3055        let cell = CellRef {
3056            task: collectible_task,
3057            cell,
3058        };
3059        operation::UpdateCollectibleOperation::run(
3060            task_id,
3061            CollectibleRef {
3062                collectible_type,
3063                cell,
3064            },
3065            -(i32::try_from(count).unwrap()),
3066            self.execute_context(turbo_tasks),
3067        );
3068    }
3069
    /// Writes `content` into `cell` of `task_id` by running an
    /// `UpdateCellOperation`; the operation handles invalidation of readers.
    /// This method only adapts the arguments — the operation owns the logic.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateCellOperation::run(
            task_id,
            cell,
            content,
            is_serializable_cell_content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            self.execute_context(turbo_tasks),
        );
    }
3092
    /// Marks the currently executing task as session dependent (it will change
    /// on every run), raising its aggregation number first so such tasks don't
    /// form long aggregation chains.
    fn mark_own_task_as_session_dependent(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            // Without dependency tracking we don't need session dependent tasks
            return;
        }
        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
        let aggregation_number = get_aggregation_number(&task);
        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
            // Release the task lock before running the aggregation update.
            drop(task);
            // We want to use a high aggregation number to avoid large aggregation chains for
            // session dependent tasks (which change on every run)
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                    distance: None,
                },
                &mut ctx,
            );
            // Re-acquire the task after the update.
            task = ctx.task(task_id, TaskDataCategory::Meta);
        }
        // Flag the in-progress execution; a task that is not currently
        // executing is silently ignored.
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            session_dependent,
            ..
        })) = task.get_in_progress_mut()
        {
            *session_dependent = true;
        }
    }
3128
3129    fn mark_own_task_as_finished(
3130        &self,
3131        task: TaskId,
3132        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3133    ) {
3134        let mut ctx = self.execute_context(turbo_tasks);
3135        let mut task = ctx.task(task, TaskDataCategory::Data);
3136        if let Some(InProgressState::InProgress(box InProgressStateInner {
3137            marked_as_completed,
3138            ..
3139        })) = task.get_in_progress_mut()
3140        {
3141            *marked_as_completed = true;
3142            // TODO this should remove the dirty state (also check session_dependent)
3143            // but this would break some assumptions for strongly consistent reads.
3144            // Client tasks are not connected yet, so we wouldn't wait for them.
3145            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
3146        }
3147    }
3148
    /// Connects `task` as a child of `parent_task` (or as a detached task when
    /// `parent_task` is `None`), after checking that a persistent parent is not
    /// connecting to a transient child.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_not_persistent_calling_transient(parent_task, task, None);
        ConnectChildOperation::run(parent_task, task, None, self.execute_context(turbo_tasks));
    }
3158
3159    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3160        let task_id = self.transient_task_id_factory.get();
3161        {
3162            let mut task = self.storage.access_mut(task_id);
3163            task.init_transient_task(task_id, task_type, self.should_track_activeness());
3164        }
3165        #[cfg(feature = "verify_aggregation_graph")]
3166        self.root_tasks.lock().insert(task_id);
3167        task_id
3168    }
3169
    /// Disposes a root task: if it (or any container below it) is still dirty,
    /// keep it active until clean but demote it from root status; otherwise
    /// drop its activeness state immediately.
    fn dispose_root_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().remove(&task_id);

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_dirty = task.is_dirty();
        let has_dirty_containers = task.has_dirty_containers();
        if is_dirty.is_some() || has_dirty_containers {
            if let Some(activeness_state) = task.get_activeness_mut() {
                // We will finish the task, but it would be removed after the task is done
                activeness_state.unset_root_type();
                activeness_state.set_active_until_clean();
            };
        } else if let Some(activeness_state) = task.take_activeness() {
            // Technically nobody should be listening to this event, but just in case
            // we notify it anyway
            activeness_state.all_clean_event.notify(usize::MAX);
        }
    }
3194
    /// Debug-only (feature `verify_aggregation_graph`) consistency check: walks
    /// each root task's reachable subgraph and panics when upper links, dirty
    /// containers, or aggregated collectibles are inconsistent.
    #[cfg(feature = "verify_aggregation_graph")]
    fn verify_aggregation_graph(
        &self,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
        idle: bool,
    ) {
        // Runtime escape hatch to disable verification. (`use std::env` below
        // is an item and items are in scope for the whole block, so this
        // compiles despite appearing before the `use`.)
        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
            return;
        }
        use std::{collections::VecDeque, env, io::stdout};

        use crate::backend::operation::{get_uppers, is_aggregating_node};

        let mut ctx = self.execute_context(turbo_tasks);
        let root_tasks = self.root_tasks.lock().clone();

        for task_id in root_tasks.into_iter() {
            // BFS over the child graph starting at this root.
            let mut queue = VecDeque::new();
            let mut visited = FxHashSet::default();
            let mut aggregated_nodes = FxHashSet::default();
            // collectible -> (emitted by some task?, aggregating tasks listing it)
            let mut collectibles = FxHashMap::default();
            let root_task_id = task_id;
            visited.insert(task_id);
            aggregated_nodes.insert(task_id);
            queue.push_back(task_id);
            let mut counter = 0;
            while let Some(task_id) = queue.pop_front() {
                counter += 1;
                // Progress output for very large graphs.
                if counter % 100000 == 0 {
                    println!(
                        "queue={}, visited={}, aggregated_nodes={}",
                        queue.len(),
                        visited.len(),
                        aggregated_nodes.len()
                    );
                }
                let task = ctx.task(task_id, TaskDataCategory::All);
                // In idle mode, abort as soon as the system becomes busy again.
                if idle && !self.is_idle.load(Ordering::Relaxed) {
                    return;
                }

                // Every reachable task (except the root itself) must report to
                // at least one aggregated node already discovered.
                let uppers = get_uppers(&task);
                if task_id != root_task_id
                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
                {
                    panic!(
                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
                         {:?})",
                        task_id,
                        task.get_task_description(),
                        uppers
                    );
                }

                // Record which aggregating tasks claim each collectible...
                for (collectible, _) in task.iter_aggregated_collectibles() {
                    collectibles
                        .entry(*collectible)
                        .or_insert_with(|| (false, Vec::new()))
                        .1
                        .push(task_id);
                }

                // ...and mark collectibles that are actually emitted somewhere.
                for (&collectible, &value) in task.iter_collectibles() {
                    if value > 0 {
                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                            *flag = true
                        } else {
                            panic!(
                                "Task {} has a collectible {:?} that is not in any upper task",
                                task_id, collectible
                            );
                        }
                    }
                }

                let is_dirty = task.has_dirty();
                let has_dirty_container = task.has_dirty_containers();
                let should_be_in_upper = is_dirty || has_dirty_container;

                let aggregation_number = get_aggregation_number(&task);
                if is_aggregating_node(aggregation_number) {
                    aggregated_nodes.insert(task_id);
                }
                // println!(
                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
                //     ctx.get_task_description(task_id),
                //     uppers
                // );

                for child_id in task.iter_children() {
                    // println!("{task_id}: child -> {child_id}");
                    if visited.insert(child_id) {
                        queue.push_back(child_id);
                    }
                }
                // Release the task lock before touching the upper tasks below.
                drop(task);

                // A dirty task must be listed as a dirty container in every
                // one of its uppers.
                if should_be_in_upper {
                    for upper_id in uppers {
                        let upper = ctx.task(upper_id, TaskDataCategory::All);
                        let in_upper = upper
                            .get_aggregated_dirty_containers(&task_id)
                            .is_some_and(|&dirty| dirty > 0);
                        if !in_upper {
                            let containers: Vec<_> = upper
                                .iter_aggregated_dirty_containers()
                                .map(|(&k, &v)| (k, v))
                                .collect();
                            let upper_task_desc = upper.get_task_description();
                            drop(upper);
                            panic!(
                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
                                 ({})\nThese dirty containers are present:\n{:#?}",
                                task_id,
                                ctx.task(task_id, TaskDataCategory::Data)
                                    .get_task_description(),
                                upper_id,
                                upper_task_desc,
                                containers,
                            );
                        }
                    }
                }
            }

            // Any collectible that only shows up as aggregated but is never
            // emitted by a task indicates a stale aggregation: dump the upper
            // chain from the emitting task for debugging, then panic.
            for (collectible, (flag, task_ids)) in collectibles {
                if !flag {
                    use std::io::Write;
                    let mut stdout = stdout().lock();
                    writeln!(
                        stdout,
                        "{:?} that is not emitted in any child task but in these aggregated \
                         tasks: {:#?}",
                        collectible,
                        task_ids
                            .iter()
                            .map(|t| format!(
                                "{t} {}",
                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
                            ))
                            .collect::<Vec<_>>()
                    )
                    .unwrap();

                    let task_id = collectible.cell.task;
                    let mut queue = {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        get_uppers(&task)
                    };
                    // NOTE(review): only the initial uppers are inserted into
                    // `visited`; uppers discovered inside the loop below are
                    // pushed without being marked, so shared uppers may be
                    // walked multiple times — confirm this is acceptable for
                    // this debug dump.
                    let mut visited = FxHashSet::default();
                    for &upper_id in queue.iter() {
                        visited.insert(upper_id);
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                    }
                    while let Some(task_id) = queue.pop() {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        let desc = task.get_task_description();
                        let aggregated_collectible = task
                            .get_aggregated_collectibles(&collectible)
                            .copied()
                            .unwrap_or_default();
                        let uppers = get_uppers(&task);
                        drop(task);
                        writeln!(
                            stdout,
                            "upper {task_id} {desc} collectible={aggregated_collectible}"
                        )
                        .unwrap();
                        if task_ids.contains(&task_id) {
                            writeln!(
                                stdout,
                                "Task has an upper connection to an aggregated task that doesn't \
                                 reference it. Upper connection is invalid!"
                            )
                            .unwrap();
                        }
                        for upper_id in uppers {
                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                            if !visited.contains(&upper_id) {
                                queue.push(upper_id);
                            }
                        }
                    }
                    panic!("See stdout for more details");
                }
            }
        }
    }
3383
3384    fn assert_not_persistent_calling_transient(
3385        &self,
3386        parent_id: Option<TaskId>,
3387        child_id: TaskId,
3388        cell_id: Option<CellId>,
3389    ) {
3390        if let Some(parent_id) = parent_id
3391            && !parent_id.is_transient()
3392            && child_id.is_transient()
3393        {
3394            self.panic_persistent_calling_transient(
3395                self.debug_get_task_description(parent_id),
3396                self.debug_get_cached_task_type(child_id).as_deref(),
3397                cell_id,
3398            );
3399        }
3400    }
3401
3402    fn panic_persistent_calling_transient(
3403        &self,
3404        parent: String,
3405        child: Option<&CachedTaskType>,
3406        cell_id: Option<CellId>,
3407    ) {
3408        let transient_reason = if let Some(child) = child {
3409            Cow::Owned(format!(
3410                " The callee is transient because it depends on:\n{}",
3411                self.debug_trace_transient_task(child, cell_id),
3412            ))
3413        } else {
3414            Cow::Borrowed("")
3415        };
3416        panic!(
3417            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3418            parent,
3419            child.map_or("unknown", |t| t.get_name()),
3420            transient_reason,
3421        );
3422    }
3423
    /// Validates a collectible before (un)emitting it from `task_id`: it must
    /// be a resolved task cell, and a persistent task must not emit a
    /// transient collectible. Panics with a diagnostic message otherwise.
    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
        // these checks occur in a potentially hot codepath, but they're cheap
        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
            // This should never happen: The collectible APIs use ResolvedVc
            let task_info = if let Some(col_task_ty) = collectible
                .try_get_task_id()
                .map(|t| self.debug_get_task_description(t))
            {
                Cow::Owned(format!(" (return type of {col_task_ty})"))
            } else {
                Cow::Borrowed("")
            };
            panic!("Collectible{task_info} must be a ResolvedVc")
        };
        if col_task_id.is_transient() && !task_id.is_transient() {
            // Include a trace of why the collectible is transient when the
            // cached task type is available.
            let transient_reason =
                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
                    Cow::Owned(format!(
                        ". The collectible is transient because it depends on:\n{}",
                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
                    ))
                } else {
                    Cow::Borrowed("")
                };
            // this should never happen: How would a persistent function get a transient Vc?
            panic!(
                "Collectible is transient, transient collectibles cannot be emitted from \
                 persistent tasks{transient_reason}",
            )
        }
    }
3455}
3456
3457impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    /// Forwards to the inner backend implementation.
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }
3461
    /// Forwards to the inner backend implementation (which ignores the API handle).
    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }
3465
    /// Forwards to the inner backend implementation.
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }
3469
    /// Forwards to the inner backend implementation.
    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }
3473
    /// Forwards to the inner backend implementation (which ignores the API handle).
    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }
3477
    /// Forwards to the inner backend implementation.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
    }
3487
    /// Forwards to the inner backend implementation.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
    }
3497
    /// Forwards to the inner backend implementation.
    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }
3501
    /// Forwards to the inner backend implementation.
    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }
3505
    /// Forwards to the inner backend implementation.
    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }
3513
    /// Forwards to the inner backend implementation.
    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }
3521
    /// Forwards to the inner backend implementation.
    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }
3525
    /// Forwards to the inner backend implementation.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0
            .try_start_task_execution(task_id, priority, turbo_tasks)
    }
3535
    /// Forwards to the inner backend implementation, preserving the
    /// `verify_determinism`-gated `stateful` argument.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
            turbo_tasks,
        )
    }
3555
    /// Background job type dispatched through `run_backend_job`.
    type BackendJob = TurboTasksBackendJob;
3557
    /// Forwards to the inner backend implementation.
    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(job, turbo_tasks)
    }
3565
    /// Forwards to the inner backend implementation.
    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, reader, options, turbo_tasks)
    }
3576
    /// Forwards to the inner backend implementation.
    /// NOTE(review): the inner method takes `reader` before `cell` (the order
    /// is swapped relative to this trait method) — confirm this matches the
    /// inherent method's signature.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }
3588
    /// Forwards to the inner backend implementation.
    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0
            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
    }
3599
    /// Forwards to the inner backend implementation.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }
3610
    /// Forwards to the inner backend implementation.
    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }
3621
    /// Retracts `count` prior emissions of `collectible` from `task_id`.
    /// Delegates to the inner backend.
    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }
3633
    /// Stores new `content` into cell `cell` of `task_id`. Delegates to the inner
    /// backend, forwarding all metadata unchanged:
    /// - `is_serializable_cell_content`: whether the content can be persisted,
    /// - `updated_key_hashes` / `content_hash`: optional hashes accompanying the
    ///   update (semantics live in the inner impl — see `storage` module),
    /// - `verification_mode`: controls content verification on write.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(
            task_id,
            cell,
            is_serializable_cell_content,
            content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            turbo_tasks,
        );
    }
3656
    /// Marks the currently executing task `task_id` as finished. Delegates to the
    /// inner backend.
    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }
3664
    /// Marks the currently executing `task` as session dependent (presumably:
    /// must be re-evaluated in a new session rather than restored from
    /// persistence — confirm against inner impl). Delegates to the inner backend.
    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }
3672
    /// Connects `task` as a child of `parent_task` (or as a root-level task when
    /// `parent_task` is `None`). Delegates to the inner backend.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }
3681
    /// Creates a new transient (non-persisted) task of the given type and returns
    /// its id. The `turbo_tasks` handle is unused here — the inner method does not
    /// take it.
    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }
3689
    /// Disposes the root task `task_id`. Delegates to the inner backend.
    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }
3693
    /// Exposes the inner backend's task statistics API.
    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }
3697
    /// Whether dependency tracking is enabled, as configured via the inner
    /// backend's `dependency_tracking` option.
    fn is_tracking_dependencies(&self) -> bool {
        self.0.options.dependency_tracking
    }
3701
    /// Returns a human-readable name for `task`. Delegates to the inner backend.
    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
        self.0.get_task_name(task, turbo_tasks)
    }
3705}
3706
/// Debug representation of a transient task and, recursively, the transient
/// tasks that caused it. Rendered as an indented tree via its `Display` impl.
enum DebugTraceTransientTask {
    /// A known cached task, shown with its transitive causes inline.
    Cached {
        task_name: &'static str,
        // `Some` when the transient value was obtained by reading a cell of
        // this value type (rendered as "(read cell of type …)").
        cell_type_id: Option<ValueTypeId>,
        // Transient cause stemming from the task's `self` — rendered under a
        // "self:" heading.
        cause_self: Option<Box<DebugTraceTransientTask>>,
        // Transient causes stemming from the task's arguments — rendered under
        // an "args:" heading.
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// This representation is used when this task is a duplicate of one previously shown
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    /// A task that could not be identified (rendered as
    /// "unknown transient task").
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}
3723
3724impl DebugTraceTransientTask {
3725    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3726        let indent = "    ".repeat(level);
3727        f.write_str(&indent)?;
3728
3729        fn fmt_cell_type_id(
3730            f: &mut fmt::Formatter<'_>,
3731            cell_type_id: Option<ValueTypeId>,
3732        ) -> fmt::Result {
3733            if let Some(ty) = cell_type_id {
3734                write!(
3735                    f,
3736                    " (read cell of type {})",
3737                    get_value_type(ty).ty.global_name
3738                )
3739            } else {
3740                Ok(())
3741            }
3742        }
3743
3744        // write the name and type
3745        match self {
3746            Self::Cached {
3747                task_name,
3748                cell_type_id,
3749                ..
3750            }
3751            | Self::Collapsed {
3752                task_name,
3753                cell_type_id,
3754                ..
3755            } => {
3756                f.write_str(task_name)?;
3757                fmt_cell_type_id(f, *cell_type_id)?;
3758                if matches!(self, Self::Collapsed { .. }) {
3759                    f.write_str(" (collapsed)")?;
3760                }
3761            }
3762            Self::Uncached { cell_type_id } => {
3763                f.write_str("unknown transient task")?;
3764                fmt_cell_type_id(f, *cell_type_id)?;
3765            }
3766        }
3767        f.write_char('\n')?;
3768
3769        // write any extra "cause" information we might have
3770        if let Self::Cached {
3771            cause_self,
3772            cause_args,
3773            ..
3774        } = self
3775        {
3776            if let Some(c) = cause_self {
3777                writeln!(f, "{indent}  self:")?;
3778                c.fmt_indented(f, level + 1)?;
3779            }
3780            if !cause_args.is_empty() {
3781                writeln!(f, "{indent}  args:")?;
3782                for c in cause_args {
3783                    c.fmt_indented(f, level + 1)?;
3784                }
3785            }
3786        }
3787        Ok(())
3788    }
3789}
3790
/// Renders the trace as an indented tree, starting at indentation level 0.
impl fmt::Display for DebugTraceTransientTask {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.fmt_indented(f, 0)
    }
}
3796
// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
/// Returns an `Instant` roughly 30 years from now, used as an effectively
/// infinite deadline.
fn far_future() -> Instant {
    // The API offers neither a maximum `Instant` nor a way to construct one
    // from a calendar date, so approximate with a large offset. Much larger
    // offsets overflow: 1000 years on macOS, 100 years on FreeBSD.
    const SECS_PER_DAY: u64 = 86400;
    Instant::now() + Duration::from_secs(SECS_PER_DAY * 365 * 30)
}
3805
3806/// Encodes task data, using the provided buffer as a scratch space.  Returns a new exactly sized
3807/// buffer.
3808/// This allows reusing the buffer across multiple encode calls to optimize allocations and
3809/// resulting buffer sizes.
3810///
3811/// TODO: The `Result` return type is an artifact of the bincode `Encode` trait requiring
3812/// fallible encoding. In practice, encoding to a `SmallVec` is infallible (no I/O), and the only
3813/// real failure mode — a `TypedSharedReference` whose value type has no bincode impl — is a
3814/// programmer error caught by the panic in the caller. Consider making the bincode encoding trait
3815/// infallible (i.e. returning `()` instead of `Result<(), EncodeError>`) to eliminate the
3816/// spurious `Result` threading throughout the encode path.
3817fn encode_task_data(
3818    task: TaskId,
3819    data: &TaskStorage,
3820    category: SpecificTaskDataCategory,
3821    scratch_buffer: &mut TurboBincodeBuffer,
3822) -> Result<TurboBincodeBuffer> {
3823    scratch_buffer.clear();
3824    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3825    data.encode(category, &mut encoder)?;
3826
3827    if cfg!(feature = "verify_serialization") {
3828        TaskStorage::new()
3829            .decode(
3830                category,
3831                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3832            )
3833            .with_context(|| {
3834                format!(
3835                    "expected to be able to decode serialized data for '{category:?}' information \
3836                     for {task}"
3837                )
3838            })?;
3839    }
3840    Ok(SmallVec::from_slice(scratch_buffer))
3841}