//! turbo_tasks_backend/backend/mod.rs
1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7    borrow::Cow,
8    fmt::{self, Write},
9    future::Future,
10    hash::BuildHasherDefault,
11    mem::take,
12    pin::Pin,
13    sync::{
14        Arc, LazyLock,
15        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16    },
17    time::SystemTime,
18};
19
20use anyhow::{Context, Result, bail};
21use auto_hash_map::{AutoMap, AutoSet};
22use indexmap::IndexSet;
23use parking_lot::{Condvar, Mutex};
24use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
25use smallvec::{SmallVec, smallvec};
26use tokio::time::{Duration, Instant};
27use tracing::{Span, trace_span};
28use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
29use turbo_tasks::{
30    CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
31    ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
32    TaskId, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic, ValueTypeId,
33    backend::{
34        Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskType,
35        TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
36        TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
37        VerificationMode,
38    },
39    event::{Event, EventDescription, EventListener},
40    message_queue::{TimingEvent, TraceEvent},
41    registry::get_value_type,
42    scope::scope_and_block,
43    task_statistics::TaskStatisticsApi,
44    trace::TraceRawVcs,
45    util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
46};
47
48pub use self::{
49    operation::AnyOperation,
50    storage::{SpecificTaskDataCategory, TaskDataCategory},
51};
52#[cfg(feature = "trace_task_dirty")]
53use crate::backend::operation::TaskDirtyCause;
54use crate::{
55    backend::{
56        operation::{
57            AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
58            CleanupOldEdgesOperation, ComputeDirtyAndCleanUpdate, ConnectChildOperation,
59            ExecuteContext, ExecuteContextImpl, LeafDistanceUpdateQueue, Operation, OutdatedEdge,
60            TaskGuard, TaskType, connect_children, get_aggregation_number, get_uppers,
61            is_root_node, make_task_dirty_internal, prepare_new_children,
62        },
63        storage::Storage,
64        storage_schema::{TaskStorage, TaskStorageAccessors},
65    },
66    backing_storage::{BackingStorage, SnapshotItem},
67    data::{
68        ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
69        InProgressState, InProgressStateInner, OutputValue, TransientTask,
70    },
71    error::TaskError,
72    utils::{
73        arc_or_owned::ArcOrOwned,
74        chunked_vec::ChunkedVec,
75        dash_map_drop_contents::drop_contents,
76        dash_map_raw_entry::{RawEntry, raw_entry},
77        ptr_eq_arc::PtrEqArc,
78        shard_amount::compute_shard_amount,
79        sharded::Sharded,
80        swap_retain,
81    },
82};
83
/// Threshold for parallelizing making dependent tasks dirty.
/// If the number of dependent tasks exceeds this threshold,
/// the operation will be parallelized.
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;

/// Marker bit (the highest `usize` bit) that is set in `in_progress_operations`
/// while a snapshot has been requested.
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);

/// Configurable idle timeout for snapshot persistence.
/// Defaults to 2 seconds if not set or if the value is invalid.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let fallback = Duration::from_secs(2);
    match std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS") {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(millis) => Duration::from_millis(millis),
            Err(_) => fallback,
        },
        Err(_) => fallback,
    }
});
100
101struct SnapshotRequest {
102    snapshot_requested: bool,
103    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
104}
105
106impl SnapshotRequest {
107    fn new() -> Self {
108        Self {
109            snapshot_requested: false,
110            suspended_operations: FxHashSet::default(),
111        }
112    }
113}
114
/// Controls how the backend interacts with the backing storage.
pub enum StorageMode {
    /// Queries the storage for cache entries that don't exist locally.
    /// Never pushes changes back to the backing storage.
    ReadOnly,
    /// Queries the storage for cache entries that don't exist locally.
    /// Regularly pushes changes to the backing storage.
    ReadWrite,
    /// Queries the storage for cache entries that don't exist locally.
    /// On shutdown, pushes all changes to the backing storage.
    ReadWriteOnShutdown,
}
125
/// Construction-time configuration for [`TurboTasksBackend`].
pub struct BackendOptions {
    /// Enables dependency tracking.
    ///
    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
    /// forever.
    pub dependency_tracking: bool,

    /// Enables active tracking.
    ///
    /// Automatically disabled when `dependency_tracking` is disabled.
    ///
    /// When disabled: All tasks are considered as active.
    pub active_tracking: bool,

    /// Enables the backing storage. `None` disables restoring from and persisting to the
    /// backing storage entirely.
    pub storage_mode: Option<StorageMode>,

    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
    /// datastructures. If `None`, it will use the available parallelism.
    pub num_workers: Option<usize>,

    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
    pub small_preallocation: bool,
}
150
151impl Default for BackendOptions {
152    fn default() -> Self {
153        Self {
154            dependency_tracking: true,
155            active_tracking: true,
156            storage_mode: Some(StorageMode::ReadWrite),
157            num_workers: None,
158            small_preallocation: false,
159        }
160    }
161}
162
/// Background jobs this backend schedules for snapshot persistence.
pub enum TurboTasksBackendJob {
    /// The first snapshot after startup.
    InitialSnapshot,
    /// Any subsequent snapshot.
    FollowUpSnapshot,
}
167
/// Public backend handle; a thin wrapper around the shared [`TurboTasksBackendInner`] state.
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);

/// Sharded, append-only log of `(task type, task id)` pairs for newly cached persisted tasks.
type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
171
/// Shared state of the backend; [`TurboTasksBackend`] is an `Arc` around this.
struct TurboTasksBackendInner<B: BackingStorage> {
    /// Configuration. `active_tracking` may have been forced off in [`Self::new`] when
    /// dependency tracking is disabled.
    options: BackendOptions,

    /// Base timestamp; [`Self::last_snapshot`] is stored relative to this.
    start_time: Instant,

    /// Allocates task ids below `TRANSIENT_TASK_BIT` (persistable tasks), starting after the
    /// ids already handed out by the backing storage.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Allocates task ids from `TRANSIENT_TASK_BIT` upward (transient tasks).
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    /// Log of newly cached persisted tasks; only present when the storage mode pushes changes
    /// back (see `need_log` in [`Self::new`]).
    persisted_task_cache_log: Option<TaskCacheLog>,
    /// In-memory cache mapping a task type to its assigned task id.
    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

    /// Local task state storage.
    storage: Storage,

    /// When true, the backing_storage has data that is not in the local storage.
    /// This is determined once at startup and never changes.
    local_is_partial: bool,

    /// Number of executing operations + Highest bit is set when snapshot is
    /// requested. When that bit is set, operations should pause until the
    /// snapshot is completed. When the bit is set and in progress counter
    /// reaches zero, `operations_completed_when_snapshot_requested` is
    /// triggered.
    in_progress_operations: AtomicUsize,

    /// Guards the snapshot request flag and the set of suspended operations.
    snapshot_request: Mutex<SnapshotRequest>,
    /// Condition Variable that is triggered when `in_progress_operations`
    /// reaches zero while snapshot is requested. All operations are either
    /// completed or suspended.
    operations_suspended: Condvar,
    /// Condition Variable that is triggered when a snapshot is completed and
    /// operations can continue.
    snapshot_completed: Condvar,
    /// The timestamp of the last started snapshot since [`Self::start_time`].
    last_snapshot: AtomicU64,

    // NOTE(review): presumably set during shutdown and signalled via `stopping_event`;
    // the setter is outside this chunk — confirm against the rest of the file.
    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    /// Per-native-function cache hit/miss counters (see `track_cache_hit`/`track_cache_miss`).
    task_statistics: TaskStatisticsApi,

    /// The persistent storage backing this backend.
    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
221
222impl<B: BackingStorage> TurboTasksBackend<B> {
223    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
224        Self(Arc::new(TurboTasksBackendInner::new(
225            options,
226            backing_storage,
227        )))
228    }
229
230    pub fn backing_storage(&self) -> &B {
231        &self.0.backing_storage
232    }
233}
234
235impl<B: BackingStorage> TurboTasksBackendInner<B> {
236    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
237        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
238        let need_log = matches!(
239            options.storage_mode,
240            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
241        );
242        if !options.dependency_tracking {
243            options.active_tracking = false;
244        }
245        let small_preallocation = options.small_preallocation;
246        let next_task_id = backing_storage
247            .next_free_task_id()
248            .expect("Failed to get task id");
249        Self {
250            options,
251            start_time: Instant::now(),
252            persisted_task_id_factory: IdFactoryWithReuse::new(
253                next_task_id,
254                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
255            ),
256            transient_task_id_factory: IdFactoryWithReuse::new(
257                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
258                TaskId::MAX,
259            ),
260            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
261            task_cache: FxDashMap::default(),
262            local_is_partial: next_task_id != TaskId::MIN,
263            storage: Storage::new(shard_amount, small_preallocation),
264            in_progress_operations: AtomicUsize::new(0),
265            snapshot_request: Mutex::new(SnapshotRequest::new()),
266            operations_suspended: Condvar::new(),
267            snapshot_completed: Condvar::new(),
268            last_snapshot: AtomicU64::new(0),
269            stopping: AtomicBool::new(false),
270            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
271            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
272            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
273            #[cfg(feature = "verify_aggregation_graph")]
274            is_idle: AtomicBool::new(false),
275            task_statistics: TaskStatisticsApi::default(),
276            backing_storage,
277            #[cfg(feature = "verify_aggregation_graph")]
278            root_tasks: Default::default(),
279        }
280    }
281
    /// Constructs the concrete [`ExecuteContextImpl`] tying this backend to the given
    /// turbo-tasks API for the duration of one operation.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }
288
289    fn suspending_requested(&self) -> bool {
290        self.should_persist()
291            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
292    }
293
    /// Cooperative suspension point for long-running operations.
    ///
    /// If a snapshot has been requested, the current operation state is captured via
    /// `suspend`, registered as suspended, and the thread blocks until the snapshot
    /// completes. The fast path is a single relaxed atomic load
    /// (see [`Self::suspending_requested`]).
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            // Re-check under the lock; the snapshot may have finished in the meantime.
            if snapshot_request.snapshot_requested {
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                // Leave the in-progress count. When only the request bit remains, every
                // operation is completed or suspended, so wake the snapshot taker.
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                if value == SNAPSHOT_REQUESTED_BIT {
                    this.operations_suspended.notify_all();
                }
                // Block until the snapshot completes, then re-register as in progress.
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }
326
    /// Registers the start of an operation and returns a guard that un-registers it on drop.
    ///
    /// When persistence is disabled no counting is needed and an empty guard is returned.
    /// If a snapshot is currently requested, this blocks until the snapshot completes
    /// before the operation is allowed to start.
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        if !self.should_persist() {
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            // Snapshot requested: back out of the increment and wait for it to finish.
            let mut snapshot_request = self.snapshot_request.lock();
            // Re-check under the lock; the snapshot may have completed in the meantime.
            if snapshot_request.snapshot_requested {
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                // When only the request bit remains, all other operations are completed or
                // suspended — wake the snapshot taker.
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }
350
351    fn should_persist(&self) -> bool {
352        matches!(
353            self.options.storage_mode,
354            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
355        )
356    }
357
358    fn should_restore(&self) -> bool {
359        self.options.storage_mode.is_some()
360    }
361
362    fn should_track_dependencies(&self) -> bool {
363        self.options.dependency_tracking
364    }
365
366    fn should_track_activeness(&self) -> bool {
367        self.options.active_tracking
368    }
369
370    fn track_cache_hit(&self, task_type: &CachedTaskType) {
371        self.task_statistics
372            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
373    }
374
375    fn track_cache_miss(&self, task_type: &CachedTaskType) {
376        self.task_statistics
377            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
378    }
379
    /// Reconstructs a full [`TurboTasksExecutionError`] from the compact [`TaskError`] storage
    /// representation. For [`TaskError::TaskChain`], this looks up the source error from the last
    /// task's output and rebuilds the nested `TaskContext` wrappers with `TurboTasksCallApi`
    /// references for lazy name resolution.
    fn task_error_to_turbo_tasks_execution_error(
        &self,
        error: &TaskError,
        ctx: &mut impl ExecuteContext<'_>,
    ) -> TurboTasksExecutionError {
        match error {
            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
            // Plain errors: recursively rebuild the source chain.
            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
                message: item.message.clone(),
                source: item
                    .source
                    .as_ref()
                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
            })),
            TaskError::LocalTaskContext(local_task_context) => {
                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
                    name: local_task_context.name.clone(),
                    source: local_task_context
                        .source
                        .as_ref()
                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
                }))
            }
            TaskError::TaskChain(chain) => {
                // NOTE(review): assumes the chain is non-empty — `unwrap` panics otherwise.
                let task_id = chain.last().unwrap();
                // Fetch the stored error from the last task's output; scoped so the task
                // guard is dropped before recursing.
                let error = {
                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
                    if let Some(OutputValue::Error(error)) = task.get_output() {
                        Some(error.clone())
                    } else {
                        None
                    }
                };
                let error = error.map_or_else(
                    || {
                        // Eventual consistency will cause errors to no longer be available
                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
                                "Error no longer available",
                            )),
                            location: None,
                        }))
                    },
                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
                );
                // Wrap the root error in one `TaskContext` layer per task in the chain so
                // task names can be resolved lazily via `turbo_tasks`.
                let mut current_error = error;
                for &task_id in chain.iter().rev() {
                    current_error =
                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
                            task_id,
                            source: Some(current_error),
                            turbo_tasks: ctx.turbo_tasks(),
                        }));
                }
                current_error
            }
        }
    }
442}
443
/// RAII guard for one running operation, returned by
/// [`TurboTasksBackendInner::start_operation`]; decrements `in_progress_operations` on drop.
pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    /// `None` when persistence is disabled and no operation counting is needed.
    backend: Option<&'a TurboTasksBackendInner<B>>,
}
447
impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
    /// Un-registers the operation from the in-progress counter.
    fn drop(&mut self) {
        if let Some(backend) = self.backend {
            let fetch_sub = backend
                .in_progress_operations
                .fetch_sub(1, Ordering::AcqRel);
            // If only the snapshot-request bit remains after this decrement, this was the
            // last running operation — wake the waiting snapshot taker.
            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
                backend.operations_suspended.notify_all();
            }
        }
    }
}
460
/// Intermediate result of step 1 of task execution completion.
struct TaskExecutionCompletePrepareResult {
    // Presumably the children produced/connected by this execution — verify against usage.
    pub new_children: FxHashSet<TaskId>,
    // Whether this execution determined the task to be immutable — TODO confirm.
    pub is_now_immutable: bool,
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    // New output value for the task; semantics of `None` not visible here — TODO confirm.
    pub new_output: Option<OutputValue>,
    // Tasks depending on this task's output; inline capacity for the common small case.
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
}
470
471// Operations
472impl<B: BackingStorage> TurboTasksBackendInner<B> {
473    fn connect_child(
474        &self,
475        parent_task: Option<TaskId>,
476        child_task: TaskId,
477        task_type: Option<ArcOrOwned<CachedTaskType>>,
478        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
479    ) {
480        operation::ConnectChildOperation::run(
481            parent_task,
482            child_task,
483            task_type,
484            self.execute_context(turbo_tasks),
485        );
486    }
487
488    fn try_read_task_output(
489        self: &Arc<Self>,
490        task_id: TaskId,
491        reader: Option<TaskId>,
492        options: ReadOutputOptions,
493        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
494    ) -> Result<Result<RawVc, EventListener>> {
495        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
496
497        let mut ctx = self.execute_context(turbo_tasks);
498        let need_reader_task = if self.should_track_dependencies()
499            && !matches!(options.tracking, ReadTracking::Untracked)
500            && reader.is_some_and(|reader_id| reader_id != task_id)
501            && let Some(reader_id) = reader
502            && reader_id != task_id
503        {
504            Some(reader_id)
505        } else {
506            None
507        };
508        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
509            // Having a task_pair here is not optimal, but otherwise this would lead to a race
510            // condition. See below.
511            // TODO(sokra): solve that in a more performant way.
512            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
513            (task, Some(reader))
514        } else {
515            (ctx.task(task_id, TaskDataCategory::All), None)
516        };
517
518        fn listen_to_done_event(
519            reader_description: Option<EventDescription>,
520            tracking: ReadTracking,
521            done_event: &Event,
522        ) -> EventListener {
523            done_event.listen_with_note(move || {
524                move || {
525                    if let Some(reader_description) = reader_description.as_ref() {
526                        format!(
527                            "try_read_task_output from {} ({})",
528                            reader_description, tracking
529                        )
530                    } else {
531                        format!("try_read_task_output ({})", tracking)
532                    }
533                }
534            })
535        }
536
537        fn check_in_progress(
538            task: &impl TaskGuard,
539            reader_description: Option<EventDescription>,
540            tracking: ReadTracking,
541        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
542        {
543            match task.get_in_progress() {
544                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
545                    listen_to_done_event(reader_description, tracking, done_event),
546                ))),
547                Some(InProgressState::InProgress(box InProgressStateInner {
548                    done_event, ..
549                })) => Some(Ok(Err(listen_to_done_event(
550                    reader_description,
551                    tracking,
552                    done_event,
553                )))),
554                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
555                    "{} was canceled",
556                    task.get_task_description()
557                ))),
558                None => None,
559            }
560        }
561
562        if matches!(options.consistency, ReadConsistency::Strong) {
563            // Ensure it's an root node
564            loop {
565                let aggregation_number = get_aggregation_number(&task);
566                if is_root_node(aggregation_number) {
567                    break;
568                }
569                drop(task);
570                drop(reader_task);
571                {
572                    let _span = tracing::trace_span!(
573                        "make root node for strongly consistent read",
574                        %task_id
575                    )
576                    .entered();
577                    AggregationUpdateQueue::run(
578                        AggregationUpdateJob::UpdateAggregationNumber {
579                            task_id,
580                            base_aggregation_number: u32::MAX,
581                            distance: None,
582                        },
583                        &mut ctx,
584                    );
585                }
586                (task, reader_task) = if let Some(reader_id) = need_reader_task {
587                    // TODO(sokra): see comment above
588                    let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
589                    (task, Some(reader))
590                } else {
591                    (ctx.task(task_id, TaskDataCategory::All), None)
592                }
593            }
594
595            let is_dirty = task.is_dirty();
596
597            // Check the dirty count of the root node
598            let has_dirty_containers = task.has_dirty_containers();
599            if has_dirty_containers || is_dirty.is_some() {
600                let activeness = task.get_activeness_mut();
601                let mut task_ids_to_schedule: Vec<_> = Vec::new();
602                // When there are dirty task, subscribe to the all_clean_event
603                let activeness = if let Some(activeness) = activeness {
604                    // This makes sure all tasks stay active and this task won't stale.
605                    // active_until_clean is automatically removed when this
606                    // task is clean.
607                    activeness.set_active_until_clean();
608                    activeness
609                } else {
610                    // If we don't have a root state, add one. This also makes sure all tasks stay
611                    // active and this task won't stale. active_until_clean
612                    // is automatically removed when this task is clean.
613                    if ctx.should_track_activeness() {
614                        // A newly added Activeness need to make sure to schedule the tasks
615                        task_ids_to_schedule = task.dirty_containers().collect();
616                        task_ids_to_schedule.push(task_id);
617                    }
618                    let activeness =
619                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
620                    activeness.set_active_until_clean();
621                    activeness
622                };
623                let listener = activeness.all_clean_event.listen_with_note(move || {
624                    let this = self.clone();
625                    let tt = turbo_tasks.pin();
626                    move || {
627                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
628                        let mut ctx = this.execute_context(tt);
629                        let mut visited = FxHashSet::default();
630                        fn indent(s: &str) -> String {
631                            s.split_inclusive('\n')
632                                .flat_map(|line: &str| ["  ", line].into_iter())
633                                .collect::<String>()
634                        }
635                        fn get_info(
636                            ctx: &mut impl ExecuteContext<'_>,
637                            task_id: TaskId,
638                            parent_and_count: Option<(TaskId, i32)>,
639                            visited: &mut FxHashSet<TaskId>,
640                        ) -> String {
641                            let task = ctx.task(task_id, TaskDataCategory::All);
642                            let is_dirty = task.is_dirty();
643                            let in_progress =
644                                task.get_in_progress()
645                                    .map_or("not in progress", |p| match p {
646                                        InProgressState::InProgress(_) => "in progress",
647                                        InProgressState::Scheduled { .. } => "scheduled",
648                                        InProgressState::Canceled => "canceled",
649                                    });
650                            let activeness = task.get_activeness().map_or_else(
651                                || "not active".to_string(),
652                                |activeness| format!("{activeness:?}"),
653                            );
654                            let aggregation_number = get_aggregation_number(&task);
655                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
656                            {
657                                let uppers = get_uppers(&task);
658                                !uppers.contains(&parent_task_id)
659                            } else {
660                                false
661                            };
662
663                            // Check the dirty count of the root node
664                            let has_dirty_containers = task.has_dirty_containers();
665
666                            let task_description = task.get_task_description();
667                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
668                                format!(", dirty({parent_priority})")
669                            } else {
670                                String::new()
671                            };
672                            let has_dirty_containers_label = if has_dirty_containers {
673                                ", dirty containers"
674                            } else {
675                                ""
676                            };
677                            let count = if let Some((_, count)) = parent_and_count {
678                                format!(" {count}")
679                            } else {
680                                String::new()
681                            };
682                            let mut info = format!(
683                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
684                                 {in_progress}, \
685                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
686                            );
687                            let children: Vec<_> = task.dirty_containers_with_count().collect();
688                            drop(task);
689
690                            if missing_upper {
691                                info.push_str("\n  ERROR: missing upper connection");
692                            }
693
694                            if has_dirty_containers || !children.is_empty() {
695                                writeln!(info, "\n  dirty tasks:").unwrap();
696
697                                for (child_task_id, count) in children {
698                                    let task_description = ctx
699                                        .task(child_task_id, TaskDataCategory::Data)
700                                        .get_task_description();
701                                    if visited.insert(child_task_id) {
702                                        let child_info = get_info(
703                                            ctx,
704                                            child_task_id,
705                                            Some((task_id, count)),
706                                            visited,
707                                        );
708                                        info.push_str(&indent(&child_info));
709                                        if !info.ends_with('\n') {
710                                            info.push('\n');
711                                        }
712                                    } else {
713                                        writeln!(
714                                            info,
715                                            "  {child_task_id} {task_description} {count} \
716                                             (already visited)"
717                                        )
718                                        .unwrap();
719                                    }
720                                }
721                            }
722                            info
723                        }
724                        let info = get_info(&mut ctx, task_id, None, &mut visited);
725                        format!(
726                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
727                        )
728                    }
729                });
730                drop(reader_task);
731                drop(task);
732                if !task_ids_to_schedule.is_empty() {
733                    let mut queue = AggregationUpdateQueue::new();
734                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
735                    queue.execute(&mut ctx);
736                }
737
738                return Ok(Err(listener));
739            }
740        }
741
742        let reader_description = reader_task
743            .as_ref()
744            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
745        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
746        {
747            return value;
748        }
749
750        if let Some(output) = task.get_output() {
751            let result = match output {
752                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
753                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
754                OutputValue::Error(error) => Err(error.clone()),
755            };
756            if let Some(mut reader_task) = reader_task.take()
757                && options.tracking.should_track(result.is_err())
758                && (!task.immutable() || cfg!(feature = "verify_immutable"))
759            {
760                #[cfg(feature = "trace_task_output_dependencies")]
761                let _span = tracing::trace_span!(
762                    "add output dependency",
763                    task = %task_id,
764                    dependent_task = ?reader
765                )
766                .entered();
767                let mut queue = LeafDistanceUpdateQueue::new();
768                let reader = reader.unwrap();
769                if task.add_output_dependent(reader) {
770                    // Ensure that dependent leaf distance is strictly monotonic increasing
771                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
772                    let reader_leaf_distance =
773                        reader_task.get_leaf_distance().copied().unwrap_or_default();
774                    if reader_leaf_distance.distance <= leaf_distance.distance {
775                        queue.push(
776                            reader,
777                            leaf_distance.distance,
778                            leaf_distance.max_distance_in_buffer,
779                        );
780                    }
781                }
782
783                drop(task);
784
785                // Note: We use `task_pair` earlier to lock the task and its reader at the same
786                // time. If we didn't and just locked the reader here, an invalidation could occur
787                // between grabbing the locks. If that happened, and if the task is "outdated" or
788                // doesn't have the dependency edge yet, the invalidation would be lost.
789
790                if !reader_task.remove_outdated_output_dependencies(&task_id) {
791                    let _ = reader_task.add_output_dependencies(task_id);
792                }
793                drop(reader_task);
794
795                queue.execute(&mut ctx);
796            } else {
797                drop(task);
798            }
799
800            return result.map_err(|error| {
801                self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
802                    .with_task_context(task_id, turbo_tasks.pin())
803                    .into()
804            });
805        }
806        drop(reader_task);
807
808        let note = EventDescription::new(|| {
809            move || {
810                if let Some(reader) = reader_description.as_ref() {
811                    format!("try_read_task_output (recompute) from {reader}",)
812                } else {
813                    "try_read_task_output (recompute, untracked)".to_string()
814                }
815            }
816        });
817
818        // Output doesn't exist. We need to schedule the task to compute it.
819        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
820            TaskExecutionReason::OutputNotAvailable,
821            EventDescription::new(|| task.get_task_desc_fn()),
822            note,
823        );
824
825        // It's not possible that the task is InProgress at this point. If it is InProgress {
826        // done: true } it must have Output and would early return.
827        let old = task.set_in_progress(in_progress_state);
828        debug_assert!(old.is_none(), "InProgress already exists");
829        ctx.schedule_task(task, TaskPriority::Initial);
830
831        Ok(Err(listener))
832    }
833
    /// Attempts to read the content of `cell` of task `task_id`.
    ///
    /// Returns `Ok(Ok(content))` when the cell data is available, `Ok(Err(listener))` when the
    /// value is currently being computed (or was scheduled for recomputation) and the caller
    /// must wait on the listener, and `Err(..)` when the cell no longer exists or the task was
    /// canceled. When tracking is enabled, a dependency edge from `reader` to the cell is
    /// recorded so the reader is invalidated when the cell changes.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        /// Records the dependency edge between `reader` and `(task_id, cell, key)` on both
        /// sides: the cell's dependents on `task` and the cell dependencies on `reader_task`.
        /// No-op when there is no reader guard, or when `task` is immutable (unless the
        /// `verify_immutable` feature is enabled). Consumes and drops both guards.
        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
            key: Option<u64>,
        ) {
            if let Some(mut reader_task) = reader_task
                && (!task.immutable() || cfg!(feature = "verify_immutable"))
            {
                // `reader_task` is only `Some` when a reader id was given, so this is safe.
                let reader = reader.unwrap();
                let _ = task.add_cell_dependents((cell, key, reader));
                drop(task);

                // Note: We use `task_pair` earlier to lock the task and its reader at the same
                // time. If we didn't and just locked the reader here, an invalidation could occur
                // between grabbing the locks. If that happened, and if the task is "outdated" or
                // doesn't have the dependency edge yet, the invalidation would be lost.

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
                    let _ = reader_task.add_cell_dependencies((target, key));
                }
                drop(reader_task);
            }
        }

        let ReadCellOptions {
            is_serializable_cell_content,
            tracking,
            final_read_hint,
        } = options;

        let mut ctx = self.execute_context(turbo_tasks);
        // Lock the task — and, when dependencies are tracked and the reader is a different
        // task, the reader as well (atomically, see the race note below).
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(tracking, ReadCellTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            // Having a task_pair here is not optimal, but otherwise this would lead to a race
            // condition. See below.
            // TODO(sokra): solve that in a more performant way.
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        // `final_read_hint` signals this is the last expected read, so the data can be moved
        // out of the cell instead of copied.
        let content = if final_read_hint {
            task.remove_cell_data(is_serializable_cell_content, cell)
        } else {
            task.get_cell_data(is_serializable_cell_content, cell)
        };
        if let Some(content) = content {
            if tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.reference)),
            )));
        }

        // No content yet. If the task is already running or scheduled, just wait for the cell.
        let in_progress = task.get_in_progress();
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self
                .listen_to_cell(&mut task, task_id, &reader_task, cell)
                .0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        // Check cell index range (cell might not exist at all)
        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
        let Some(max_id) = max_id else {
            let task_desc = task.get_task_description();
            // Still record the dependency so the reader is invalidated if the cell reappears.
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!(
                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
            );
        };
        if cell.index >= max_id {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
        }

        // Cell should exist, but data was dropped or is not serializable. We need to recompute
        // the task to get the cell content.

        // Listen to the cell and potentially schedule the task
        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
        drop(reader_task);
        if !new_listener {
            // Someone else already registered the in-progress cell; the task is being handled.
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).ty.global_name,
            cell_index = cell.index
        )
        .entered();

        // Schedule the task, if not already scheduled
        if is_cancelled {
            bail!("{} was canceled", task.get_task_description());
        }

        let _ = task.add_scheduled(
            TaskExecutionReason::CellNotAvailable,
            EventDescription::new(|| task.get_task_desc_fn()),
        );
        ctx.schedule_task(task, TaskPriority::Initial);

        Ok(Err(listener))
    }
971
972    fn listen_to_cell(
973        &self,
974        task: &mut impl TaskGuard,
975        task_id: TaskId,
976        reader_task: &Option<impl TaskGuard>,
977        cell: CellId,
978    ) -> (EventListener, bool) {
979        let note = || {
980            let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
981            move || {
982                if let Some(reader_desc) = reader_desc.as_ref() {
983                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
984                } else {
985                    "try_read_task_cell (in progress, untracked)".to_string()
986                }
987            }
988        };
989        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
990            // Someone else is already computing the cell
991            let listener = in_progress.event.listen_with_note(note);
992            return (listener, false);
993        }
994        let in_progress = InProgressCellState::new(task_id, cell);
995        let listener = in_progress.event.listen_with_note(note);
996        let old = task.insert_in_progress_cells(cell, in_progress);
997        debug_assert!(old.is_none(), "InProgressCell already exists");
998        (listener, true)
999    }
1000
    /// Takes a consistent snapshot of all modified task data and persists it to the backing
    /// storage.
    ///
    /// Blocks until every in-flight operation has suspended itself, captures the modified
    /// state and the persisted-task-cache log, resumes operations, then encodes the captured
    /// tasks and writes them via [`BackingStorage::save_snapshot`].
    ///
    /// Returns `None` when persisting failed, otherwise `Some((snapshot_time, persisted))`
    /// where `persisted` is `false` when there was nothing to write.
    fn snapshot_and_persist(
        &self,
        parent_span: Option<tracing::Id>,
        reason: &str,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<(Instant, bool)> {
        let snapshot_span =
            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
                .entered();
        let start = Instant::now();
        // SystemTime for wall-clock timestamps in trace events (milliseconds
        // since epoch). Instant is monotonic but has no defined epoch, so it
        // can't be used for cross-process trace correlation.
        let wall_start = SystemTime::now();
        debug_assert!(self.should_persist());

        // Phase 1 (blocking): request a snapshot and wait until all in-progress operations
        // have suspended, so that the captured state is consistent.
        let suspended_operations;
        {
            let _span = tracing::info_span!("blocking").entered();
            let mut snapshot_request = self.snapshot_request.lock();
            snapshot_request.snapshot_requested = true;
            let active_operations = self
                .in_progress_operations
                .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
            if active_operations != 0 {
                // Wait until only the request bit remains set, i.e. no operation is active.
                self.operations_suspended
                    .wait_while(&mut snapshot_request, |_| {
                        self.in_progress_operations.load(Ordering::Relaxed)
                            != SNAPSHOT_REQUESTED_BIT
                    });
            }
            // Capture the suspended operations so they can be persisted and resumed on restore.
            suspended_operations = snapshot_request
                .suspended_operations
                .iter()
                .map(|op| op.arc().clone())
                .collect::<Vec<_>>();
        }
        // Phase 2: mark the storage snapshot point and take the task cache log, then clear
        // the request bit and wake everything that waited for the snapshot to complete.
        self.storage.start_snapshot();
        let mut persisted_task_cache_log = self
            .persisted_task_cache_log
            .as_ref()
            .map(|l| l.take(|i| i))
            .unwrap_or_default();
        let mut snapshot_request = self.snapshot_request.lock();
        snapshot_request.snapshot_requested = false;
        self.in_progress_operations
            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        self.snapshot_completed.notify_all();
        let snapshot_time = Instant::now();
        drop(snapshot_request);

        // Per-task-type cache size statistics, only collected with `print_cache_item_size`.
        #[cfg(feature = "print_cache_item_size")]
        #[derive(Default)]
        struct TaskCacheStats {
            data: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            data_compressed: usize,
            data_count: usize,
            meta: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            meta_compressed: usize,
            meta_count: usize,
            upper_count: usize,
            collectibles_count: usize,
            aggregated_collectibles_count: usize,
            children_count: usize,
            followers_count: usize,
            collectibles_dependents_count: usize,
            aggregated_dirty_containers_count: usize,
            output_size: usize,
        }
        /// Formats a byte size, optionally including the compressed size when the
        /// `print_cache_item_size_with_compressed` feature is enabled.
        #[cfg(feature = "print_cache_item_size")]
        struct FormatSizes {
            size: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: usize,
        }
        #[cfg(feature = "print_cache_item_size")]
        impl std::fmt::Display for FormatSizes {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                use turbo_tasks::util::FormatBytes;
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    write!(
                        f,
                        "{} ({} compressed)",
                        FormatBytes(self.size),
                        FormatBytes(self.compressed_size)
                    )
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    write!(f, "{}", FormatBytes(self.size))
                }
            }
        }
        #[cfg(feature = "print_cache_item_size")]
        impl TaskCacheStats {
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            fn compressed_size(data: &[u8]) -> Result<usize> {
                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
                    data,
                    &mut Vec::new(),
                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
                )?)
            }

            /// Accounts one encoded data-category blob.
            fn add_data(&mut self, data: &[u8]) {
                self.data += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.data_count += 1;
            }

            /// Accounts one encoded meta-category blob.
            fn add_meta(&mut self, data: &[u8]) {
                self.meta += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.meta_count += 1;
            }

            /// Accumulates the task's metadata item counts and encoded output size.
            fn add_counts(&mut self, storage: &TaskStorage) {
                let counts = storage.meta_counts();
                self.upper_count += counts.upper;
                self.collectibles_count += counts.collectibles;
                self.aggregated_collectibles_count += counts.aggregated_collectibles;
                self.children_count += counts.children;
                self.followers_count += counts.followers;
                self.collectibles_dependents_count += counts.collectibles_dependents;
                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
                if let Some(output) = storage.get_output() {
                    use turbo_bincode::turbo_bincode_encode;

                    self.output_size += turbo_bincode_encode(&output)
                        .map(|data| data.len())
                        .unwrap_or(0);
                }
            }

            /// Returns the task name used as the stats grouping key.
            fn task_name(storage: &TaskStorage) -> String {
                storage
                    .get_persistent_task_type()
                    .map(|t| t.to_string())
                    .unwrap_or_else(|| "<unknown>".to_string())
            }

            /// Returns the primary sort key: compressed total when
            /// `print_cache_item_size_with_compressed` is enabled, raw total otherwise.
            fn sort_key(&self) -> usize {
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed + self.meta_compressed
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    self.data + self.meta
                }
            }

            fn format_total(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data + self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed + self.meta_compressed,
                }
            }

            fn format_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed,
                }
            }

            fn format_avg_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data.checked_div(self.data_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .data_compressed
                        .checked_div(self.data_count)
                        .unwrap_or(0),
                }
            }

            fn format_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.meta_compressed,
                }
            }

            fn format_avg_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta.checked_div(self.meta_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .meta_compressed
                        .checked_div(self.meta_count)
                        .unwrap_or(0),
                }
            }
        }
        #[cfg(feature = "print_cache_item_size")]
        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
            Mutex::new(FxHashMap::default());

        // Encode each task's modified categories. We only encode categories with `modified` set,
        // meaning the category was actually dirtied. Categories restored from disk but never
        // modified don't need re-persisting since the on-disk version is still valid.
        // For tasks accessed during snapshot mode, a frozen copy was made and its `modified`
        // flags were copied from the live task at snapshot creation time, reflecting which
        // categories were dirtied before the snapshot was taken.
        let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
            // Encodes one category of a task; on failure the error is reported and the
            // category is skipped (returns None) rather than failing the whole snapshot.
            let encode_category = |task_id: TaskId,
                                   data: &TaskStorage,
                                   category: SpecificTaskDataCategory,
                                   buffer: &mut TurboBincodeBuffer|
             -> Option<TurboBincodeBuffer> {
                match encode_task_data(task_id, data, category, buffer) {
                    Ok(encoded) => {
                        #[cfg(feature = "print_cache_item_size")]
                        {
                            let mut stats = task_cache_stats.lock();
                            let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
                            match category {
                                SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
                                SpecificTaskDataCategory::Data => entry.add_data(&encoded),
                            }
                        }
                        Some(encoded)
                    }
                    Err(err) => {
                        eprintln!(
                            "Serializing task {} failed ({:?}): {:?}",
                            self.debug_get_task_description(task_id),
                            category,
                            err
                        );
                        None
                    }
                }
            };
            if task_id.is_transient() {
                unreachable!("transient task_ids should never be enqueued to be persisted");
            }

            let encode_meta = inner.flags.meta_modified();
            let encode_data = inner.flags.data_modified();

            #[cfg(feature = "print_cache_item_size")]
            if encode_data || encode_meta {
                task_cache_stats
                    .lock()
                    .entry(TaskCacheStats::task_name(inner))
                    .or_default()
                    .add_counts(inner);
            }

            let meta = if encode_meta {
                encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
            } else {
                None
            };

            let data = if encode_data {
                encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
            } else {
                None
            };

            SnapshotItem {
                task_id,
                meta,
                data,
            }
        };

        // take_snapshot already filters empty items and empty shards in parallel
        let task_snapshots = self.storage.take_snapshot(&process);

        // Drop empty shards from the task cache log before persisting.
        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());

        drop(snapshot_span);
        let snapshot_duration = start.elapsed();
        let task_count = task_snapshots.len();

        // Nothing changed since the last snapshot — skip the write entirely.
        if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
            return Some((snapshot_time, false));
        }

        // Phase 3: write the captured data to the backing storage.
        let persist_start = Instant::now();
        let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
        {
            if let Err(err) = self.backing_storage.save_snapshot(
                suspended_operations,
                persisted_task_cache_log,
                task_snapshots,
            ) {
                eprintln!("Persisting failed: {err:?}");
                return None;
            }
            #[cfg(feature = "print_cache_item_size")]
            {
                let mut task_cache_stats = task_cache_stats
                    .into_inner()
                    .into_iter()
                    .collect::<Vec<_>>();
                if !task_cache_stats.is_empty() {
                    use turbo_tasks::util::FormatBytes;

                    use crate::utils::markdown_table::print_markdown_table;

                    // Sort descending by size (then by name) for the printed table.
                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
                        (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
                    });

                    println!(
                        "Task cache stats: {}",
                        FormatSizes {
                            size: task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data + s.meta)
                                .sum::<usize>(),
                            #[cfg(feature = "print_cache_item_size_with_compressed")]
                            compressed_size: task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data_compressed + s.meta_compressed)
                                .sum::<usize>()
                        },
                    );

                    print_markdown_table(
                        [
                            "Task",
                            " Total Size",
                            " Data Size",
                            " Data Count x Avg",
                            " Data Count x Avg",
                            " Meta Size",
                            " Meta Count x Avg",
                            " Meta Count x Avg",
                            " Uppers",
                            " Coll",
                            " Agg Coll",
                            " Children",
                            " Followers",
                            " Coll Deps",
                            " Agg Dirty",
                            " Output Size",
                        ],
                        task_cache_stats.iter(),
                        |(task_desc, stats)| {
                            [
                                task_desc.to_string(),
                                format!(" {}", stats.format_total()),
                                format!(" {}", stats.format_data()),
                                format!(" {} x", stats.data_count),
                                format!("{}", stats.format_avg_data()),
                                format!(" {}", stats.format_meta()),
                                format!(" {} x", stats.meta_count),
                                format!("{}", stats.format_avg_meta()),
                                format!(" {}", stats.upper_count),
                                format!(" {}", stats.collectibles_count),
                                format!(" {}", stats.aggregated_collectibles_count),
                                format!(" {}", stats.children_count),
                                format!(" {}", stats.followers_count),
                                format!(" {}", stats.collectibles_dependents_count),
                                format!(" {}", stats.aggregated_dirty_containers_count),
                                format!(" {}", FormatBytes(stats.output_size)),
                            ]
                        },
                    );
                }
            }
        }

        let elapsed = start.elapsed();
        let persist_duration = persist_start.elapsed();
        // avoid spamming the event queue with information about fast operations
        if elapsed > Duration::from_secs(10) {
            turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
                "Finished writing to filesystem cache".to_string(),
                elapsed,
            )));
        }

        let wall_start_ms = wall_start
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            // as_millis_f64 is not stable yet
            .as_secs_f64()
            * 1000.0;
        let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
        turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
            "turbopack-persistence",
            wall_start_ms,
            wall_end_ms,
            vec![
                ("reason", serde_json::Value::from(reason)),
                (
                    "snapshot_duration_ms",
                    serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
                ),
                (
                    "persist_duration_ms",
                    serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
                ),
                ("task_count", serde_json::Value::from(task_count)),
            ],
        )));

        Some((snapshot_time, true))
    }
1424
1425    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1426        if self.should_restore() {
1427            // Continue all uncompleted operations
1428            // They can't be interrupted by a snapshot since the snapshotting job has not been
1429            // scheduled yet.
1430            let uncompleted_operations = self
1431                .backing_storage
1432                .uncompleted_operations()
1433                .expect("Failed to get uncompleted operations");
1434            if !uncompleted_operations.is_empty() {
1435                let mut ctx = self.execute_context(turbo_tasks);
1436                for op in uncompleted_operations {
1437                    op.execute(&mut ctx);
1438                }
1439            }
1440        }
1441
1442        // Only when it should write regularly to the storage, we schedule the initial snapshot
1443        // job.
1444        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1445            // Schedule the snapshot job
1446            let _span = trace_span!("persisting background job").entered();
1447            let _span = tracing::info_span!("thread").entered();
1448            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1449        }
1450    }
1451
    /// Signals that the backend is about to shut down.
    ///
    /// Sets the `stopping` flag first (with `Release` ordering so other threads observe it)
    /// and then wakes every listener waiting on the stopping event.
    fn stopping(&self) {
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }
1456
1457    #[allow(unused_variables)]
1458    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1459        #[cfg(feature = "verify_aggregation_graph")]
1460        {
1461            self.is_idle.store(false, Ordering::Release);
1462            self.verify_aggregation_graph(turbo_tasks, false);
1463        }
1464        if self.should_persist() {
1465            self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks);
1466        }
1467        drop_contents(&self.task_cache);
1468        self.storage.drop_contents();
1469        if let Err(err) = self.backing_storage.shutdown() {
1470            println!("Shutting down failed: {err}");
1471        }
1472    }
1473
    /// Called when the turbo-tasks system becomes idle (no work is currently scheduled).
    ///
    /// Wakes listeners of the idle-start event. With `verify_aggregation_graph` enabled, it
    /// additionally spawns a debounced background verification of the aggregation graph.
    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                // Debounce: wait 5s, but abort early if the idle phase already ended.
                select! {
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                        // do nothing
                    }
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                // Re-check the flag: the sleep may have raced with `idle_end`.
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }
1501
    /// Called when the system leaves the idle state (new work was scheduled).
    ///
    /// Notifying the idle-end event cancels any pending debounced verification spawned by
    /// `idle_start`.
    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }
1507
    /// Looks up (or creates) the persistent task for `task_type` and connects it as a child
    /// of `parent_task`. Uses a fast read-locked cache probe first, then falls back to the
    /// backing storage, and finally creates a new task id — handling races with other
    /// threads via the cache's raw-entry API.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;

        // First check if the task exists in the cache which only uses a read lock
        if let Some(task_id) = self.task_cache.get(&task_type) {
            let task_id = *task_id;
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }

        // Create a single ExecuteContext for both lookup and connect_child
        let mut ctx = self.execute_context(turbo_tasks);

        let mut is_new = false;
        let (task_id, task_type) = if let Some(task_id) = ctx.task_by_type(&task_type) {
            // Task exists in backing storage
            // So we only need to insert it into the in-memory cache
            self.track_cache_hit(&task_type);
            let task_type = match raw_entry(&self.task_cache, &task_type) {
                // Another thread inserted it into the cache meanwhile; keep our owned copy.
                RawEntry::Occupied(_) => ArcOrOwned::Owned(task_type),
                RawEntry::Vacant(e) => {
                    let task_type = Arc::new(task_type);
                    e.insert(task_type.clone(), task_id);
                    ArcOrOwned::Arc(task_type)
                }
            };
            (task_id, task_type)
        } else {
            // Task doesn't exist in memory cache or backing storage
            // So we might need to create a new task
            let (task_id, task_type) = match raw_entry(&self.task_cache, &task_type) {
                RawEntry::Occupied(e) => {
                    // Another thread beat us to creating this task - use their task_id.
                    // They will handle logging to persisted_task_cache_log.
                    let task_id = *e.get();
                    drop(e);
                    self.track_cache_hit(&task_type);
                    (task_id, ArcOrOwned::Owned(task_type))
                }
                RawEntry::Vacant(e) => {
                    // We're creating a new task.
                    let task_type = Arc::new(task_type);
                    let task_id = self.persisted_task_id_factory.get();
                    e.insert(task_type.clone(), task_id);
                    // insert() consumes e, releasing the lock
                    self.track_cache_miss(&task_type);
                    is_new = true;
                    if let Some(log) = &self.persisted_task_cache_log {
                        log.lock(task_id).push((task_type.clone(), task_id));
                    }
                    (task_id, ArcOrOwned::Arc(task_type))
                }
            };
            (task_id, task_type)
        };
        if is_new && is_root {
            // Newly created root tasks get the maximum aggregation number so they sit at
            // the top of the aggregation tree.
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: u32::MAX,
                    distance: None,
                },
                &mut ctx,
            );
        }
        // Reuse the same ExecuteContext for connect_child
        operation::ConnectChildOperation::run(parent_task, task_id, Some(task_type), ctx);

        task_id
    }
1589
    /// Looks up (or creates) the transient task for `task_type` and connects it as a child
    /// of `parent_task`. Transient tasks are never persisted, so there is no backing-storage
    /// lookup — only the in-memory cache with a read-probe followed by a write-locked
    /// double-check.
    ///
    /// Panics (via `panic_persistent_calling_transient`) when a persistent parent attempts
    /// to call a transient task, since that dependency could not be restored from storage.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;

        if let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                /* cell_id */ None,
            );
        }
        // First check if the task exists in the cache which only uses a read lock
        if let Some(task_id) = self.task_cache.get(&task_type) {
            let task_id = *task_id;
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }
        // If not, acquire a write lock and double check / insert
        match raw_entry(&self.task_cache, &task_type) {
            RawEntry::Occupied(e) => {
                // Another thread created the task while we were waiting for the lock.
                let task_id = *e.get();
                drop(e);
                self.track_cache_hit(&task_type);
                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Owned(task_type)),
                    turbo_tasks,
                );
                task_id
            }
            RawEntry::Vacant(e) => {
                let task_type = Arc::new(task_type);
                let task_id = self.transient_task_id_factory.get();
                e.insert(task_type.clone(), task_id);
                self.track_cache_miss(&task_type);

                if is_root {
                    // Root tasks get the maximum aggregation number so they form the top
                    // of the aggregation tree.
                    let mut ctx = self.execute_context(turbo_tasks);
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: u32::MAX,
                            distance: None,
                        },
                        &mut ctx,
                    );
                }

                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Arc(task_type)),
                    turbo_tasks,
                );

                task_id
            }
        }
    }
1662
1663    /// Generate an object that implements [`fmt::Display`] explaining why the given
1664    /// [`CachedTaskType`] is transient.
1665    fn debug_trace_transient_task(
1666        &self,
1667        task_type: &CachedTaskType,
1668        cell_id: Option<CellId>,
1669    ) -> DebugTraceTransientTask {
1670        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1671        // from tracing the same task many times, so use a visited_set
1672        fn inner_id(
1673            backend: &TurboTasksBackendInner<impl BackingStorage>,
1674            task_id: TaskId,
1675            cell_type_id: Option<ValueTypeId>,
1676            visited_set: &mut FxHashSet<TaskId>,
1677        ) -> DebugTraceTransientTask {
1678            if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1679                if visited_set.contains(&task_id) {
1680                    let task_name = task_type.get_name();
1681                    DebugTraceTransientTask::Collapsed {
1682                        task_name,
1683                        cell_type_id,
1684                    }
1685                } else {
1686                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1687                }
1688            } else {
1689                DebugTraceTransientTask::Uncached { cell_type_id }
1690            }
1691        }
1692        fn inner_cached(
1693            backend: &TurboTasksBackendInner<impl BackingStorage>,
1694            task_type: &CachedTaskType,
1695            cell_type_id: Option<ValueTypeId>,
1696            visited_set: &mut FxHashSet<TaskId>,
1697        ) -> DebugTraceTransientTask {
1698            let task_name = task_type.get_name();
1699
1700            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1701                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1702                    // `task_id` should never be `None` at this point, as that would imply a
1703                    // non-local task is returning a local `Vc`...
1704                    // Just ignore if it happens, as we're likely already panicking.
1705                    return None;
1706                };
1707                if task_id.is_transient() {
1708                    Some(Box::new(inner_id(
1709                        backend,
1710                        task_id,
1711                        cause_self_raw_vc.try_get_type_id(),
1712                        visited_set,
1713                    )))
1714                } else {
1715                    None
1716                }
1717            });
1718            let cause_args = task_type
1719                .arg
1720                .get_raw_vcs()
1721                .into_iter()
1722                .filter_map(|raw_vc| {
1723                    let Some(task_id) = raw_vc.try_get_task_id() else {
1724                        // `task_id` should never be `None` (see comment above)
1725                        return None;
1726                    };
1727                    if !task_id.is_transient() {
1728                        return None;
1729                    }
1730                    Some((task_id, raw_vc.try_get_type_id()))
1731                })
1732                .collect::<IndexSet<_>>() // dedupe
1733                .into_iter()
1734                .map(|(task_id, cell_type_id)| {
1735                    inner_id(backend, task_id, cell_type_id, visited_set)
1736                })
1737                .collect();
1738
1739            DebugTraceTransientTask::Cached {
1740                task_name,
1741                cell_type_id,
1742                cause_self,
1743                cause_args,
1744            }
1745        }
1746        inner_cached(
1747            self,
1748            task_type,
1749            cell_id.map(|c| c.type_id),
1750            &mut FxHashSet::default(),
1751        )
1752    }
1753
1754    fn invalidate_task(
1755        &self,
1756        task_id: TaskId,
1757        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1758    ) {
1759        if !self.should_track_dependencies() {
1760            panic!("Dependency tracking is disabled so invalidation is not allowed");
1761        }
1762        operation::InvalidateOperation::run(
1763            smallvec![task_id],
1764            #[cfg(feature = "trace_task_dirty")]
1765            TaskDirtyCause::Invalidator,
1766            self.execute_context(turbo_tasks),
1767        );
1768    }
1769
1770    fn invalidate_tasks(
1771        &self,
1772        tasks: &[TaskId],
1773        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1774    ) {
1775        if !self.should_track_dependencies() {
1776            panic!("Dependency tracking is disabled so invalidation is not allowed");
1777        }
1778        operation::InvalidateOperation::run(
1779            tasks.iter().copied().collect(),
1780            #[cfg(feature = "trace_task_dirty")]
1781            TaskDirtyCause::Unknown,
1782            self.execute_context(turbo_tasks),
1783        );
1784    }
1785
1786    fn invalidate_tasks_set(
1787        &self,
1788        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1789        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1790    ) {
1791        if !self.should_track_dependencies() {
1792            panic!("Dependency tracking is disabled so invalidation is not allowed");
1793        }
1794        operation::InvalidateOperation::run(
1795            tasks.iter().copied().collect(),
1796            #[cfg(feature = "trace_task_dirty")]
1797            TaskDirtyCause::Unknown,
1798            self.execute_context(turbo_tasks),
1799        );
1800    }
1801
1802    fn invalidate_serialization(
1803        &self,
1804        task_id: TaskId,
1805        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1806    ) {
1807        if task_id.is_transient() {
1808            return;
1809        }
1810        let mut ctx = self.execute_context(turbo_tasks);
1811        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1812        task.invalidate_serialization();
1813    }
1814
1815    fn debug_get_task_description(&self, task_id: TaskId) -> String {
1816        let task = self.storage.access_mut(task_id);
1817        if let Some(value) = task.get_persistent_task_type() {
1818            format!("{task_id:?} {}", value)
1819        } else if let Some(value) = task.get_transient_task_type() {
1820            format!("{task_id:?} {}", value)
1821        } else {
1822            format!("{task_id:?} unknown")
1823        }
1824    }
1825
1826    fn get_task_name(
1827        &self,
1828        task_id: TaskId,
1829        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1830    ) -> String {
1831        let mut ctx = self.execute_context(turbo_tasks);
1832        let task = ctx.task(task_id, TaskDataCategory::Data);
1833        if let Some(value) = task.get_persistent_task_type() {
1834            value.to_string()
1835        } else if let Some(value) = task.get_transient_task_type() {
1836            value.to_string()
1837        } else {
1838            "unknown".to_string()
1839        }
1840    }
1841
1842    fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1843        let task = self.storage.access_mut(task_id);
1844        task.get_persistent_task_type().cloned()
1845    }
1846
    /// Called when a scheduled or running execution of `task_id` was canceled before it
    /// could complete. Wakes anyone waiting on the task's done event and records the
    /// `Canceled` state so the task is not treated as still in progress.
    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        if let Some(in_progress) = task.take_in_progress() {
            match in_progress {
                // Notify waiters in both the scheduled and running cases; they will observe
                // the canceled state afterwards.
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        // `take_in_progress` above emptied the slot, so this insert must not find an entry.
        let old = task.set_in_progress(InProgressState::Canceled);
        debug_assert!(old.is_none(), "InProgress already exists");
    }
1869
    /// Attempts to transition `task_id` from `Scheduled` to `InProgress` and build its
    /// execution spec (tracing span + future to poll).
    ///
    /// Returns `None` when the task is not currently schedulable: no in-progress state, a
    /// non-`Scheduled` state (already running or canceled), or a once-task whose future was
    /// already consumed. Also marks existing collectibles and dependencies as outdated so
    /// the new execution can re-establish them.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            // Prefetching related tasks requires releasing and re-acquiring the task lock.
            if let Some(tasks) = task.prefetch() {
                drop(task);
                ctx.prepare_tasks(tasks);
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            let in_progress = task.take_in_progress()?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                // Not in `Scheduled` state: restore the state unchanged and bail out.
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Make all current collectibles outdated (remove left-over outdated collectibles)
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        // Drop outdated entries that no longer have a current counterpart.
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Make all dependencies outdated
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason, priority),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // A once-task's future can only be taken once; `None` here means it was
                    // already executed.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1969
    /// Backend hook invoked after a task's native function finished (successfully or with an
    /// error). Commits the execution result to the task graph in multiple steps; returns
    /// `true` when the task turned out to be stale and was rescheduled instead.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        // Task completion is a 4 step process:
        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
        //    aggregation number of the task and the new children.
        // 2. Connect the new children to the task (and do the relevant aggregation updates).
        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
        // 4. Shrink the task memory to reduce footprint of the task.

        // Due to persistence it is possible that the process is cancelled after any step. This is
        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
        // in-memory representation.

        // The task might be invalidated during this process, so we need to check the stale flag
        // at the start of every step.

        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        // Step 1: prepare — remove old edges, compute the new output and dependents.
        let Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        }) = self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        )
        else {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "prepare");
            return true;
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // When restoring from filesystem cache the following might not be executed (since we can
        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
        // would be executed again.

        if !output_dependent_tasks.is_empty() {
            // The output changed: invalidate every task that depends on it.
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        // Step 2: connect new children; this can detect staleness and reschedule.
        if has_new_children
            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return true;
        }

        // Step 3: clear the dirty flag and in-progress state.
        let (stale, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
        );
        if stale {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return true;
        }

        // Step 4: shrink in-memory representation.
        let removed_data =
            self.task_execution_completed_cleanup(&mut ctx, task_id, cell_counters, is_error);

        // Drop data outside of critical sections
        drop(removed_data);
        drop(in_progress_cells);

        false
    }
2101
2102    fn task_execution_completed_prepare(
2103        &self,
2104        ctx: &mut impl ExecuteContext<'_>,
2105        #[cfg(feature = "trace_task_details")] span: &Span,
2106        task_id: TaskId,
2107        result: Result<RawVc, TurboTasksExecutionError>,
2108        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2109        #[cfg(feature = "verify_determinism")] stateful: bool,
2110        has_invalidator: bool,
2111    ) -> Option<TaskExecutionCompletePrepareResult> {
2112        let mut task = ctx.task(task_id, TaskDataCategory::All);
2113        let Some(in_progress) = task.get_in_progress_mut() else {
2114            panic!("Task execution completed, but task is not in progress: {task:#?}");
2115        };
2116        if matches!(in_progress, InProgressState::Canceled) {
2117            return Some(TaskExecutionCompletePrepareResult {
2118                new_children: Default::default(),
2119                is_now_immutable: false,
2120                #[cfg(feature = "verify_determinism")]
2121                no_output_set: false,
2122                new_output: None,
2123                output_dependent_tasks: Default::default(),
2124            });
2125        }
2126        let &mut InProgressState::InProgress(box InProgressStateInner {
2127            stale,
2128            ref mut new_children,
2129            session_dependent,
2130            once_task: is_once_task,
2131            ..
2132        }) = in_progress
2133        else {
2134            panic!("Task execution completed, but task is not in progress: {task:#?}");
2135        };
2136
2137        // If the task is stale, reschedule it
2138        #[cfg(not(feature = "no_fast_stale"))]
2139        if stale && !is_once_task {
2140            let Some(InProgressState::InProgress(box InProgressStateInner {
2141                done_event,
2142                mut new_children,
2143                ..
2144            })) = task.take_in_progress()
2145            else {
2146                unreachable!();
2147            };
2148            let old = task.set_in_progress(InProgressState::Scheduled {
2149                done_event,
2150                reason: TaskExecutionReason::Stale,
2151            });
2152            debug_assert!(old.is_none(), "InProgress already exists");
2153            // Remove old children from new_children to leave only the children that had their
2154            // active count increased
2155            for task in task.iter_children() {
2156                new_children.remove(&task);
2157            }
2158            drop(task);
2159
2160            // We need to undo the active count increase for the children since we throw away the
2161            // new_children list now.
2162            AggregationUpdateQueue::run(
2163                AggregationUpdateJob::DecreaseActiveCounts {
2164                    task_ids: new_children.into_iter().collect(),
2165                },
2166                ctx,
2167            );
2168            return None;
2169        }
2170
2171        // take the children from the task to process them
2172        let mut new_children = take(new_children);
2173
2174        // handle stateful (only tracked when verify_determinism is enabled)
2175        #[cfg(feature = "verify_determinism")]
2176        if stateful {
2177            task.set_stateful(true);
2178        }
2179
2180        // handle has_invalidator
2181        if has_invalidator {
2182            task.set_invalidator(true);
2183        }
2184
2185        // handle cell counters: update max index and remove cells that are no longer used
2186        let old_counters: FxHashMap<_, _> = task
2187            .iter_cell_type_max_index()
2188            .map(|(&k, &v)| (k, v))
2189            .collect();
2190        let mut counters_to_remove = old_counters.clone();
2191
2192        for (&cell_type, &max_index) in cell_counters.iter() {
2193            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2194                if old_max_index != max_index {
2195                    task.insert_cell_type_max_index(cell_type, max_index);
2196                }
2197            } else {
2198                task.insert_cell_type_max_index(cell_type, max_index);
2199            }
2200        }
2201        for (cell_type, _) in counters_to_remove {
2202            task.remove_cell_type_max_index(&cell_type);
2203        }
2204
2205        let mut queue = AggregationUpdateQueue::new();
2206
2207        let mut old_edges = Vec::new();
2208
2209        let has_children = !new_children.is_empty();
2210        let is_immutable = task.immutable();
2211        let task_dependencies_for_immutable =
2212            // Task was previously marked as immutable
2213            if !is_immutable
2214            // Task is not session dependent (session dependent tasks can change between sessions)
2215            && !session_dependent
2216            // Task has no invalidator
2217            && !task.invalidator()
2218            // Task has no dependencies on collectibles
2219            && task.is_collectibles_dependencies_empty()
2220        {
2221            Some(
2222                // Collect all dependencies on tasks to check if all dependencies are immutable
2223                task.iter_output_dependencies()
2224                    .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
2225                    .collect::<FxHashSet<_>>(),
2226            )
2227        } else {
2228            None
2229        };
2230
2231        if has_children {
2232            // Prepare all new children
2233            prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2234
2235            // Filter actual new children
2236            old_edges.extend(
2237                task.iter_children()
2238                    .filter(|task| !new_children.remove(task))
2239                    .map(OutdatedEdge::Child),
2240            );
2241        } else {
2242            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2243        }
2244
2245        old_edges.extend(
2246            task.iter_outdated_collectibles()
2247                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2248        );
2249
2250        if self.should_track_dependencies() {
2251            // IMPORTANT: Use iter_outdated_* here, NOT iter_* (active dependencies).
2252            // At execution start, active deps are copied to outdated as a "before" snapshot.
2253            // During execution, new deps are added to active.
2254            // Here at completion, we clean up only the OUTDATED deps (the "before" snapshot).
2255            // Using iter_* (active) instead would incorrectly clean up deps that are still valid,
2256            // breaking dependency tracking.
2257            old_edges.extend(
2258                task.iter_outdated_cell_dependencies()
2259                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
2260            );
2261            old_edges.extend(
2262                task.iter_outdated_output_dependencies()
2263                    .map(OutdatedEdge::OutputDependency),
2264            );
2265        }
2266
2267        // Check if output need to be updated
2268        let current_output = task.get_output();
2269        #[cfg(feature = "verify_determinism")]
2270        let no_output_set = current_output.is_none();
2271        let new_output = match result {
2272            Ok(RawVc::TaskOutput(output_task_id)) => {
2273                if let Some(OutputValue::Output(current_task_id)) = current_output
2274                    && *current_task_id == output_task_id
2275                {
2276                    None
2277                } else {
2278                    Some(OutputValue::Output(output_task_id))
2279                }
2280            }
2281            Ok(RawVc::TaskCell(output_task_id, cell)) => {
2282                if let Some(OutputValue::Cell(CellRef {
2283                    task: current_task_id,
2284                    cell: current_cell,
2285                })) = current_output
2286                    && *current_task_id == output_task_id
2287                    && *current_cell == cell
2288                {
2289                    None
2290                } else {
2291                    Some(OutputValue::Cell(CellRef {
2292                        task: output_task_id,
2293                        cell,
2294                    }))
2295                }
2296            }
2297            Ok(RawVc::LocalOutput(..)) => {
2298                panic!("Non-local tasks must not return a local Vc");
2299            }
2300            Err(err) => {
2301                if let Some(OutputValue::Error(old_error)) = current_output
2302                    && **old_error == err
2303                {
2304                    None
2305                } else {
2306                    Some(OutputValue::Error(Arc::new((&err).into())))
2307                }
2308            }
2309        };
2310        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2311        // When output has changed, grab the dependent tasks
2312        if new_output.is_some() && ctx.should_track_dependencies() {
2313            output_dependent_tasks = task.iter_output_dependent().collect();
2314        }
2315
2316        drop(task);
2317
2318        // Check if the task can be marked as immutable
2319        let mut is_now_immutable = false;
2320        if let Some(dependencies) = task_dependencies_for_immutable
2321            && dependencies
2322                .iter()
2323                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2324        {
2325            is_now_immutable = true;
2326        }
2327        #[cfg(feature = "trace_task_details")]
2328        span.record("immutable", is_immutable || is_now_immutable);
2329
2330        if !queue.is_empty() || !old_edges.is_empty() {
2331            #[cfg(feature = "trace_task_completion")]
2332            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
2333            // Remove outdated edges first, before removing in_progress+dirty flag.
2334            // We need to make sure all outdated edges are removed before the task can potentially
2335            // be scheduled and executed again
2336            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2337        }
2338
2339        Some(TaskExecutionCompletePrepareResult {
2340            new_children,
2341            is_now_immutable,
2342            #[cfg(feature = "verify_determinism")]
2343            no_output_set,
2344            new_output,
2345            output_dependent_tasks,
2346        })
2347    }
2348
    /// Marks all tasks that depend on this task's output as dirty after the output value
    /// changed during execution completion.
    ///
    /// For more than one dependent, task storage is prefetched in a batch via
    /// `prepare_tasks`. When the dependent set exceeds
    /// `DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD`, the work is chunked and processed
    /// on scoped worker threads, each with its own child context and
    /// `AggregationUpdateQueue`; otherwise it runs inline on `ctx`.
    fn task_execution_completed_invalidate_output_dependent(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        output_dependent_tasks: SmallVec<[TaskId; 4]>,
    ) {
        debug_assert!(!output_dependent_tasks.is_empty());

        // Batch-prefetch all dependents before touching them individually.
        if output_dependent_tasks.len() > 1 {
            ctx.prepare_tasks(
                output_dependent_tasks
                    .iter()
                    .map(|&id| (id, TaskDataCategory::All)),
            );
        }

        /// Handles a single dependent task. Outcomes:
        /// - `Once` transient tasks are never invalidated (skipped).
        /// - A dependency still listed as outdated means the dependent hasn't re-read the
        ///   output yet: mark dirty but not stale.
        /// - A dependency no longer present means the dependent no longer reads this
        ///   output: skip.
        /// - Otherwise mark the dependent dirty (and stale).
        fn process_output_dependents(
            ctx: &mut impl ExecuteContext<'_>,
            task_id: TaskId,
            dependent_task_id: TaskId,
            queue: &mut AggregationUpdateQueue,
        ) {
            #[cfg(feature = "trace_task_output_dependencies")]
            let span = tracing::trace_span!(
                "invalidate output dependency",
                task = %task_id,
                dependent_task = %dependent_task_id,
                result = tracing::field::Empty,
            )
            .entered();
            let mut make_stale = true;
            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
            let transient_task_type = dependent.get_transient_task_type();
            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
                // once tasks are never invalidated
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "once task");
                return;
            }
            if dependent.outdated_output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "outdated dependency");
                // output dependency is outdated, so it hasn't read the output yet
                // and doesn't need to be invalidated
                // But importantly we still need to make the task dirty as it should no longer
                // be considered as "recomputation".
                make_stale = false;
            } else if !dependent.output_dependencies_contains(&task_id) {
                // output dependency has been removed, so the task doesn't depend on the
                // output anymore and doesn't need to be invalidated
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "no backward dependency");
                return;
            }
            make_task_dirty_internal(
                dependent,
                dependent_task_id,
                make_stale,
                #[cfg(feature = "trace_task_dirty")]
                TaskDirtyCause::OutputChange { task_id },
                queue,
                ctx,
            );
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "marked dirty");
        }

        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
            // Parallel path: split into chunks, one scoped worker per chunk, each with its
            // own queue that is executed before the worker finishes.
            let chunk_size = good_chunk_size(output_dependent_tasks.len());
            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
            let _ = scope_and_block(chunks.len(), |scope| {
                for chunk in chunks {
                    let child_ctx = ctx.child_context();
                    scope.spawn(move || {
                        let mut ctx = child_ctx.create();
                        let mut queue = AggregationUpdateQueue::new();
                        for dependent_task_id in chunk {
                            process_output_dependents(
                                &mut ctx,
                                task_id,
                                dependent_task_id,
                                &mut queue,
                            )
                        }
                        queue.execute(&mut ctx);
                    });
                }
            });
        } else {
            // Sequential path: process all dependents inline and flush the queue once.
            let mut queue = AggregationUpdateQueue::new();
            for dependent_task_id in output_dependent_tasks {
                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
            }
            queue.execute(ctx);
        }
    }
2445
2446    fn task_execution_completed_unfinished_children_dirty(
2447        &self,
2448        ctx: &mut impl ExecuteContext<'_>,
2449        new_children: &FxHashSet<TaskId>,
2450    ) {
2451        debug_assert!(!new_children.is_empty());
2452
2453        let mut queue = AggregationUpdateQueue::new();
2454        ctx.for_each_task_all(new_children.iter().copied(), |child_task, ctx| {
2455            if !child_task.has_output() {
2456                let child_id = child_task.id();
2457                make_task_dirty_internal(
2458                    child_task,
2459                    child_id,
2460                    false,
2461                    #[cfg(feature = "trace_task_dirty")]
2462                    TaskDirtyCause::InitialDirty,
2463                    &mut queue,
2464                    ctx,
2465                );
2466            }
2467        });
2468
2469        queue.execute(ctx);
2470    }
2471
    /// Connects the `new_children` collected during execution to the task's child list
    /// and propagates activeness through the aggregation tree.
    ///
    /// Returns `true` when the task was rescheduled because it became stale during
    /// execution (in that case the children are NOT connected and their active counts
    /// are undone instead). Returns `false` when the task was canceled or when the
    /// children were connected normally.
    fn task_execution_completed_connect(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        new_children: FxHashSet<TaskId>,
    ) -> bool {
        debug_assert!(!new_children.is_empty());

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // Task was canceled in the meantime, so we don't connect the children
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            #[cfg(not(feature = "no_fast_stale"))]
            stale,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // If the task is stale, reschedule it
        #[cfg(not(feature = "no_fast_stale"))]
        if *stale && !is_once_task {
            // Take the state back out so the `done_event` can be reused for the
            // rescheduled run.
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);

            // All `new_children` are currently held active with an active count and we need to
            // undo that. (We already filtered out the old children from that list)
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return true;
        }

        // Whether this task itself holds an active count; forwarded so `connect_children`
        // can propagate activeness to the new children.
        let has_active_count = ctx.should_track_activeness()
            && task
                .get_activeness()
                .is_some_and(|activeness| activeness.active_counter > 0);
        connect_children(
            ctx,
            task_id,
            task,
            new_children,
            has_active_count,
            ctx.should_track_activeness(),
        );

        false
    }
2539
    /// Final phase of execution completion: commits the new output, updates the task's
    /// dirty state (propagating changes into the aggregation tree), and releases the
    /// `InProgress` state.
    ///
    /// Returns `(reschedule, in_progress_cells)`:
    /// - `reschedule`: `true` when the task must run again (it was stale, or the
    ///   `verify_determinism` feature forces a re-run after a dirty/output change).
    /// - `in_progress_cells`: the taken in-progress cell map, returned so the caller can
    ///   drop it outside of critical sections.
    fn task_execution_completed_finish(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "verify_determinism")] no_output_set: bool,
        new_output: Option<OutputValue>,
        is_now_immutable: bool,
    ) -> (
        bool,
        Option<
            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
        >,
    ) {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.take_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // Task was canceled in the meantime, so we don't finish it
            return (false, None);
        }
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: is_once_task,
            stale,
            session_dependent,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // Children must have been handled (connected or discarded) by an earlier phase.
        debug_assert!(new_children.is_empty());

        // If the task is stale, reschedule it
        if stale && !is_once_task {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            return (true, None);
        }

        // Set the output if it has changed
        let mut old_content = None;
        if let Some(value) = new_output {
            old_content = task.set_output(value);
        }

        // If the task has no invalidator and has no mutable dependencies, it does not have a way
        // to be invalidated and we can mark it as immutable.
        if is_now_immutable {
            task.set_immutable(true);
        }

        // Notify in progress cells and remove all of them
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Grab the old dirty state
        let old_dirtyness = task.get_dirty().cloned();
        let (old_self_dirty, old_current_session_self_clean) = match old_dirtyness {
            None => (false, false),
            Some(Dirtyness::Dirty(_)) => (true, false),
            Some(Dirtyness::SessionDependent) => {
                // Session-dependent tasks stay "dirty" across sessions but may be clean
                // within the current session.
                let clean_in_current_session = task.current_session_clean();
                (true, clean_in_current_session)
            }
        };

        // Compute the new dirty state
        let (new_dirtyness, new_self_dirty, new_current_session_self_clean) = if session_dependent {
            (Some(Dirtyness::SessionDependent), true, true)
        } else {
            (None, false, false)
        };

        // Update the dirty state
        let dirty_changed = old_dirtyness != new_dirtyness;
        if dirty_changed {
            if let Some(value) = new_dirtyness {
                task.set_dirty(value);
            } else if old_dirtyness.is_some() {
                task.take_dirty();
            }
        }
        if old_current_session_self_clean != new_current_session_self_clean {
            if new_current_session_self_clean {
                task.set_current_session_clean(true);
            } else if old_current_session_self_clean {
                task.set_current_session_clean(false);
            }
        }

        // Propagate dirtyness changes
        let data_update = if old_self_dirty != new_self_dirty
            || old_current_session_self_clean != new_current_session_self_clean
        {
            let dirty_container_count = task
                .get_aggregated_dirty_container_count()
                .cloned()
                .unwrap_or_default();
            let current_session_clean_container_count = task
                .get_aggregated_current_session_clean_container_count()
                .copied()
                .unwrap_or_default();
            // Only self-dirtiness changed here, so old and new container counts are equal.
            let result = ComputeDirtyAndCleanUpdate {
                old_dirty_container_count: dirty_container_count,
                new_dirty_container_count: dirty_container_count,
                old_current_session_clean_container_count: current_session_clean_container_count,
                new_current_session_clean_container_count: current_session_clean_container_count,
                old_self_dirty,
                new_self_dirty,
                old_current_session_self_clean,
                new_current_session_self_clean,
            }
            .compute();
            if result.dirty_count_update - result.current_session_clean_update < 0 {
                // The task is clean now
                if let Some(activeness_state) = task.get_activeness_mut() {
                    // Wake up everyone waiting for this task (and its aggregated
                    // children) to become clean.
                    activeness_state.all_clean_event.notify(usize::MAX);
                    activeness_state.unset_active_until_clean();
                    if activeness_state.is_empty() {
                        task.take_activeness();
                    }
                }
            }
            result
                .aggregated_update(task_id)
                .and_then(|aggregated_update| {
                    AggregationUpdateJob::data_update(&mut task, aggregated_update)
                })
        } else {
            None
        };

        #[cfg(feature = "verify_determinism")]
        let reschedule =
            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
        #[cfg(not(feature = "verify_determinism"))]
        let reschedule = false;
        if reschedule {
            // Keep `done_event` pending: the rescheduled run will notify it.
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);
        } else {
            drop(task);

            // Notify dependent tasks that are waiting for this task to finish
            done_event.notify(usize::MAX);
        }

        // Drop the replaced output content after releasing the task lock.
        drop(old_content);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, ctx);
        }

        // We return so the data can be dropped outside of critical sections
        (reschedule, in_progress_cells)
    }
2709
2710    fn task_execution_completed_cleanup(
2711        &self,
2712        ctx: &mut impl ExecuteContext<'_>,
2713        task_id: TaskId,
2714        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2715        is_error: bool,
2716    ) -> Vec<SharedReference> {
2717        let mut task = ctx.task(task_id, TaskDataCategory::All);
2718        let mut removed_cell_data = Vec::new();
2719        // An error is potentially caused by a eventual consistency, so we avoid updating cells
2720        // after an error as it is likely transient and we want to keep the dependent tasks
2721        // clean to avoid re-executions.
2722        if !is_error {
2723            // Remove no longer existing cells and
2724            // find all outdated data items (removed cells, outdated edges)
2725            // Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
2726            // anyway and we want to avoid needless re-executions. When the cells become
2727            // used again, they are invalidated from the update cell operation.
2728            // Remove cell data for cells that no longer exist
2729            let to_remove_persistent: Vec<_> = task
2730                .iter_persistent_cell_data()
2731                .filter_map(|(cell, _)| {
2732                    cell_counters
2733                        .get(&cell.type_id)
2734                        .is_none_or(|start_index| cell.index >= *start_index)
2735                        .then_some(*cell)
2736                })
2737                .collect();
2738
2739            // Remove transient cell data for cells that no longer exist
2740            let to_remove_transient: Vec<_> = task
2741                .iter_transient_cell_data()
2742                .filter_map(|(cell, _)| {
2743                    cell_counters
2744                        .get(&cell.type_id)
2745                        .is_none_or(|start_index| cell.index >= *start_index)
2746                        .then_some(*cell)
2747                })
2748                .collect();
2749            removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
2750            for cell in to_remove_persistent {
2751                if let Some(data) = task.remove_persistent_cell_data(&cell) {
2752                    removed_cell_data.push(data.into_untyped());
2753                }
2754            }
2755            for cell in to_remove_transient {
2756                if let Some(data) = task.remove_transient_cell_data(&cell) {
2757                    removed_cell_data.push(data);
2758                }
2759            }
2760        }
2761
2762        // Clean up task storage after execution:
2763        // - Shrink collections marked with shrink_on_completion
2764        // - Drop dependency fields for immutable tasks (they'll never re-execute)
2765        task.cleanup_after_execution();
2766
2767        drop(task);
2768
2769        // Return so we can drop outside of critical sections
2770        removed_cell_data
2771    }
2772
    /// Runs a background job for this backend.
    ///
    /// The only jobs visible here are the snapshot jobs: they wait until either a
    /// snapshot interval elapses, the system goes idle long enough, or the backend
    /// starts stopping; then they persist the task graph via `snapshot_and_persist`,
    /// compact the database while idle, and (when new data was written) schedule the
    /// next follow-up snapshot.
    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    // `last_snapshot` is stored as millis since `start_time`; reconstruct
                    // the absolute `Instant` of the previous snapshot.
                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
                    let mut idle_start_listener = self.idle_start_event.listen();
                    let mut idle_end_listener = self.idle_end_event.listen();
                    let mut fresh_idle = true;
                    loop {
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        let idle_timeout = *IDLE_TIMEOUT;
                        // The first snapshot waits longer than the regular interval.
                        let (time, mut reason) =
                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                            } else {
                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
                            };

                        let until = last_snapshot + time;
                        if until > Instant::now() {
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            // When already idle (and this is a fresh idle period), start
                            // the idle countdown immediately; otherwise disable it.
                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
                                Instant::now() + idle_timeout
                            } else {
                                far_future()
                            };
                            // Wait for whichever comes first: stop, snapshot interval,
                            // or an idle timeout (restarted whenever idleness toggles).
                            loop {
                                tokio::select! {
                                    _ = &mut stop_listener => {
                                        return;
                                    },
                                    _ = &mut idle_start_listener => {
                                        fresh_idle = true;
                                        idle_time = Instant::now() + idle_timeout;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    _ = &mut idle_end_listener => {
                                        idle_time = until + idle_timeout;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        // Re-check: idleness may have ended since the
                                        // timer was armed.
                                        if turbo_tasks.is_idle() {
                                            reason = "idle timeout";
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let this = self.clone();
                        // Create a root span shared by both the snapshot/persist
                        // work and the subsequent compaction so they appear
                        // grouped together in trace viewers.
                        let background_span =
                            tracing::info_span!(parent: None, "background snapshot");
                        let snapshot =
                            this.snapshot_and_persist(background_span.id(), reason, turbo_tasks);
                        if let Some((snapshot_start, new_data)) = snapshot {
                            last_snapshot = snapshot_start;

                            // Compact while idle (up to limit), regardless of
                            // whether the snapshot had new data.
                            // `background_span` is not entered here because
                            // `EnteredSpan` is `!Send` and would prevent the
                            // future from being sent across threads when it
                            // suspends at the `select!` await below.
                            const MAX_IDLE_COMPACTION_PASSES: usize = 10;
                            for _ in 0..MAX_IDLE_COMPACTION_PASSES {
                                // Non-blocking check (`ready` branch wins unless the
                                // idle-end event already fired) whether idleness ended.
                                let idle_ended = tokio::select! {
                                    biased;
                                    _ = &mut idle_end_listener => {
                                        idle_end_listener = self.idle_end_event.listen();
                                        true
                                    },
                                    _ = std::future::ready(()) => false,
                                };
                                if idle_ended {
                                    break;
                                }
                                // Enter the span only around the synchronous
                                // compact() call so we never hold an
                                // `EnteredSpan` across an await point.
                                let _compact_span = tracing::info_span!(
                                    parent: background_span.id(),
                                    "compact database"
                                )
                                .entered();
                                match self.backing_storage.compact() {
                                    // Ok(true): more compaction possible, try another pass.
                                    Ok(true) => {}
                                    Ok(false) => break,
                                    Err(err) => {
                                        eprintln!("Compaction failed: {err:?}");
                                        break;
                                    }
                                }
                            }

                            if !new_data {
                                // Nothing was persisted; keep looping without
                                // re-triggering the idle countdown.
                                fresh_idle = false;
                                continue;
                            }
                            let last_snapshot = last_snapshot.duration_since(self.start_time);
                            self.last_snapshot.store(
                                last_snapshot.as_millis().try_into().unwrap(),
                                Ordering::Relaxed,
                            );

                            // Hand off to a fresh follow-up job instead of looping here.
                            turbo_tasks.schedule_backend_background_job(
                                TurboTasksBackendJob::FollowUpSnapshot,
                            );
                            return;
                        }
                    }
                }
            }
        })
    }
2905
2906    fn try_read_own_task_cell(
2907        &self,
2908        task_id: TaskId,
2909        cell: CellId,
2910        options: ReadCellOptions,
2911        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2912    ) -> Result<TypedCellContent> {
2913        let mut ctx = self.execute_context(turbo_tasks);
2914        let task = ctx.task(task_id, TaskDataCategory::Data);
2915        if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2916            debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2917            Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2918        } else {
2919            Ok(CellContent(None).into_typed(cell.type_id))
2920        }
2921    }
2922
    /// Reads all collectibles of `collectible_type` visible from `task_id`,
    /// returning a map from collectible `RawVc` to a count.
    ///
    /// The task is first promoted to a root aggregation node so its
    /// aggregated collectible information covers the whole subgraph. When
    /// `reader_id` is given, dependency edges are recorded in both directions
    /// so the reader can be invalidated when the collectibles change.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            // Ensure it's a root node: raise the aggregation number until the
            // task aggregates over its whole subgraph.
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                // The task lock must be released before running the
                // aggregation update queue, which may lock other tasks.
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            // Collectibles aggregated from the subgraph. NOTE(review): each
            // positive aggregated entry contributes +1 here rather than its
            // raw count — presumably intentional (presence per aggregated
            // source); confirm against UpdateCollectibleOperation semantics.
            for (collectible, count) in task.iter_aggregated_collectibles() {
                if *count > 0 && collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += 1;
                }
            }
            // The task's own collectibles contribute their full (possibly
            // negative) count.
            for (&collectible, &count) in task.iter_collectibles() {
                if collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += count;
                }
            }
            // Forward edge: remember that `reader_id` depends on this task's
            // collectibles of this type.
            if let Some(reader_id) = reader_id {
                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
            }
        }
        // Reverse edge on the reader, taken after the task lock above is
        // released.
        if let Some(reader_id) = reader_id {
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            if !reader.remove_outdated_collectibles_dependencies(&target) {
                let _ = reader.add_collectibles_dependencies(target);
            }
        }
        collectibles
    }
2987
2988    fn emit_collectible(
2989        &self,
2990        collectible_type: TraitTypeId,
2991        collectible: RawVc,
2992        task_id: TaskId,
2993        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2994    ) {
2995        self.assert_valid_collectible(task_id, collectible);
2996
2997        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2998            panic!("Collectibles need to be resolved");
2999        };
3000        let cell = CellRef {
3001            task: collectible_task,
3002            cell,
3003        };
3004        operation::UpdateCollectibleOperation::run(
3005            task_id,
3006            CollectibleRef {
3007                collectible_type,
3008                cell,
3009            },
3010            1,
3011            self.execute_context(turbo_tasks),
3012        );
3013    }
3014
3015    fn unemit_collectible(
3016        &self,
3017        collectible_type: TraitTypeId,
3018        collectible: RawVc,
3019        count: u32,
3020        task_id: TaskId,
3021        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3022    ) {
3023        self.assert_valid_collectible(task_id, collectible);
3024
3025        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3026            panic!("Collectibles need to be resolved");
3027        };
3028        let cell = CellRef {
3029            task: collectible_task,
3030            cell,
3031        };
3032        operation::UpdateCollectibleOperation::run(
3033            task_id,
3034            CollectibleRef {
3035                collectible_type,
3036                cell,
3037            },
3038            -(i32::try_from(count).unwrap()),
3039            self.execute_context(turbo_tasks),
3040        );
3041    }
3042
    /// Writes `content` into `cell` of `task_id` by running an
    /// `UpdateCellOperation`.
    ///
    /// `updated_key_hashes` and `content_hash` are optional change-tracking
    /// metadata forwarded to the operation; `verification_mode` controls its
    /// verification behavior.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<[u8; 16]>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateCellOperation::run(
            task_id,
            cell,
            content,
            is_serializable_cell_content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            self.execute_context(turbo_tasks),
        );
    }
3065
    /// Marks the currently executing `task_id` as session dependent by
    /// setting the `session_dependent` flag on its in-progress state.
    ///
    /// Also raises the task's aggregation number to a high value, since
    /// session dependent tasks change on every run and would otherwise create
    /// large aggregation chains. No-op when dependency tracking is disabled.
    fn mark_own_task_as_session_dependent(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            // Without dependency tracking we don't need session dependent tasks
            return;
        }
        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
        let aggregation_number = get_aggregation_number(&task);
        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
            // Release the task lock before running the aggregation update
            // queue, which may lock other tasks; re-acquire afterwards.
            drop(task);
            // We want to use a high aggregation number to avoid large aggregation chains for
            // session dependent tasks (which change on every run)
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                    distance: None,
                },
                &mut ctx,
            );
            task = ctx.task(task_id, TaskDataCategory::Meta);
        }
        // Only record the flag if the task is currently executing.
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            session_dependent,
            ..
        })) = task.get_in_progress_mut()
        {
            *session_dependent = true;
        }
    }
3101
    /// Sets the `marked_as_completed` flag on the in-progress state of the
    /// currently executing `task`, if it has one.
    fn mark_own_task_as_finished(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        // Shadowing: `task` is now the locked task state, not the id.
        let mut task = ctx.task(task, TaskDataCategory::Data);
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            marked_as_completed,
            ..
        })) = task.get_in_progress_mut()
        {
            *marked_as_completed = true;
            // TODO this should remove the dirty state (also check session_dependent)
            // but this would break some assumptions for strongly consistent reads.
            // Client tasks are not connected yet, so we wouldn't wait for them.
            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
        }
    }
3121
    /// Connects `task` as a child of `parent_task` via a
    /// `ConnectChildOperation`, after asserting that a persistent parent is
    /// not connecting to a transient child.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_not_persistent_calling_transient(parent_task, task, None);
        ConnectChildOperation::run(parent_task, task, None, self.execute_context(turbo_tasks));
    }
3131
3132    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3133        let task_id = self.transient_task_id_factory.get();
3134        {
3135            let mut task = self.storage.access_mut(task_id);
3136            task.init_transient_task(task_id, task_type, self.should_track_activeness());
3137        }
3138        #[cfg(feature = "verify_aggregation_graph")]
3139        self.root_tasks.lock().insert(task_id);
3140        task_id
3141    }
3142
    /// Disposes a root task: if the task or any of its aggregated containers
    /// is still dirty it stays active until clean; otherwise its activeness
    /// state is removed immediately.
    fn dispose_root_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        // Keep the verifier's root set in sync (counterpart of the insert in
        // create_transient_task).
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().remove(&task_id);

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_dirty = task.is_dirty();
        let has_dirty_containers = task.has_dirty_containers();
        if is_dirty.is_some() || has_dirty_containers {
            if let Some(activeness_state) = task.get_activeness_mut() {
                // We will finish the task, but it would be removed after the task is done
                activeness_state.unset_root_type();
                activeness_state.set_active_until_clean();
            };
        } else if let Some(activeness_state) = task.take_activeness() {
            // Technically nobody should be listening to this event, but just in case
            // we notify it anyway
            activeness_state.all_clean_event.notify(usize::MAX);
        }
    }
3167
    /// Debug verifier (feature `verify_aggregation_graph`): walks the task
    /// graph from every known root task and panics when the aggregation
    /// invariants are violated.
    ///
    /// Checked invariants:
    /// - every reachable task reports (via its uppers, transitively) to an
    ///   aggregated node that was reached from the root;
    /// - every locally emitted collectible appears in some task's aggregated
    ///   collectibles;
    /// - every dirty task is listed in the aggregated dirty containers of all
    ///   of its uppers.
    ///
    /// When `idle` is true, the walk aborts early as soon as the backend
    /// leaves the idle state. Can be disabled at runtime by setting
    /// `TURBO_ENGINE_VERIFY_GRAPH=0`.
    #[cfg(feature = "verify_aggregation_graph")]
    fn verify_aggregation_graph(
        &self,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
        idle: bool,
    ) {
        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
            return;
        }
        // Note: `use` items apply to the whole function body regardless of
        // position, so the `env::var` call above resolves against this import.
        use std::{collections::VecDeque, env, io::stdout};

        use crate::backend::operation::{get_uppers, is_aggregating_node};

        let mut ctx = self.execute_context(turbo_tasks);
        let root_tasks = self.root_tasks.lock().clone();

        for task_id in root_tasks.into_iter() {
            // BFS over children starting at the root task.
            let mut queue = VecDeque::new();
            let mut visited = FxHashSet::default();
            let mut aggregated_nodes = FxHashSet::default();
            // collectible -> (seen as locally emitted?, aggregating tasks)
            let mut collectibles = FxHashMap::default();
            let root_task_id = task_id;
            visited.insert(task_id);
            aggregated_nodes.insert(task_id);
            queue.push_back(task_id);
            let mut counter = 0;
            while let Some(task_id) = queue.pop_front() {
                counter += 1;
                // Progress output for very large graphs.
                if counter % 100000 == 0 {
                    println!(
                        "queue={}, visited={}, aggregated_nodes={}",
                        queue.len(),
                        visited.len(),
                        aggregated_nodes.len()
                    );
                }
                let task = ctx.task(task_id, TaskDataCategory::All);
                // Abort if we were asked to verify only while idle and the
                // backend became busy again.
                if idle && !self.is_idle.load(Ordering::Relaxed) {
                    return;
                }

                let uppers = get_uppers(&task);
                if task_id != root_task_id
                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
                {
                    panic!(
                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
                         {:?})",
                        task_id,
                        task.get_task_description(),
                        uppers
                    );
                }

                // Record which tasks aggregate each collectible.
                for (collectible, _) in task.iter_aggregated_collectibles() {
                    collectibles
                        .entry(*collectible)
                        .or_insert_with(|| (false, Vec::new()))
                        .1
                        .push(task_id);
                }

                // Every locally emitted collectible must already be known
                // from some aggregated entry.
                for (&collectible, &value) in task.iter_collectibles() {
                    if value > 0 {
                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                            *flag = true
                        } else {
                            panic!(
                                "Task {} has a collectible {:?} that is not in any upper task",
                                task_id, collectible
                            );
                        }
                    }
                }

                let is_dirty = task.has_dirty();
                let has_dirty_container = task.has_dirty_containers();
                let should_be_in_upper = is_dirty || has_dirty_container;

                let aggregation_number = get_aggregation_number(&task);
                if is_aggregating_node(aggregation_number) {
                    aggregated_nodes.insert(task_id);
                }
                // println!(
                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
                //     ctx.get_task_description(task_id),
                //     uppers
                // );

                for child_id in task.iter_children() {
                    // println!("{task_id}: child -> {child_id}");
                    if visited.insert(child_id) {
                        queue.push_back(child_id);
                    }
                }
                // Release the task lock before locking the uppers below.
                drop(task);

                // A dirty task must be listed as a dirty container in every
                // upper.
                if should_be_in_upper {
                    for upper_id in uppers {
                        let upper = ctx.task(upper_id, TaskDataCategory::All);
                        let in_upper = upper
                            .get_aggregated_dirty_containers(&task_id)
                            .is_some_and(|&dirty| dirty > 0);
                        if !in_upper {
                            let containers: Vec<_> = upper
                                .iter_aggregated_dirty_containers()
                                .map(|(&k, &v)| (k, v))
                                .collect();
                            let upper_task_desc = upper.get_task_description();
                            drop(upper);
                            panic!(
                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
                                 ({})\nThese dirty containers are present:\n{:#?}",
                                task_id,
                                ctx.task(task_id, TaskDataCategory::Data)
                                    .get_task_description(),
                                upper_id,
                                upper_task_desc,
                                containers,
                            );
                        }
                    }
                }
            }

            // Any collectible that was aggregated somewhere but never seen as
            // locally emitted indicates a stale aggregation; dump the upper
            // chain to stdout before panicking.
            for (collectible, (flag, task_ids)) in collectibles {
                if !flag {
                    use std::io::Write;
                    let mut stdout = stdout().lock();
                    writeln!(
                        stdout,
                        "{:?} that is not emitted in any child task but in these aggregated \
                         tasks: {:#?}",
                        collectible,
                        task_ids
                            .iter()
                            .map(|t| format!(
                                "{t} {}",
                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
                            ))
                            .collect::<Vec<_>>()
                    )
                    .unwrap();

                    // Walk the upper graph from the collectible's source task
                    // and print each edge for debugging.
                    let task_id = collectible.cell.task;
                    let mut queue = {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        get_uppers(&task)
                    };
                    let mut visited = FxHashSet::default();
                    for &upper_id in queue.iter() {
                        visited.insert(upper_id);
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                    }
                    while let Some(task_id) = queue.pop() {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        let desc = task.get_task_description();
                        let aggregated_collectible = task
                            .get_aggregated_collectibles(&collectible)
                            .copied()
                            .unwrap_or_default();
                        let uppers = get_uppers(&task);
                        drop(task);
                        writeln!(
                            stdout,
                            "upper {task_id} {desc} collectible={aggregated_collectible}"
                        )
                        .unwrap();
                        if task_ids.contains(&task_id) {
                            writeln!(
                                stdout,
                                "Task has an upper connection to an aggregated task that doesn't \
                                 reference it. Upper connection is invalid!"
                            )
                            .unwrap();
                        }
                        for upper_id in uppers {
                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                            if !visited.contains(&upper_id) {
                                queue.push(upper_id);
                            }
                        }
                    }
                    panic!("See stdout for more details");
                }
            }
        }
    }
3356
3357    fn assert_not_persistent_calling_transient(
3358        &self,
3359        parent_id: Option<TaskId>,
3360        child_id: TaskId,
3361        cell_id: Option<CellId>,
3362    ) {
3363        if let Some(parent_id) = parent_id
3364            && !parent_id.is_transient()
3365            && child_id.is_transient()
3366        {
3367            self.panic_persistent_calling_transient(
3368                self.debug_get_task_description(parent_id),
3369                self.debug_get_cached_task_type(child_id).as_deref(),
3370                cell_id,
3371            );
3372        }
3373    }
3374
3375    fn panic_persistent_calling_transient(
3376        &self,
3377        parent: String,
3378        child: Option<&CachedTaskType>,
3379        cell_id: Option<CellId>,
3380    ) {
3381        let transient_reason = if let Some(child) = child {
3382            Cow::Owned(format!(
3383                " The callee is transient because it depends on:\n{}",
3384                self.debug_trace_transient_task(child, cell_id),
3385            ))
3386        } else {
3387            Cow::Borrowed("")
3388        };
3389        panic!(
3390            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3391            parent,
3392            child.map_or("unknown", |t| t.get_name()),
3393            transient_reason,
3394        );
3395    }
3396
3397    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3398        // these checks occur in a potentially hot codepath, but they're cheap
3399        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3400            // This should never happen: The collectible APIs use ResolvedVc
3401            let task_info = if let Some(col_task_ty) = collectible
3402                .try_get_task_id()
3403                .map(|t| self.debug_get_task_description(t))
3404            {
3405                Cow::Owned(format!(" (return type of {col_task_ty})"))
3406            } else {
3407                Cow::Borrowed("")
3408            };
3409            panic!("Collectible{task_info} must be a ResolvedVc")
3410        };
3411        if col_task_id.is_transient() && !task_id.is_transient() {
3412            let transient_reason =
3413                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3414                    Cow::Owned(format!(
3415                        ". The collectible is transient because it depends on:\n{}",
3416                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3417                    ))
3418                } else {
3419                    Cow::Borrowed("")
3420                };
3421            // this should never happen: How would a persistent function get a transient Vc?
3422            panic!(
3423                "Collectible is transient, transient collectibles cannot be emitted from \
3424                 persistent tasks{transient_reason}",
3425            )
3426        }
3427    }
3428}
3429
// `Backend` trait implementation. Every method delegates to the inner
// implementation wrapped by the `TurboTasksBackend` newtype (`self.0`); no
// additional logic lives in this impl.
impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }

    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }

    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }

    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }

    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }

    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
    }

    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
    }

    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }

    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }

    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }

    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0
            .try_start_task_execution(task_id, priority, turbo_tasks)
    }

    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
            turbo_tasks,
        )
    }

    type BackendJob = TurboTasksBackendJob;

    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(job, turbo_tasks)
    }

    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, reader, options, turbo_tasks)
    }

    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        // Note: the inner method takes `reader` before `cell`, so the
        // arguments are deliberately reordered relative to this signature.
        self.0
            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }

    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0
            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
    }

    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }

    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }

    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }

    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<[u8; 16]>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(
            task_id,
            cell,
            is_serializable_cell_content,
            content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            turbo_tasks,
        );
    }

    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }

    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }

    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }

    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }

    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }

    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }

    fn is_tracking_dependencies(&self) -> bool {
        self.0.options.dependency_tracking
    }

    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
        self.0.get_task_name(task, turbo_tasks)
    }
}
3679
/// A node in the debug trace explaining why a task is transient; rendered as
/// an indented tree (see `fmt_indented` below).
enum DebugTraceTransientTask {
    /// A cached task, with the transient causes of the task itself
    /// (`cause_self`) and of its arguments (`cause_args`) traced recursively.
    Cached {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
        cause_self: Option<Box<DebugTraceTransientTask>>,
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// This representation is used when this task is a duplicate of one previously shown
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    /// A task for which no cached task type information is available.
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}
3696
3697impl DebugTraceTransientTask {
3698    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3699        let indent = "    ".repeat(level);
3700        f.write_str(&indent)?;
3701
3702        fn fmt_cell_type_id(
3703            f: &mut fmt::Formatter<'_>,
3704            cell_type_id: Option<ValueTypeId>,
3705        ) -> fmt::Result {
3706            if let Some(ty) = cell_type_id {
3707                write!(
3708                    f,
3709                    " (read cell of type {})",
3710                    get_value_type(ty).ty.global_name
3711                )
3712            } else {
3713                Ok(())
3714            }
3715        }
3716
3717        // write the name and type
3718        match self {
3719            Self::Cached {
3720                task_name,
3721                cell_type_id,
3722                ..
3723            }
3724            | Self::Collapsed {
3725                task_name,
3726                cell_type_id,
3727                ..
3728            } => {
3729                f.write_str(task_name)?;
3730                fmt_cell_type_id(f, *cell_type_id)?;
3731                if matches!(self, Self::Collapsed { .. }) {
3732                    f.write_str(" (collapsed)")?;
3733                }
3734            }
3735            Self::Uncached { cell_type_id } => {
3736                f.write_str("unknown transient task")?;
3737                fmt_cell_type_id(f, *cell_type_id)?;
3738            }
3739        }
3740        f.write_char('\n')?;
3741
3742        // write any extra "cause" information we might have
3743        if let Self::Cached {
3744            cause_self,
3745            cause_args,
3746            ..
3747        } = self
3748        {
3749            if let Some(c) = cause_self {
3750                writeln!(f, "{indent}  self:")?;
3751                c.fmt_indented(f, level + 1)?;
3752            }
3753            if !cause_args.is_empty() {
3754                writeln!(f, "{indent}  args:")?;
3755                for c in cause_args {
3756                    c.fmt_indented(f, level + 1)?;
3757                }
3758            }
3759        }
3760        Ok(())
3761    }
3762}
3763
impl fmt::Display for DebugTraceTransientTask {
    // Renders the whole trace tree starting at indentation level 0.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.fmt_indented(f, 0)
    }
}
3769
// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
/// Returns an `Instant` far enough in the future that it is effectively never
/// reached.
///
/// The API offers neither an `Instant::MAX` nor a way to construct an
/// `Instant` from a calendar date, so we settle for "now plus ~30 years".
/// Larger offsets are not portable: 1000 years overflows on macOS and
/// 100 years overflows on FreeBSD.
fn far_future() -> Instant {
    const THIRTY_YEARS: Duration = Duration::from_secs(30 * 365 * 86400);
    Instant::now() + THIRTY_YEARS
}
3778
3779/// Encodes task data, using the provided buffer as a scratch space.  Returns a new exactly sized
3780/// buffer.
3781/// This allows reusing the buffer across multiple encode calls to optimize allocations and
3782/// resulting buffer sizes.
3783fn encode_task_data(
3784    task: TaskId,
3785    data: &TaskStorage,
3786    category: SpecificTaskDataCategory,
3787    scratch_buffer: &mut TurboBincodeBuffer,
3788) -> Result<TurboBincodeBuffer> {
3789    scratch_buffer.clear();
3790    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3791    data.encode(category, &mut encoder)?;
3792
3793    if cfg!(feature = "verify_serialization") {
3794        TaskStorage::new()
3795            .decode(
3796                category,
3797                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3798            )
3799            .with_context(|| {
3800                format!(
3801                    "expected to be able to decode serialized data for '{category:?}' information \
3802                     for {task}"
3803                )
3804            })?;
3805    }
3806    Ok(SmallVec::from_slice(scratch_buffer))
3807}