Skip to main content

turbo_tasks_backend/backend/
mod.rs

1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7    borrow::Cow,
8    fmt::{self, Write},
9    future::Future,
10    hash::BuildHasherDefault,
11    mem::take,
12    pin::Pin,
13    sync::{
14        Arc, LazyLock,
15        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16    },
17    time::SystemTime,
18};
19
20use anyhow::{Context, Result, bail};
21use auto_hash_map::{AutoMap, AutoSet};
22use indexmap::IndexSet;
23use parking_lot::{Condvar, Mutex};
24use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
25use smallvec::{SmallVec, smallvec};
26use tokio::time::{Duration, Instant};
27use tracing::{Span, trace_span};
28use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
29use turbo_tasks::{
30    CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
31    ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
32    TaskId, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic, ValueTypeId,
33    backend::{
34        Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskType,
35        TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
36        TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
37        VerificationMode,
38    },
39    event::{Event, EventDescription, EventListener},
40    message_queue::{TimingEvent, TraceEvent},
41    registry::get_value_type,
42    scope::scope_and_block,
43    task_statistics::TaskStatisticsApi,
44    trace::TraceRawVcs,
45    util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
46};
47
48pub use self::{
49    operation::AnyOperation,
50    storage::{SpecificTaskDataCategory, TaskDataCategory},
51};
52#[cfg(feature = "trace_task_dirty")]
53use crate::backend::operation::TaskDirtyCause;
54use crate::{
55    backend::{
56        operation::{
57            AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
58            CleanupOldEdgesOperation, ComputeDirtyAndCleanUpdate, ConnectChildOperation,
59            ExecuteContext, ExecuteContextImpl, LeafDistanceUpdateQueue, Operation, OutdatedEdge,
60            TaskGuard, TaskType, connect_children, get_aggregation_number, get_uppers,
61            is_root_node, make_task_dirty_internal, prepare_new_children,
62        },
63        storage::Storage,
64        storage_schema::{TaskStorage, TaskStorageAccessors},
65    },
66    backing_storage::BackingStorage,
67    data::{
68        ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
69        InProgressState, InProgressStateInner, OutputValue, TransientTask,
70    },
71    error::TaskError,
72    utils::{
73        arc_or_owned::ArcOrOwned,
74        chunked_vec::ChunkedVec,
75        dash_map_drop_contents::drop_contents,
76        dash_map_raw_entry::{RawEntry, raw_entry},
77        ptr_eq_arc::PtrEqArc,
78        shard_amount::compute_shard_amount,
79        sharded::Sharded,
80        swap_retain,
81    },
82};
83
84/// Threshold for parallelizing making dependent tasks dirty.
85/// If the number of dependent tasks exceeds this threshold,
86/// the operation will be parallelized.
87const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;
88
89const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
90
91/// Configurable idle timeout for snapshot persistence.
92/// Defaults to 2 seconds if not set or if the value is invalid.
93static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
94    std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
95        .ok()
96        .and_then(|v| v.parse::<u64>().ok())
97        .map(Duration::from_millis)
98        .unwrap_or(Duration::from_secs(2))
99});
100
101struct SnapshotRequest {
102    snapshot_requested: bool,
103    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
104}
105
106impl SnapshotRequest {
107    fn new() -> Self {
108        Self {
109            snapshot_requested: false,
110            suspended_operations: FxHashSet::default(),
111        }
112    }
113}
114
115pub enum StorageMode {
116    /// Queries the storage for cache entries that don't exist locally.
117    ReadOnly,
118    /// Queries the storage for cache entries that don't exist locally.
119    /// Regularly pushes changes to the backing storage.
120    ReadWrite,
121    /// Queries the storage for cache entries that don't exist locally.
122    /// On shutdown, pushes all changes to the backing storage.
123    ReadWriteOnShutdown,
124}
125
126pub struct BackendOptions {
127    /// Enables dependency tracking.
128    ///
129    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
130    /// forever.
131    pub dependency_tracking: bool,
132
133    /// Enables active tracking.
134    ///
135    /// Automatically disabled when `dependency_tracking` is disabled.
136    ///
137    /// When disabled: All tasks are considered as active.
138    pub active_tracking: bool,
139
140    /// Enables the backing storage.
141    pub storage_mode: Option<StorageMode>,
142
143    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
144    /// datastructures. If `None`, it will use the available parallelism.
145    pub num_workers: Option<usize>,
146
147    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
148    pub small_preallocation: bool,
149}
150
151impl Default for BackendOptions {
152    fn default() -> Self {
153        Self {
154            dependency_tracking: true,
155            active_tracking: true,
156            storage_mode: Some(StorageMode::ReadWrite),
157            num_workers: None,
158            small_preallocation: false,
159        }
160    }
161}
162
163pub enum TurboTasksBackendJob {
164    InitialSnapshot,
165    FollowUpSnapshot,
166}
167
168pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
169
170type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
171
172struct TurboTasksBackendInner<B: BackingStorage> {
173    options: BackendOptions,
174
175    start_time: Instant,
176
177    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
178    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
179
180    persisted_task_cache_log: Option<TaskCacheLog>,
181    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,
182
183    storage: Storage,
184
185    /// When true, the backing_storage has data that is not in the local storage.
186    local_is_partial: AtomicBool,
187
188    /// Number of executing operations + Highest bit is set when snapshot is
189    /// requested. When that bit is set, operations should pause until the
190    /// snapshot is completed. When the bit is set and in progress counter
191    /// reaches zero, `operations_completed_when_snapshot_requested` is
192    /// triggered.
193    in_progress_operations: AtomicUsize,
194
195    snapshot_request: Mutex<SnapshotRequest>,
196    /// Condition Variable that is triggered when `in_progress_operations`
197    /// reaches zero while snapshot is requested. All operations are either
198    /// completed or suspended.
199    operations_suspended: Condvar,
200    /// Condition Variable that is triggered when a snapshot is completed and
201    /// operations can continue.
202    snapshot_completed: Condvar,
203    /// The timestamp of the last started snapshot since [`Self::start_time`].
204    last_snapshot: AtomicU64,
205
206    stopping: AtomicBool,
207    stopping_event: Event,
208    idle_start_event: Event,
209    idle_end_event: Event,
210    #[cfg(feature = "verify_aggregation_graph")]
211    is_idle: AtomicBool,
212
213    task_statistics: TaskStatisticsApi,
214
215    backing_storage: B,
216
217    #[cfg(feature = "verify_aggregation_graph")]
218    root_tasks: Mutex<FxHashSet<TaskId>>,
219}
220
221impl<B: BackingStorage> TurboTasksBackend<B> {
222    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
223        Self(Arc::new(TurboTasksBackendInner::new(
224            options,
225            backing_storage,
226        )))
227    }
228
229    pub fn backing_storage(&self) -> &B {
230        &self.0.backing_storage
231    }
232}
233
234impl<B: BackingStorage> TurboTasksBackendInner<B> {
235    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
236        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
237        let need_log = matches!(
238            options.storage_mode,
239            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
240        );
241        if !options.dependency_tracking {
242            options.active_tracking = false;
243        }
244        let small_preallocation = options.small_preallocation;
245        let next_task_id = backing_storage
246            .next_free_task_id()
247            .expect("Failed to get task id");
248        Self {
249            options,
250            start_time: Instant::now(),
251            persisted_task_id_factory: IdFactoryWithReuse::new(
252                next_task_id,
253                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
254            ),
255            transient_task_id_factory: IdFactoryWithReuse::new(
256                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
257                TaskId::MAX,
258            ),
259            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
260            task_cache: FxDashMap::default(),
261            local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
262            storage: Storage::new(shard_amount, small_preallocation),
263            in_progress_operations: AtomicUsize::new(0),
264            snapshot_request: Mutex::new(SnapshotRequest::new()),
265            operations_suspended: Condvar::new(),
266            snapshot_completed: Condvar::new(),
267            last_snapshot: AtomicU64::new(0),
268            stopping: AtomicBool::new(false),
269            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
270            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
271            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
272            #[cfg(feature = "verify_aggregation_graph")]
273            is_idle: AtomicBool::new(false),
274            task_statistics: TaskStatisticsApi::default(),
275            backing_storage,
276            #[cfg(feature = "verify_aggregation_graph")]
277            root_tasks: Default::default(),
278        }
279    }
280
281    fn execute_context<'a>(
282        &'a self,
283        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
284    ) -> impl ExecuteContext<'a> {
285        ExecuteContextImpl::new(self, turbo_tasks)
286    }
287
288    fn suspending_requested(&self) -> bool {
289        self.should_persist()
290            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
291    }
292
293    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
294        #[cold]
295        fn operation_suspend_point_cold<B: BackingStorage>(
296            this: &TurboTasksBackendInner<B>,
297            suspend: impl FnOnce() -> AnyOperation,
298        ) {
299            let operation = Arc::new(suspend());
300            let mut snapshot_request = this.snapshot_request.lock();
301            if snapshot_request.snapshot_requested {
302                snapshot_request
303                    .suspended_operations
304                    .insert(operation.clone().into());
305                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
306                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
307                if value == SNAPSHOT_REQUESTED_BIT {
308                    this.operations_suspended.notify_all();
309                }
310                this.snapshot_completed
311                    .wait_while(&mut snapshot_request, |snapshot_request| {
312                        snapshot_request.snapshot_requested
313                    });
314                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
315                snapshot_request
316                    .suspended_operations
317                    .remove(&operation.into());
318            }
319        }
320
321        if self.suspending_requested() {
322            operation_suspend_point_cold(self, suspend);
323        }
324    }
325
326    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
327        if !self.should_persist() {
328            return OperationGuard { backend: None };
329        }
330        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
331        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
332            let mut snapshot_request = self.snapshot_request.lock();
333            if snapshot_request.snapshot_requested {
334                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
335                if value == SNAPSHOT_REQUESTED_BIT {
336                    self.operations_suspended.notify_all();
337                }
338                self.snapshot_completed
339                    .wait_while(&mut snapshot_request, |snapshot_request| {
340                        snapshot_request.snapshot_requested
341                    });
342                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
343            }
344        }
345        OperationGuard {
346            backend: Some(self),
347        }
348    }
349
350    fn should_persist(&self) -> bool {
351        matches!(
352            self.options.storage_mode,
353            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
354        )
355    }
356
357    fn should_restore(&self) -> bool {
358        self.options.storage_mode.is_some()
359    }
360
361    fn should_track_dependencies(&self) -> bool {
362        self.options.dependency_tracking
363    }
364
365    fn should_track_activeness(&self) -> bool {
366        self.options.active_tracking
367    }
368
369    fn track_cache_hit(&self, task_type: &CachedTaskType) {
370        self.task_statistics
371            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
372    }
373
374    fn track_cache_miss(&self, task_type: &CachedTaskType) {
375        self.task_statistics
376            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
377    }
378
379    /// Reconstructs a full [`TurboTasksExecutionError`] from the compact [`TaskError`] storage
380    /// representation. For [`TaskError::TaskChain`], this looks up the source error from the last
381    /// task's output and rebuilds the nested `TaskContext` wrappers with `TurboTasksCallApi`
382    /// references for lazy name resolution.
383    fn task_error_to_turbo_tasks_execution_error(
384        &self,
385        error: &TaskError,
386        ctx: &mut impl ExecuteContext<'_>,
387    ) -> TurboTasksExecutionError {
388        match error {
389            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
390            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
391                message: item.message.clone(),
392                source: item
393                    .source
394                    .as_ref()
395                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
396            })),
397            TaskError::LocalTaskContext(local_task_context) => {
398                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
399                    name: local_task_context.name.clone(),
400                    source: local_task_context
401                        .source
402                        .as_ref()
403                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
404                }))
405            }
406            TaskError::TaskChain(chain) => {
407                let task_id = chain.last().unwrap();
408                let error = {
409                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
410                    if let Some(OutputValue::Error(error)) = task.get_output() {
411                        Some(error.clone())
412                    } else {
413                        None
414                    }
415                };
416                let error = error.map_or_else(
417                    || {
418                        // Eventual consistency will cause errors to no longer be available
419                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
420                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
421                                "Error no longer available",
422                            )),
423                            location: None,
424                        }))
425                    },
426                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
427                );
428                let mut current_error = error;
429                for &task_id in chain.iter().rev() {
430                    current_error =
431                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
432                            task_id,
433                            source: Some(current_error),
434                            turbo_tasks: ctx.turbo_tasks(),
435                        }));
436                }
437                current_error
438            }
439        }
440    }
441}
442
443pub(crate) struct OperationGuard<'a, B: BackingStorage> {
444    backend: Option<&'a TurboTasksBackendInner<B>>,
445}
446
447impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
448    fn drop(&mut self) {
449        if let Some(backend) = self.backend {
450            let fetch_sub = backend
451                .in_progress_operations
452                .fetch_sub(1, Ordering::AcqRel);
453            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
454                backend.operations_suspended.notify_all();
455            }
456        }
457    }
458}
459
460/// Intermediate result of step 1 of task execution completion.
461struct TaskExecutionCompletePrepareResult {
462    pub new_children: FxHashSet<TaskId>,
463    pub is_now_immutable: bool,
464    #[cfg(feature = "verify_determinism")]
465    pub no_output_set: bool,
466    pub new_output: Option<OutputValue>,
467    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
468}
469
470// Operations
471impl<B: BackingStorage> TurboTasksBackendInner<B> {
472    fn connect_child(
473        &self,
474        parent_task: Option<TaskId>,
475        child_task: TaskId,
476        task_type: Option<ArcOrOwned<CachedTaskType>>,
477        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
478    ) {
479        operation::ConnectChildOperation::run(
480            parent_task,
481            child_task,
482            task_type,
483            self.execute_context(turbo_tasks),
484        );
485    }
486
487    fn try_read_task_output(
488        self: &Arc<Self>,
489        task_id: TaskId,
490        reader: Option<TaskId>,
491        options: ReadOutputOptions,
492        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
493    ) -> Result<Result<RawVc, EventListener>> {
494        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
495
496        let mut ctx = self.execute_context(turbo_tasks);
497        let need_reader_task = if self.should_track_dependencies()
498            && !matches!(options.tracking, ReadTracking::Untracked)
499            && reader.is_some_and(|reader_id| reader_id != task_id)
500            && let Some(reader_id) = reader
501            && reader_id != task_id
502        {
503            Some(reader_id)
504        } else {
505            None
506        };
507        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
508            // Having a task_pair here is not optimal, but otherwise this would lead to a race
509            // condition. See below.
510            // TODO(sokra): solve that in a more performant way.
511            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
512            (task, Some(reader))
513        } else {
514            (ctx.task(task_id, TaskDataCategory::All), None)
515        };
516
517        fn listen_to_done_event(
518            reader_description: Option<EventDescription>,
519            tracking: ReadTracking,
520            done_event: &Event,
521        ) -> EventListener {
522            done_event.listen_with_note(move || {
523                move || {
524                    if let Some(reader_description) = reader_description.as_ref() {
525                        format!(
526                            "try_read_task_output from {} ({})",
527                            reader_description, tracking
528                        )
529                    } else {
530                        format!("try_read_task_output ({})", tracking)
531                    }
532                }
533            })
534        }
535
536        fn check_in_progress(
537            task: &impl TaskGuard,
538            reader_description: Option<EventDescription>,
539            tracking: ReadTracking,
540        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
541        {
542            match task.get_in_progress() {
543                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
544                    listen_to_done_event(reader_description, tracking, done_event),
545                ))),
546                Some(InProgressState::InProgress(box InProgressStateInner {
547                    done_event, ..
548                })) => Some(Ok(Err(listen_to_done_event(
549                    reader_description,
550                    tracking,
551                    done_event,
552                )))),
553                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
554                    "{} was canceled",
555                    task.get_task_description()
556                ))),
557                None => None,
558            }
559        }
560
561        if matches!(options.consistency, ReadConsistency::Strong) {
562            // Ensure it's an root node
563            loop {
564                let aggregation_number = get_aggregation_number(&task);
565                if is_root_node(aggregation_number) {
566                    break;
567                }
568                drop(task);
569                drop(reader_task);
570                {
571                    let _span = tracing::trace_span!(
572                        "make root node for strongly consistent read",
573                        %task_id
574                    )
575                    .entered();
576                    AggregationUpdateQueue::run(
577                        AggregationUpdateJob::UpdateAggregationNumber {
578                            task_id,
579                            base_aggregation_number: u32::MAX,
580                            distance: None,
581                        },
582                        &mut ctx,
583                    );
584                }
585                (task, reader_task) = if let Some(reader_id) = need_reader_task {
586                    // TODO(sokra): see comment above
587                    let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
588                    (task, Some(reader))
589                } else {
590                    (ctx.task(task_id, TaskDataCategory::All), None)
591                }
592            }
593
594            let is_dirty = task.is_dirty();
595
596            // Check the dirty count of the root node
597            let has_dirty_containers = task.has_dirty_containers();
598            if has_dirty_containers || is_dirty.is_some() {
599                let activeness = task.get_activeness_mut();
600                let mut task_ids_to_schedule: Vec<_> = Vec::new();
601                // When there are dirty task, subscribe to the all_clean_event
602                let activeness = if let Some(activeness) = activeness {
603                    // This makes sure all tasks stay active and this task won't stale.
604                    // active_until_clean is automatically removed when this
605                    // task is clean.
606                    activeness.set_active_until_clean();
607                    activeness
608                } else {
609                    // If we don't have a root state, add one. This also makes sure all tasks stay
610                    // active and this task won't stale. active_until_clean
611                    // is automatically removed when this task is clean.
612                    if ctx.should_track_activeness() {
613                        // A newly added Activeness need to make sure to schedule the tasks
614                        task_ids_to_schedule = task.dirty_containers().collect();
615                        task_ids_to_schedule.push(task_id);
616                    }
617                    let activeness =
618                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
619                    activeness.set_active_until_clean();
620                    activeness
621                };
622                let listener = activeness.all_clean_event.listen_with_note(move || {
623                    let this = self.clone();
624                    let tt = turbo_tasks.pin();
625                    move || {
626                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
627                        let mut ctx = this.execute_context(tt);
628                        let mut visited = FxHashSet::default();
629                        fn indent(s: &str) -> String {
630                            s.split_inclusive('\n')
631                                .flat_map(|line: &str| ["  ", line].into_iter())
632                                .collect::<String>()
633                        }
634                        fn get_info(
635                            ctx: &mut impl ExecuteContext<'_>,
636                            task_id: TaskId,
637                            parent_and_count: Option<(TaskId, i32)>,
638                            visited: &mut FxHashSet<TaskId>,
639                        ) -> String {
640                            let task = ctx.task(task_id, TaskDataCategory::All);
641                            let is_dirty = task.is_dirty();
642                            let in_progress =
643                                task.get_in_progress()
644                                    .map_or("not in progress", |p| match p {
645                                        InProgressState::InProgress(_) => "in progress",
646                                        InProgressState::Scheduled { .. } => "scheduled",
647                                        InProgressState::Canceled => "canceled",
648                                    });
649                            let activeness = task.get_activeness().map_or_else(
650                                || "not active".to_string(),
651                                |activeness| format!("{activeness:?}"),
652                            );
653                            let aggregation_number = get_aggregation_number(&task);
654                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
655                            {
656                                let uppers = get_uppers(&task);
657                                !uppers.contains(&parent_task_id)
658                            } else {
659                                false
660                            };
661
662                            // Check the dirty count of the root node
663                            let has_dirty_containers = task.has_dirty_containers();
664
665                            let task_description = task.get_task_description();
666                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
667                                format!(", dirty({parent_priority})")
668                            } else {
669                                String::new()
670                            };
671                            let has_dirty_containers_label = if has_dirty_containers {
672                                ", dirty containers"
673                            } else {
674                                ""
675                            };
676                            let count = if let Some((_, count)) = parent_and_count {
677                                format!(" {count}")
678                            } else {
679                                String::new()
680                            };
681                            let mut info = format!(
682                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
683                                 {in_progress}, \
684                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
685                            );
686                            let children: Vec<_> = task.dirty_containers_with_count().collect();
687                            drop(task);
688
689                            if missing_upper {
690                                info.push_str("\n  ERROR: missing upper connection");
691                            }
692
693                            if has_dirty_containers || !children.is_empty() {
694                                writeln!(info, "\n  dirty tasks:").unwrap();
695
696                                for (child_task_id, count) in children {
697                                    let task_description = ctx
698                                        .task(child_task_id, TaskDataCategory::Data)
699                                        .get_task_description();
700                                    if visited.insert(child_task_id) {
701                                        let child_info = get_info(
702                                            ctx,
703                                            child_task_id,
704                                            Some((task_id, count)),
705                                            visited,
706                                        );
707                                        info.push_str(&indent(&child_info));
708                                        if !info.ends_with('\n') {
709                                            info.push('\n');
710                                        }
711                                    } else {
712                                        writeln!(
713                                            info,
714                                            "  {child_task_id} {task_description} {count} \
715                                             (already visited)"
716                                        )
717                                        .unwrap();
718                                    }
719                                }
720                            }
721                            info
722                        }
723                        let info = get_info(&mut ctx, task_id, None, &mut visited);
724                        format!(
725                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
726                        )
727                    }
728                });
729                drop(reader_task);
730                drop(task);
731                if !task_ids_to_schedule.is_empty() {
732                    let mut queue = AggregationUpdateQueue::new();
733                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
734                    queue.execute(&mut ctx);
735                }
736
737                return Ok(Err(listener));
738            }
739        }
740
741        let reader_description = reader_task
742            .as_ref()
743            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
744        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
745        {
746            return value;
747        }
748
749        if let Some(output) = task.get_output() {
750            let result = match output {
751                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
752                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
753                OutputValue::Error(error) => Err(error.clone()),
754            };
755            if let Some(mut reader_task) = reader_task.take()
756                && options.tracking.should_track(result.is_err())
757                && (!task.immutable() || cfg!(feature = "verify_immutable"))
758            {
759                #[cfg(feature = "trace_task_output_dependencies")]
760                let _span = tracing::trace_span!(
761                    "add output dependency",
762                    task = %task_id,
763                    dependent_task = ?reader
764                )
765                .entered();
766                let mut queue = LeafDistanceUpdateQueue::new();
767                let reader = reader.unwrap();
768                if task.add_output_dependent(reader) {
769                    // Ensure that dependent leaf distance is strictly monotonic increasing
770                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
771                    let reader_leaf_distance =
772                        reader_task.get_leaf_distance().copied().unwrap_or_default();
773                    if reader_leaf_distance.distance <= leaf_distance.distance {
774                        queue.push(
775                            reader,
776                            leaf_distance.distance,
777                            leaf_distance.max_distance_in_buffer,
778                        );
779                    }
780                }
781
782                drop(task);
783
784                // Note: We use `task_pair` earlier to lock the task and its reader at the same
785                // time. If we didn't and just locked the reader here, an invalidation could occur
786                // between grabbing the locks. If that happened, and if the task is "outdated" or
787                // doesn't have the dependency edge yet, the invalidation would be lost.
788
789                if !reader_task.remove_outdated_output_dependencies(&task_id) {
790                    let _ = reader_task.add_output_dependencies(task_id);
791                }
792                drop(reader_task);
793
794                queue.execute(&mut ctx);
795            } else {
796                drop(task);
797            }
798
799            return result.map_err(|error| {
800                self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
801                    .with_task_context(task_id, turbo_tasks.pin())
802                    .into()
803            });
804        }
805        drop(reader_task);
806
807        let note = EventDescription::new(|| {
808            move || {
809                if let Some(reader) = reader_description.as_ref() {
810                    format!("try_read_task_output (recompute) from {reader}",)
811                } else {
812                    "try_read_task_output (recompute, untracked)".to_string()
813                }
814            }
815        });
816
817        // Output doesn't exist. We need to schedule the task to compute it.
818        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
819            TaskExecutionReason::OutputNotAvailable,
820            EventDescription::new(|| task.get_task_desc_fn()),
821            note,
822        );
823
824        // It's not possible that the task is InProgress at this point. If it is InProgress {
825        // done: true } it must have Output and would early return.
826        let old = task.set_in_progress(in_progress_state);
827        debug_assert!(old.is_none(), "InProgress already exists");
828        ctx.schedule_task(task, TaskPriority::Initial);
829
830        Ok(Err(listener))
831    }
832
833    fn try_read_task_cell(
834        &self,
835        task_id: TaskId,
836        reader: Option<TaskId>,
837        cell: CellId,
838        options: ReadCellOptions,
839        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
840    ) -> Result<Result<TypedCellContent, EventListener>> {
841        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
842
843        fn add_cell_dependency(
844            task_id: TaskId,
845            mut task: impl TaskGuard,
846            reader: Option<TaskId>,
847            reader_task: Option<impl TaskGuard>,
848            cell: CellId,
849            key: Option<u64>,
850        ) {
851            if let Some(mut reader_task) = reader_task
852                && (!task.immutable() || cfg!(feature = "verify_immutable"))
853            {
854                let reader = reader.unwrap();
855                let _ = task.add_cell_dependents((cell, key, reader));
856                drop(task);
857
858                // Note: We use `task_pair` earlier to lock the task and its reader at the same
859                // time. If we didn't and just locked the reader here, an invalidation could occur
860                // between grabbing the locks. If that happened, and if the task is "outdated" or
861                // doesn't have the dependency edge yet, the invalidation would be lost.
862
863                let target = CellRef {
864                    task: task_id,
865                    cell,
866                };
867                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
868                    let _ = reader_task.add_cell_dependencies((target, key));
869                }
870                drop(reader_task);
871            }
872        }
873
874        let ReadCellOptions {
875            is_serializable_cell_content,
876            tracking,
877            final_read_hint,
878        } = options;
879
880        let mut ctx = self.execute_context(turbo_tasks);
881        let (mut task, reader_task) = if self.should_track_dependencies()
882            && !matches!(tracking, ReadCellTracking::Untracked)
883            && let Some(reader_id) = reader
884            && reader_id != task_id
885        {
886            // Having a task_pair here is not optimal, but otherwise this would lead to a race
887            // condition. See below.
888            // TODO(sokra): solve that in a more performant way.
889            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
890            (task, Some(reader))
891        } else {
892            (ctx.task(task_id, TaskDataCategory::All), None)
893        };
894
895        let content = if final_read_hint {
896            task.remove_cell_data(is_serializable_cell_content, cell)
897        } else {
898            task.get_cell_data(is_serializable_cell_content, cell)
899        };
900        if let Some(content) = content {
901            if tracking.should_track(false) {
902                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
903            }
904            return Ok(Ok(TypedCellContent(
905                cell.type_id,
906                CellContent(Some(content.reference)),
907            )));
908        }
909
910        let in_progress = task.get_in_progress();
911        if matches!(
912            in_progress,
913            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
914        ) {
915            return Ok(Err(self
916                .listen_to_cell(&mut task, task_id, &reader_task, cell)
917                .0));
918        }
919        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
920
921        // Check cell index range (cell might not exist at all)
922        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
923        let Some(max_id) = max_id else {
924            let task_desc = task.get_task_description();
925            if tracking.should_track(true) {
926                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
927            }
928            bail!(
929                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
930            );
931        };
932        if cell.index >= max_id {
933            let task_desc = task.get_task_description();
934            if tracking.should_track(true) {
935                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
936            }
937            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
938        }
939
940        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
941        // task the get the cell content.
942
943        // Listen to the cell and potentially schedule the task
944        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
945        drop(reader_task);
946        if !new_listener {
947            return Ok(Err(listener));
948        }
949
950        let _span = tracing::trace_span!(
951            "recomputation",
952            cell_type = get_value_type(cell.type_id).global_name,
953            cell_index = cell.index
954        )
955        .entered();
956
957        // Schedule the task, if not already scheduled
958        if is_cancelled {
959            bail!("{} was canceled", task.get_task_description());
960        }
961
962        let _ = task.add_scheduled(
963            TaskExecutionReason::CellNotAvailable,
964            EventDescription::new(|| task.get_task_desc_fn()),
965        );
966        ctx.schedule_task(task, TaskPriority::Initial);
967
968        Ok(Err(listener))
969    }
970
971    fn listen_to_cell(
972        &self,
973        task: &mut impl TaskGuard,
974        task_id: TaskId,
975        reader_task: &Option<impl TaskGuard>,
976        cell: CellId,
977    ) -> (EventListener, bool) {
978        let note = || {
979            let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
980            move || {
981                if let Some(reader_desc) = reader_desc.as_ref() {
982                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
983                } else {
984                    "try_read_task_cell (in progress, untracked)".to_string()
985                }
986            }
987        };
988        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
989            // Someone else is already computing the cell
990            let listener = in_progress.event.listen_with_note(note);
991            return (listener, false);
992        }
993        let in_progress = InProgressCellState::new(task_id, cell);
994        let listener = in_progress.event.listen_with_note(note);
995        let old = task.insert_in_progress_cells(cell, in_progress);
996        debug_assert!(old.is_none(), "InProgressCell already exists");
997        (listener, true)
998    }
999
1000    fn snapshot_and_persist(
1001        &self,
1002        parent_span: Option<tracing::Id>,
1003        reason: &str,
1004        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1005    ) -> Option<(Instant, bool)> {
1006        let snapshot_span =
1007            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
1008                .entered();
1009        let start = Instant::now();
1010        // SystemTime for wall-clock timestamps in trace events (milliseconds
1011        // since epoch). Instant is monotonic but has no defined epoch, so it
1012        // can't be used for cross-process trace correlation.
1013        let wall_start = SystemTime::now();
1014        debug_assert!(self.should_persist());
1015
1016        let suspended_operations;
1017        {
1018            let _span = tracing::info_span!("blocking").entered();
1019            let mut snapshot_request = self.snapshot_request.lock();
1020            snapshot_request.snapshot_requested = true;
1021            let active_operations = self
1022                .in_progress_operations
1023                .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1024            if active_operations != 0 {
1025                self.operations_suspended
1026                    .wait_while(&mut snapshot_request, |_| {
1027                        self.in_progress_operations.load(Ordering::Relaxed)
1028                            != SNAPSHOT_REQUESTED_BIT
1029                    });
1030            }
1031            suspended_operations = snapshot_request
1032                .suspended_operations
1033                .iter()
1034                .map(|op| op.arc().clone())
1035                .collect::<Vec<_>>();
1036        }
1037        self.storage.start_snapshot();
1038        let mut persisted_task_cache_log = self
1039            .persisted_task_cache_log
1040            .as_ref()
1041            .map(|l| l.take(|i| i))
1042            .unwrap_or_default();
1043        let mut snapshot_request = self.snapshot_request.lock();
1044        snapshot_request.snapshot_requested = false;
1045        self.in_progress_operations
1046            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1047        self.snapshot_completed.notify_all();
1048        let snapshot_time = Instant::now();
1049        drop(snapshot_request);
1050
1051        #[cfg(feature = "print_cache_item_size")]
1052        #[derive(Default)]
1053        struct TaskCacheStats {
1054            data: usize,
1055            data_compressed: usize,
1056            data_count: usize,
1057            meta: usize,
1058            meta_compressed: usize,
1059            meta_count: usize,
1060            upper_count: usize,
1061            collectibles_count: usize,
1062            aggregated_collectibles_count: usize,
1063            children_count: usize,
1064            followers_count: usize,
1065            collectibles_dependents_count: usize,
1066            aggregated_dirty_containers_count: usize,
1067            output_size: usize,
1068        }
1069        #[cfg(feature = "print_cache_item_size")]
1070        impl TaskCacheStats {
1071            fn compressed_size(data: &[u8]) -> Result<usize> {
1072                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1073                    data,
1074                    &mut Vec::new(),
1075                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
1076                )?)
1077            }
1078
1079            fn add_data(&mut self, data: &[u8]) {
1080                self.data += data.len();
1081                self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1082                self.data_count += 1;
1083            }
1084
1085            fn add_meta(&mut self, data: &[u8]) {
1086                self.meta += data.len();
1087                self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1088                self.meta_count += 1;
1089            }
1090
1091            fn add_counts(&mut self, storage: &TaskStorage) {
1092                let counts = storage.meta_counts();
1093                self.upper_count += counts.upper;
1094                self.collectibles_count += counts.collectibles;
1095                self.aggregated_collectibles_count += counts.aggregated_collectibles;
1096                self.children_count += counts.children;
1097                self.followers_count += counts.followers;
1098                self.collectibles_dependents_count += counts.collectibles_dependents;
1099                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
1100                if let Some(output) = storage.get_output() {
1101                    use turbo_bincode::turbo_bincode_encode;
1102
1103                    self.output_size += turbo_bincode_encode(&output)
1104                        .map(|data| data.len())
1105                        .unwrap_or(0);
1106                }
1107            }
1108        }
1109        #[cfg(feature = "print_cache_item_size")]
1110        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1111            Mutex::new(FxHashMap::default());
1112
1113        let preprocess = |task_id: TaskId, inner: &TaskStorage| {
1114            if task_id.is_transient() {
1115                return (None, None);
1116            }
1117
1118            let meta_restored = inner.flags.meta_restored();
1119            let data_restored = inner.flags.data_restored();
1120
1121            // Encode meta/data directly from TaskStorage
1122            let meta = meta_restored.then(|| inner.clone_meta_snapshot());
1123            let data = data_restored.then(|| inner.clone_data_snapshot());
1124
1125            (meta, data)
1126        };
1127        let process = |task_id: TaskId,
1128                       (meta, data): (Option<TaskStorage>, Option<TaskStorage>),
1129                       buffer: &mut TurboBincodeBuffer| {
1130            #[cfg(feature = "print_cache_item_size")]
1131            if let Some(ref m) = meta {
1132                task_cache_stats
1133                    .lock()
1134                    .entry(self.get_task_name(task_id, turbo_tasks))
1135                    .or_default()
1136                    .add_counts(m);
1137            }
1138            (
1139                task_id,
1140                meta.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Meta, buffer)),
1141                data.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Data, buffer)),
1142            )
1143        };
1144        let process_snapshot =
1145            |task_id: TaskId, inner: Box<TaskStorage>, buffer: &mut TurboBincodeBuffer| {
1146                if task_id.is_transient() {
1147                    return (task_id, None, None);
1148                }
1149
1150                #[cfg(feature = "print_cache_item_size")]
1151                if inner.flags.meta_modified() {
1152                    task_cache_stats
1153                        .lock()
1154                        .entry(self.get_task_name(task_id, turbo_tasks))
1155                        .or_default()
1156                        .add_counts(&inner);
1157                }
1158
1159                // Encode meta/data directly from TaskStorage snapshot
1160                (
1161                    task_id,
1162                    inner.flags.meta_modified().then(|| {
1163                        encode_task_data(task_id, &inner, SpecificTaskDataCategory::Meta, buffer)
1164                    }),
1165                    inner.flags.data_modified().then(|| {
1166                        encode_task_data(task_id, &inner, SpecificTaskDataCategory::Data, buffer)
1167                    }),
1168                )
1169            };
1170
1171        let snapshot = self
1172            .storage
1173            .take_snapshot(&preprocess, &process, &process_snapshot);
1174
1175        let task_snapshots = snapshot
1176            .into_iter()
1177            .filter_map(|iter| {
1178                let mut iter = iter
1179                    .filter_map(
1180                        |(task_id, meta, data): (
1181                            _,
1182                            Option<Result<SmallVec<_>>>,
1183                            Option<Result<SmallVec<_>>>,
1184                        )| {
1185                            let meta = match meta {
1186                                Some(Ok(meta)) => {
1187                                    #[cfg(feature = "print_cache_item_size")]
1188                                    task_cache_stats
1189                                        .lock()
1190                                        .entry(self.get_task_name(task_id, turbo_tasks))
1191                                        .or_default()
1192                                        .add_meta(&meta);
1193                                    Some(meta)
1194                                }
1195                                None => None,
1196                                Some(Err(err)) => {
1197                                    println!(
1198                                        "Serializing task {} failed (meta): {:?}",
1199                                        self.debug_get_task_description(task_id),
1200                                        err
1201                                    );
1202                                    None
1203                                }
1204                            };
1205                            let data = match data {
1206                                Some(Ok(data)) => {
1207                                    #[cfg(feature = "print_cache_item_size")]
1208                                    task_cache_stats
1209                                        .lock()
1210                                        .entry(self.get_task_name(task_id, turbo_tasks))
1211                                        .or_default()
1212                                        .add_data(&data);
1213                                    Some(data)
1214                                }
1215                                None => None,
1216                                Some(Err(err)) => {
1217                                    println!(
1218                                        "Serializing task {} failed (data): {:?}",
1219                                        self.debug_get_task_description(task_id),
1220                                        err
1221                                    );
1222                                    None
1223                                }
1224                            };
1225                            (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
1226                        },
1227                    )
1228                    .peekable();
1229                iter.peek().is_some().then_some(iter)
1230            })
1231            .collect::<Vec<_>>();
1232
1233        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
1234
1235        drop(snapshot_span);
1236        let snapshot_duration = start.elapsed();
1237        let task_count = task_snapshots.len();
1238
1239        if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
1240            return Some((snapshot_time, false));
1241        }
1242
1243        let persist_start = Instant::now();
1244        let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1245        {
1246            if let Err(err) = self.backing_storage.save_snapshot(
1247                suspended_operations,
1248                persisted_task_cache_log,
1249                task_snapshots,
1250            ) {
1251                eprintln!("Persisting failed: {err:?}");
1252                return None;
1253            }
1254            #[cfg(feature = "print_cache_item_size")]
1255            {
1256                let mut task_cache_stats = task_cache_stats
1257                    .into_inner()
1258                    .into_iter()
1259                    .collect::<Vec<_>>();
1260                if !task_cache_stats.is_empty() {
1261                    use turbo_tasks::util::FormatBytes;
1262
1263                    use crate::utils::markdown_table::print_markdown_table;
1264
1265                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1266                        (stats_b.data_compressed + stats_b.meta_compressed, key_b)
1267                            .cmp(&(stats_a.data_compressed + stats_a.meta_compressed, key_a))
1268                    });
1269                    println!(
1270                        "Task cache stats: {} ({})",
1271                        FormatBytes(
1272                            task_cache_stats
1273                                .iter()
1274                                .map(|(_, s)| s.data_compressed + s.meta_compressed)
1275                                .sum::<usize>()
1276                        ),
1277                        FormatBytes(
1278                            task_cache_stats
1279                                .iter()
1280                                .map(|(_, s)| s.data + s.meta)
1281                                .sum::<usize>()
1282                        )
1283                    );
1284
1285                    print_markdown_table(
1286                        [
1287                            "Task",
1288                            " Total Size",
1289                            " Data Size",
1290                            " Data Count x Avg",
1291                            " Data Count x Avg",
1292                            " Meta Size",
1293                            " Meta Count x Avg",
1294                            " Meta Count x Avg",
1295                            " Uppers",
1296                            " Coll",
1297                            " Agg Coll",
1298                            " Children",
1299                            " Followers",
1300                            " Coll Deps",
1301                            " Agg Dirty",
1302                            " Output Size",
1303                        ],
1304                        task_cache_stats.iter(),
1305                        |(task_desc, stats)| {
1306                            [
1307                                task_desc.to_string(),
1308                                format!(
1309                                    " {} ({})",
1310                                    FormatBytes(stats.data_compressed + stats.meta_compressed),
1311                                    FormatBytes(stats.data + stats.meta)
1312                                ),
1313                                format!(
1314                                    " {} ({})",
1315                                    FormatBytes(stats.data_compressed),
1316                                    FormatBytes(stats.data)
1317                                ),
1318                                format!(" {} x", stats.data_count,),
1319                                format!(
1320                                    "{} ({})",
1321                                    FormatBytes(
1322                                        stats
1323                                            .data_compressed
1324                                            .checked_div(stats.data_count)
1325                                            .unwrap_or(0)
1326                                    ),
1327                                    FormatBytes(
1328                                        stats.data.checked_div(stats.data_count).unwrap_or(0)
1329                                    ),
1330                                ),
1331                                format!(
1332                                    " {} ({})",
1333                                    FormatBytes(stats.meta_compressed),
1334                                    FormatBytes(stats.meta)
1335                                ),
1336                                format!(" {} x", stats.meta_count,),
1337                                format!(
1338                                    "{} ({})",
1339                                    FormatBytes(
1340                                        stats
1341                                            .meta_compressed
1342                                            .checked_div(stats.meta_count)
1343                                            .unwrap_or(0)
1344                                    ),
1345                                    FormatBytes(
1346                                        stats.meta.checked_div(stats.meta_count).unwrap_or(0)
1347                                    ),
1348                                ),
1349                                format!(" {}", stats.upper_count),
1350                                format!(" {}", stats.collectibles_count),
1351                                format!(" {}", stats.aggregated_collectibles_count),
1352                                format!(" {}", stats.children_count),
1353                                format!(" {}", stats.followers_count),
1354                                format!(" {}", stats.collectibles_dependents_count),
1355                                format!(" {}", stats.aggregated_dirty_containers_count),
1356                                format!(" {}", FormatBytes(stats.output_size)),
1357                            ]
1358                        },
1359                    );
1360                }
1361            }
1362        }
1363
1364        let elapsed = start.elapsed();
1365        let persist_duration = persist_start.elapsed();
1366        // avoid spamming the event queue with information about fast operations
1367        if elapsed > Duration::from_secs(10) {
1368            turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1369                "Finished writing to filesystem cache".to_string(),
1370                elapsed,
1371            )));
1372        }
1373
1374        let wall_start_ms = wall_start
1375            .duration_since(SystemTime::UNIX_EPOCH)
1376            .unwrap_or_default()
1377            // as_millis_f64 is not stable yet
1378            .as_secs_f64()
1379            * 1000.0;
1380        let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1381        turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1382            "turbopack-persistence",
1383            wall_start_ms,
1384            wall_end_ms,
1385            vec![
1386                ("reason", serde_json::Value::from(reason)),
1387                (
1388                    "snapshot_duration_ms",
1389                    serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1390                ),
1391                (
1392                    "persist_duration_ms",
1393                    serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1394                ),
1395                ("task_count", serde_json::Value::from(task_count)),
1396            ],
1397        )));
1398
1399        Some((snapshot_time, true))
1400    }
1401
1402    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1403        if self.should_restore() {
1404            // Continue all uncompleted operations
1405            // They can't be interrupted by a snapshot since the snapshotting job has not been
1406            // scheduled yet.
1407            let uncompleted_operations = self
1408                .backing_storage
1409                .uncompleted_operations()
1410                .expect("Failed to get uncompleted operations");
1411            if !uncompleted_operations.is_empty() {
1412                let mut ctx = self.execute_context(turbo_tasks);
1413                for op in uncompleted_operations {
1414                    op.execute(&mut ctx);
1415                }
1416            }
1417        }
1418
1419        // Only when it should write regularly to the storage, we schedule the initial snapshot
1420        // job.
1421        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1422            // Schedule the snapshot job
1423            let _span = trace_span!("persisting background job").entered();
1424            let _span = tracing::info_span!("thread").entered();
1425            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1426        }
1427    }
1428
1429    fn stopping(&self) {
1430        self.stopping.store(true, Ordering::Release);
1431        self.stopping_event.notify(usize::MAX);
1432    }
1433
1434    #[allow(unused_variables)]
1435    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1436        #[cfg(feature = "verify_aggregation_graph")]
1437        {
1438            self.is_idle.store(false, Ordering::Release);
1439            self.verify_aggregation_graph(turbo_tasks, false);
1440        }
1441        if self.should_persist() {
1442            self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks);
1443        }
1444        drop_contents(&self.task_cache);
1445        self.storage.drop_contents();
1446        if let Err(err) = self.backing_storage.shutdown() {
1447            println!("Shutting down failed: {err}");
1448        }
1449    }
1450
1451    #[allow(unused_variables)]
1452    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1453        self.idle_start_event.notify(usize::MAX);
1454
1455        #[cfg(feature = "verify_aggregation_graph")]
1456        {
1457            use tokio::select;
1458
1459            self.is_idle.store(true, Ordering::Release);
1460            let this = self.clone();
1461            let turbo_tasks = turbo_tasks.pin();
1462            tokio::task::spawn(async move {
1463                select! {
1464                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1465                        // do nothing
1466                    }
1467                    _ = this.idle_end_event.listen() => {
1468                        return;
1469                    }
1470                }
1471                if !this.is_idle.load(Ordering::Relaxed) {
1472                    return;
1473                }
1474                this.verify_aggregation_graph(&*turbo_tasks, true);
1475            });
1476        }
1477    }
1478
1479    fn idle_end(&self) {
1480        #[cfg(feature = "verify_aggregation_graph")]
1481        self.is_idle.store(false, Ordering::Release);
1482        self.idle_end_event.notify(usize::MAX);
1483    }
1484
1485    fn get_or_create_persistent_task(
1486        &self,
1487        task_type: CachedTaskType,
1488        parent_task: Option<TaskId>,
1489        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1490    ) -> TaskId {
1491        let is_root = task_type.native_fn.is_root;
1492
1493        // First check if the task exists in the cache which only uses a read lock
1494        if let Some(task_id) = self.task_cache.get(&task_type) {
1495            let task_id = *task_id;
1496            self.track_cache_hit(&task_type);
1497            self.connect_child(
1498                parent_task,
1499                task_id,
1500                Some(ArcOrOwned::Owned(task_type)),
1501                turbo_tasks,
1502            );
1503            return task_id;
1504        }
1505
1506        // Create a single ExecuteContext for both lookup and connect_child
1507        let mut ctx = self.execute_context(turbo_tasks);
1508
1509        let mut is_new = false;
1510        let (task_id, task_type) = if let Some(task_id) = ctx.task_by_type(&task_type) {
1511            // Task exists in backing storage
1512            // So we only need to insert it into the in-memory cache
1513            self.track_cache_hit(&task_type);
1514            let task_type = match raw_entry(&self.task_cache, &task_type) {
1515                RawEntry::Occupied(_) => ArcOrOwned::Owned(task_type),
1516                RawEntry::Vacant(e) => {
1517                    let task_type = Arc::new(task_type);
1518                    e.insert(task_type.clone(), task_id);
1519                    ArcOrOwned::Arc(task_type)
1520                }
1521            };
1522            (task_id, task_type)
1523        } else {
1524            // Task doesn't exist in memory cache or backing storage
1525            // So we might need to create a new task
1526            let (task_id, task_type) = match raw_entry(&self.task_cache, &task_type) {
1527                RawEntry::Occupied(e) => {
1528                    // Another thread beat us to creating this task - use their task_id.
1529                    // They will handle logging to persisted_task_cache_log.
1530                    let task_id = *e.get();
1531                    drop(e);
1532                    self.track_cache_hit(&task_type);
1533                    (task_id, ArcOrOwned::Owned(task_type))
1534                }
1535                RawEntry::Vacant(e) => {
1536                    // We're creating a new task.
1537                    let task_type = Arc::new(task_type);
1538                    let task_id = self.persisted_task_id_factory.get();
1539                    e.insert(task_type.clone(), task_id);
1540                    // insert() consumes e, releasing the lock
1541                    self.track_cache_miss(&task_type);
1542                    is_new = true;
1543                    if let Some(log) = &self.persisted_task_cache_log {
1544                        log.lock(task_id).push((task_type.clone(), task_id));
1545                    }
1546                    (task_id, ArcOrOwned::Arc(task_type))
1547                }
1548            };
1549            (task_id, task_type)
1550        };
1551        if is_new && is_root {
1552            AggregationUpdateQueue::run(
1553                AggregationUpdateJob::UpdateAggregationNumber {
1554                    task_id,
1555                    base_aggregation_number: u32::MAX,
1556                    distance: None,
1557                },
1558                &mut ctx,
1559            );
1560        }
1561        // Reuse the same ExecuteContext for connect_child
1562        operation::ConnectChildOperation::run(parent_task, task_id, Some(task_type), ctx);
1563
1564        task_id
1565    }
1566
1567    fn get_or_create_transient_task(
1568        &self,
1569        task_type: CachedTaskType,
1570        parent_task: Option<TaskId>,
1571        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1572    ) -> TaskId {
1573        let is_root = task_type.native_fn.is_root;
1574
1575        if let Some(parent_task) = parent_task
1576            && !parent_task.is_transient()
1577        {
1578            self.panic_persistent_calling_transient(
1579                self.debug_get_task_description(parent_task),
1580                Some(&task_type),
1581                /* cell_id */ None,
1582            );
1583        }
1584        // First check if the task exists in the cache which only uses a read lock
1585        if let Some(task_id) = self.task_cache.get(&task_type) {
1586            let task_id = *task_id;
1587            self.track_cache_hit(&task_type);
1588            self.connect_child(
1589                parent_task,
1590                task_id,
1591                Some(ArcOrOwned::Owned(task_type)),
1592                turbo_tasks,
1593            );
1594            return task_id;
1595        }
1596        // If not, acquire a write lock and double check / insert
1597        match raw_entry(&self.task_cache, &task_type) {
1598            RawEntry::Occupied(e) => {
1599                let task_id = *e.get();
1600                drop(e);
1601                self.track_cache_hit(&task_type);
1602                self.connect_child(
1603                    parent_task,
1604                    task_id,
1605                    Some(ArcOrOwned::Owned(task_type)),
1606                    turbo_tasks,
1607                );
1608                task_id
1609            }
1610            RawEntry::Vacant(e) => {
1611                let task_type = Arc::new(task_type);
1612                let task_id = self.transient_task_id_factory.get();
1613                e.insert(task_type.clone(), task_id);
1614                self.track_cache_miss(&task_type);
1615
1616                if is_root {
1617                    let mut ctx = self.execute_context(turbo_tasks);
1618                    AggregationUpdateQueue::run(
1619                        AggregationUpdateJob::UpdateAggregationNumber {
1620                            task_id,
1621                            base_aggregation_number: u32::MAX,
1622                            distance: None,
1623                        },
1624                        &mut ctx,
1625                    );
1626                }
1627
1628                self.connect_child(
1629                    parent_task,
1630                    task_id,
1631                    Some(ArcOrOwned::Arc(task_type)),
1632                    turbo_tasks,
1633                );
1634
1635                task_id
1636            }
1637        }
1638    }
1639
1640    /// Generate an object that implements [`fmt::Display`] explaining why the given
1641    /// [`CachedTaskType`] is transient.
1642    fn debug_trace_transient_task(
1643        &self,
1644        task_type: &CachedTaskType,
1645        cell_id: Option<CellId>,
1646    ) -> DebugTraceTransientTask {
1647        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1648        // from tracing the same task many times, so use a visited_set
1649        fn inner_id(
1650            backend: &TurboTasksBackendInner<impl BackingStorage>,
1651            task_id: TaskId,
1652            cell_type_id: Option<ValueTypeId>,
1653            visited_set: &mut FxHashSet<TaskId>,
1654        ) -> DebugTraceTransientTask {
1655            if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1656                if visited_set.contains(&task_id) {
1657                    let task_name = task_type.get_name();
1658                    DebugTraceTransientTask::Collapsed {
1659                        task_name,
1660                        cell_type_id,
1661                    }
1662                } else {
1663                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1664                }
1665            } else {
1666                DebugTraceTransientTask::Uncached { cell_type_id }
1667            }
1668        }
1669        fn inner_cached(
1670            backend: &TurboTasksBackendInner<impl BackingStorage>,
1671            task_type: &CachedTaskType,
1672            cell_type_id: Option<ValueTypeId>,
1673            visited_set: &mut FxHashSet<TaskId>,
1674        ) -> DebugTraceTransientTask {
1675            let task_name = task_type.get_name();
1676
1677            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1678                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1679                    // `task_id` should never be `None` at this point, as that would imply a
1680                    // non-local task is returning a local `Vc`...
1681                    // Just ignore if it happens, as we're likely already panicking.
1682                    return None;
1683                };
1684                if task_id.is_transient() {
1685                    Some(Box::new(inner_id(
1686                        backend,
1687                        task_id,
1688                        cause_self_raw_vc.try_get_type_id(),
1689                        visited_set,
1690                    )))
1691                } else {
1692                    None
1693                }
1694            });
1695            let cause_args = task_type
1696                .arg
1697                .get_raw_vcs()
1698                .into_iter()
1699                .filter_map(|raw_vc| {
1700                    let Some(task_id) = raw_vc.try_get_task_id() else {
1701                        // `task_id` should never be `None` (see comment above)
1702                        return None;
1703                    };
1704                    if !task_id.is_transient() {
1705                        return None;
1706                    }
1707                    Some((task_id, raw_vc.try_get_type_id()))
1708                })
1709                .collect::<IndexSet<_>>() // dedupe
1710                .into_iter()
1711                .map(|(task_id, cell_type_id)| {
1712                    inner_id(backend, task_id, cell_type_id, visited_set)
1713                })
1714                .collect();
1715
1716            DebugTraceTransientTask::Cached {
1717                task_name,
1718                cell_type_id,
1719                cause_self,
1720                cause_args,
1721            }
1722        }
1723        inner_cached(
1724            self,
1725            task_type,
1726            cell_id.map(|c| c.type_id),
1727            &mut FxHashSet::default(),
1728        )
1729    }
1730
1731    fn invalidate_task(
1732        &self,
1733        task_id: TaskId,
1734        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1735    ) {
1736        if !self.should_track_dependencies() {
1737            panic!("Dependency tracking is disabled so invalidation is not allowed");
1738        }
1739        operation::InvalidateOperation::run(
1740            smallvec![task_id],
1741            #[cfg(feature = "trace_task_dirty")]
1742            TaskDirtyCause::Invalidator,
1743            self.execute_context(turbo_tasks),
1744        );
1745    }
1746
1747    fn invalidate_tasks(
1748        &self,
1749        tasks: &[TaskId],
1750        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1751    ) {
1752        if !self.should_track_dependencies() {
1753            panic!("Dependency tracking is disabled so invalidation is not allowed");
1754        }
1755        operation::InvalidateOperation::run(
1756            tasks.iter().copied().collect(),
1757            #[cfg(feature = "trace_task_dirty")]
1758            TaskDirtyCause::Unknown,
1759            self.execute_context(turbo_tasks),
1760        );
1761    }
1762
1763    fn invalidate_tasks_set(
1764        &self,
1765        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1766        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1767    ) {
1768        if !self.should_track_dependencies() {
1769            panic!("Dependency tracking is disabled so invalidation is not allowed");
1770        }
1771        operation::InvalidateOperation::run(
1772            tasks.iter().copied().collect(),
1773            #[cfg(feature = "trace_task_dirty")]
1774            TaskDirtyCause::Unknown,
1775            self.execute_context(turbo_tasks),
1776        );
1777    }
1778
1779    fn invalidate_serialization(
1780        &self,
1781        task_id: TaskId,
1782        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1783    ) {
1784        if task_id.is_transient() {
1785            return;
1786        }
1787        let mut ctx = self.execute_context(turbo_tasks);
1788        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1789        task.invalidate_serialization();
1790    }
1791
1792    fn debug_get_task_description(&self, task_id: TaskId) -> String {
1793        let task = self.storage.access_mut(task_id);
1794        if let Some(value) = task.get_persistent_task_type() {
1795            format!("{task_id:?} {}", value)
1796        } else if let Some(value) = task.get_transient_task_type() {
1797            format!("{task_id:?} {}", value)
1798        } else {
1799            format!("{task_id:?} unknown")
1800        }
1801    }
1802
1803    fn get_task_name(
1804        &self,
1805        task_id: TaskId,
1806        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1807    ) -> String {
1808        let mut ctx = self.execute_context(turbo_tasks);
1809        let task = ctx.task(task_id, TaskDataCategory::Data);
1810        if let Some(value) = task.get_persistent_task_type() {
1811            value.to_string()
1812        } else if let Some(value) = task.get_transient_task_type() {
1813            value.to_string()
1814        } else {
1815            "unknown".to_string()
1816        }
1817    }
1818
1819    fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1820        let task = self.storage.access_mut(task_id);
1821        task.get_persistent_task_type().cloned()
1822    }
1823
1824    fn task_execution_canceled(
1825        &self,
1826        task_id: TaskId,
1827        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1828    ) {
1829        let mut ctx = self.execute_context(turbo_tasks);
1830        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1831        if let Some(in_progress) = task.take_in_progress() {
1832            match in_progress {
1833                InProgressState::Scheduled {
1834                    done_event,
1835                    reason: _,
1836                } => done_event.notify(usize::MAX),
1837                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1838                    done_event.notify(usize::MAX)
1839                }
1840                InProgressState::Canceled => {}
1841            }
1842        }
1843        let old = task.set_in_progress(InProgressState::Canceled);
1844        debug_assert!(old.is_none(), "InProgress already exists");
1845    }
1846
1847    fn try_start_task_execution(
1848        &self,
1849        task_id: TaskId,
1850        priority: TaskPriority,
1851        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1852    ) -> Option<TaskExecutionSpec<'_>> {
1853        let execution_reason;
1854        let task_type;
1855        {
1856            let mut ctx = self.execute_context(turbo_tasks);
1857            let mut task = ctx.task(task_id, TaskDataCategory::All);
1858            task_type = task.get_task_type().to_owned();
1859            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
1860            if let Some(tasks) = task.prefetch() {
1861                drop(task);
1862                ctx.prepare_tasks(tasks);
1863                task = ctx.task(task_id, TaskDataCategory::All);
1864            }
1865            let in_progress = task.take_in_progress()?;
1866            let InProgressState::Scheduled { done_event, reason } = in_progress else {
1867                let old = task.set_in_progress(in_progress);
1868                debug_assert!(old.is_none(), "InProgress already exists");
1869                return None;
1870            };
1871            execution_reason = reason;
1872            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
1873                InProgressStateInner {
1874                    stale: false,
1875                    once_task,
1876                    done_event,
1877                    session_dependent: false,
1878                    marked_as_completed: false,
1879                    new_children: Default::default(),
1880                },
1881            )));
1882            debug_assert!(old.is_none(), "InProgress already exists");
1883
1884            // Make all current collectibles outdated (remove left-over outdated collectibles)
1885            enum Collectible {
1886                Current(CollectibleRef, i32),
1887                Outdated(CollectibleRef),
1888            }
1889            let collectibles = task
1890                .iter_collectibles()
1891                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
1892                .chain(
1893                    task.iter_outdated_collectibles()
1894                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
1895                )
1896                .collect::<Vec<_>>();
1897            for collectible in collectibles {
1898                match collectible {
1899                    Collectible::Current(collectible, value) => {
1900                        let _ = task.insert_outdated_collectible(collectible, value);
1901                    }
1902                    Collectible::Outdated(collectible) => {
1903                        if task
1904                            .collectibles()
1905                            .is_none_or(|m| m.get(&collectible).is_none())
1906                        {
1907                            task.remove_outdated_collectibles(&collectible);
1908                        }
1909                    }
1910                }
1911            }
1912
1913            if self.should_track_dependencies() {
1914                // Make all dependencies outdated
1915                let cell_dependencies = task.iter_cell_dependencies().collect();
1916                task.set_outdated_cell_dependencies(cell_dependencies);
1917
1918                let outdated_output_dependencies = task.iter_output_dependencies().collect();
1919                task.set_outdated_output_dependencies(outdated_output_dependencies);
1920            }
1921        }
1922
1923        let (span, future) = match task_type {
1924            TaskType::Cached(task_type) => {
1925                let CachedTaskType {
1926                    native_fn,
1927                    this,
1928                    arg,
1929                } = &*task_type;
1930                (
1931                    native_fn.span(task_id.persistence(), execution_reason, priority),
1932                    native_fn.execute(*this, &**arg),
1933                )
1934            }
1935            TaskType::Transient(task_type) => {
1936                let span = tracing::trace_span!("turbo_tasks::root_task");
1937                let future = match &*task_type {
1938                    TransientTask::Root(f) => f(),
1939                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
1940                };
1941                (span, future)
1942            }
1943        };
1944        Some(TaskExecutionSpec { future, span })
1945    }
1946
1947    fn task_execution_completed(
1948        &self,
1949        task_id: TaskId,
1950        result: Result<RawVc, TurboTasksExecutionError>,
1951        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1952        #[cfg(feature = "verify_determinism")] stateful: bool,
1953        has_invalidator: bool,
1954        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1955    ) -> bool {
1956        // Task completion is a 4 step process:
1957        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
1958        //    aggregation number of the task and the new children.
1959        // 2. Connect the new children to the task (and do the relevant aggregation updates).
1960        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
1961        // 4. Shrink the task memory to reduce footprint of the task.
1962
1963        // Due to persistence it is possible that the process is cancelled after any step. This is
1964        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
1965        // in-memory representation.
1966
1967        // The task might be invalidated during this process, so we need to check the stale flag
1968        // at the start of every step.
1969
1970        #[cfg(not(feature = "trace_task_details"))]
1971        let span = tracing::trace_span!(
1972            "task execution completed",
1973            new_children = tracing::field::Empty
1974        )
1975        .entered();
1976        #[cfg(feature = "trace_task_details")]
1977        let span = tracing::trace_span!(
1978            "task execution completed",
1979            task_id = display(task_id),
1980            result = match result.as_ref() {
1981                Ok(value) => display(either::Either::Left(value)),
1982                Err(err) => display(either::Either::Right(err)),
1983            },
1984            new_children = tracing::field::Empty,
1985            immutable = tracing::field::Empty,
1986            new_output = tracing::field::Empty,
1987            output_dependents = tracing::field::Empty,
1988            stale = tracing::field::Empty,
1989        )
1990        .entered();
1991
1992        let is_error = result.is_err();
1993
1994        let mut ctx = self.execute_context(turbo_tasks);
1995
1996        let Some(TaskExecutionCompletePrepareResult {
1997            new_children,
1998            is_now_immutable,
1999            #[cfg(feature = "verify_determinism")]
2000            no_output_set,
2001            new_output,
2002            output_dependent_tasks,
2003        }) = self.task_execution_completed_prepare(
2004            &mut ctx,
2005            #[cfg(feature = "trace_task_details")]
2006            &span,
2007            task_id,
2008            result,
2009            cell_counters,
2010            #[cfg(feature = "verify_determinism")]
2011            stateful,
2012            has_invalidator,
2013        )
2014        else {
2015            // Task was stale and has been rescheduled
2016            #[cfg(feature = "trace_task_details")]
2017            span.record("stale", "prepare");
2018            return true;
2019        };
2020
2021        #[cfg(feature = "trace_task_details")]
2022        span.record("new_output", new_output.is_some());
2023        #[cfg(feature = "trace_task_details")]
2024        span.record("output_dependents", output_dependent_tasks.len());
2025
2026        // When restoring from filesystem cache the following might not be executed (since we can
2027        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
2028        // would be executed again.
2029
2030        if !output_dependent_tasks.is_empty() {
2031            self.task_execution_completed_invalidate_output_dependent(
2032                &mut ctx,
2033                task_id,
2034                output_dependent_tasks,
2035            );
2036        }
2037
2038        let has_new_children = !new_children.is_empty();
2039        span.record("new_children", new_children.len());
2040
2041        if has_new_children {
2042            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
2043        }
2044
2045        if has_new_children
2046            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
2047        {
2048            // Task was stale and has been rescheduled
2049            #[cfg(feature = "trace_task_details")]
2050            span.record("stale", "connect");
2051            return true;
2052        }
2053
2054        let (stale, in_progress_cells) = self.task_execution_completed_finish(
2055            &mut ctx,
2056            task_id,
2057            #[cfg(feature = "verify_determinism")]
2058            no_output_set,
2059            new_output,
2060            is_now_immutable,
2061        );
2062        if stale {
2063            // Task was stale and has been rescheduled
2064            #[cfg(feature = "trace_task_details")]
2065            span.record("stale", "finish");
2066            return true;
2067        }
2068
2069        let removed_data =
2070            self.task_execution_completed_cleanup(&mut ctx, task_id, cell_counters, is_error);
2071
2072        // Drop data outside of critical sections
2073        drop(removed_data);
2074        drop(in_progress_cells);
2075
2076        false
2077    }
2078
2079    fn task_execution_completed_prepare(
2080        &self,
2081        ctx: &mut impl ExecuteContext<'_>,
2082        #[cfg(feature = "trace_task_details")] span: &Span,
2083        task_id: TaskId,
2084        result: Result<RawVc, TurboTasksExecutionError>,
2085        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2086        #[cfg(feature = "verify_determinism")] stateful: bool,
2087        has_invalidator: bool,
2088    ) -> Option<TaskExecutionCompletePrepareResult> {
2089        let mut task = ctx.task(task_id, TaskDataCategory::All);
2090        let Some(in_progress) = task.get_in_progress_mut() else {
2091            panic!("Task execution completed, but task is not in progress: {task:#?}");
2092        };
2093        if matches!(in_progress, InProgressState::Canceled) {
2094            return Some(TaskExecutionCompletePrepareResult {
2095                new_children: Default::default(),
2096                is_now_immutable: false,
2097                #[cfg(feature = "verify_determinism")]
2098                no_output_set: false,
2099                new_output: None,
2100                output_dependent_tasks: Default::default(),
2101            });
2102        }
2103        let &mut InProgressState::InProgress(box InProgressStateInner {
2104            stale,
2105            ref mut new_children,
2106            session_dependent,
2107            once_task: is_once_task,
2108            ..
2109        }) = in_progress
2110        else {
2111            panic!("Task execution completed, but task is not in progress: {task:#?}");
2112        };
2113
2114        // If the task is stale, reschedule it
2115        #[cfg(not(feature = "no_fast_stale"))]
2116        if stale && !is_once_task {
2117            let Some(InProgressState::InProgress(box InProgressStateInner {
2118                done_event,
2119                mut new_children,
2120                ..
2121            })) = task.take_in_progress()
2122            else {
2123                unreachable!();
2124            };
2125            let old = task.set_in_progress(InProgressState::Scheduled {
2126                done_event,
2127                reason: TaskExecutionReason::Stale,
2128            });
2129            debug_assert!(old.is_none(), "InProgress already exists");
2130            // Remove old children from new_children to leave only the children that had their
2131            // active count increased
2132            for task in task.iter_children() {
2133                new_children.remove(&task);
2134            }
2135            drop(task);
2136
2137            // We need to undo the active count increase for the children since we throw away the
2138            // new_children list now.
2139            AggregationUpdateQueue::run(
2140                AggregationUpdateJob::DecreaseActiveCounts {
2141                    task_ids: new_children.into_iter().collect(),
2142                },
2143                ctx,
2144            );
2145            return None;
2146        }
2147
2148        // take the children from the task to process them
2149        let mut new_children = take(new_children);
2150
2151        // handle stateful (only tracked when verify_determinism is enabled)
2152        #[cfg(feature = "verify_determinism")]
2153        if stateful {
2154            task.set_stateful(true);
2155        }
2156
2157        // handle has_invalidator
2158        if has_invalidator {
2159            task.set_invalidator(true);
2160        }
2161
2162        // handle cell counters: update max index and remove cells that are no longer used
2163        let old_counters: FxHashMap<_, _> = task
2164            .iter_cell_type_max_index()
2165            .map(|(&k, &v)| (k, v))
2166            .collect();
2167        let mut counters_to_remove = old_counters.clone();
2168
2169        for (&cell_type, &max_index) in cell_counters.iter() {
2170            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2171                if old_max_index != max_index {
2172                    task.insert_cell_type_max_index(cell_type, max_index);
2173                }
2174            } else {
2175                task.insert_cell_type_max_index(cell_type, max_index);
2176            }
2177        }
2178        for (cell_type, _) in counters_to_remove {
2179            task.remove_cell_type_max_index(&cell_type);
2180        }
2181
2182        let mut queue = AggregationUpdateQueue::new();
2183
2184        let mut old_edges = Vec::new();
2185
2186        let has_children = !new_children.is_empty();
2187        let is_immutable = task.immutable();
2188        let task_dependencies_for_immutable =
2189            // Task was previously marked as immutable
2190            if !is_immutable
2191            // Task is not session dependent (session dependent tasks can change between sessions)
2192            && !session_dependent
2193            // Task has no invalidator
2194            && !task.invalidator()
2195            // Task has no dependencies on collectibles
2196            && task.is_collectibles_dependencies_empty()
2197        {
2198            Some(
2199                // Collect all dependencies on tasks to check if all dependencies are immutable
2200                task.iter_output_dependencies()
2201                    .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
2202                    .collect::<FxHashSet<_>>(),
2203            )
2204        } else {
2205            None
2206        };
2207
2208        if has_children {
2209            // Prepare all new children
2210            prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2211
2212            // Filter actual new children
2213            old_edges.extend(
2214                task.iter_children()
2215                    .filter(|task| !new_children.remove(task))
2216                    .map(OutdatedEdge::Child),
2217            );
2218        } else {
2219            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2220        }
2221
2222        old_edges.extend(
2223            task.iter_outdated_collectibles()
2224                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2225        );
2226
2227        if self.should_track_dependencies() {
2228            // IMPORTANT: Use iter_outdated_* here, NOT iter_* (active dependencies).
2229            // At execution start, active deps are copied to outdated as a "before" snapshot.
2230            // During execution, new deps are added to active.
2231            // Here at completion, we clean up only the OUTDATED deps (the "before" snapshot).
2232            // Using iter_* (active) instead would incorrectly clean up deps that are still valid,
2233            // breaking dependency tracking.
2234            old_edges.extend(
2235                task.iter_outdated_cell_dependencies()
2236                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
2237            );
2238            old_edges.extend(
2239                task.iter_outdated_output_dependencies()
2240                    .map(OutdatedEdge::OutputDependency),
2241            );
2242        }
2243
2244        // Check if output need to be updated
2245        let current_output = task.get_output();
2246        #[cfg(feature = "verify_determinism")]
2247        let no_output_set = current_output.is_none();
2248        let new_output = match result {
2249            Ok(RawVc::TaskOutput(output_task_id)) => {
2250                if let Some(OutputValue::Output(current_task_id)) = current_output
2251                    && *current_task_id == output_task_id
2252                {
2253                    None
2254                } else {
2255                    Some(OutputValue::Output(output_task_id))
2256                }
2257            }
2258            Ok(RawVc::TaskCell(output_task_id, cell)) => {
2259                if let Some(OutputValue::Cell(CellRef {
2260                    task: current_task_id,
2261                    cell: current_cell,
2262                })) = current_output
2263                    && *current_task_id == output_task_id
2264                    && *current_cell == cell
2265                {
2266                    None
2267                } else {
2268                    Some(OutputValue::Cell(CellRef {
2269                        task: output_task_id,
2270                        cell,
2271                    }))
2272                }
2273            }
2274            Ok(RawVc::LocalOutput(..)) => {
2275                panic!("Non-local tasks must not return a local Vc");
2276            }
2277            Err(err) => {
2278                if let Some(OutputValue::Error(old_error)) = current_output
2279                    && **old_error == err
2280                {
2281                    None
2282                } else {
2283                    Some(OutputValue::Error(Arc::new((&err).into())))
2284                }
2285            }
2286        };
2287        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2288        // When output has changed, grab the dependent tasks
2289        if new_output.is_some() && ctx.should_track_dependencies() {
2290            output_dependent_tasks = task.iter_output_dependent().collect();
2291        }
2292
2293        drop(task);
2294
2295        // Check if the task can be marked as immutable
2296        let mut is_now_immutable = false;
2297        if let Some(dependencies) = task_dependencies_for_immutable
2298            && dependencies
2299                .iter()
2300                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2301        {
2302            is_now_immutable = true;
2303        }
2304        #[cfg(feature = "trace_task_details")]
2305        span.record("immutable", is_immutable || is_now_immutable);
2306
2307        if !queue.is_empty() || !old_edges.is_empty() {
2308            #[cfg(feature = "trace_task_completion")]
2309            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
2310            // Remove outdated edges first, before removing in_progress+dirty flag.
2311            // We need to make sure all outdated edges are removed before the task can potentially
2312            // be scheduled and executed again
2313            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2314        }
2315
2316        Some(TaskExecutionCompletePrepareResult {
2317            new_children,
2318            is_now_immutable,
2319            #[cfg(feature = "verify_determinism")]
2320            no_output_set,
2321            new_output,
2322            output_dependent_tasks,
2323        })
2324    }
2325
2326    fn task_execution_completed_invalidate_output_dependent(
2327        &self,
2328        ctx: &mut impl ExecuteContext<'_>,
2329        task_id: TaskId,
2330        output_dependent_tasks: SmallVec<[TaskId; 4]>,
2331    ) {
2332        debug_assert!(!output_dependent_tasks.is_empty());
2333
2334        if output_dependent_tasks.len() > 1 {
2335            ctx.prepare_tasks(
2336                output_dependent_tasks
2337                    .iter()
2338                    .map(|&id| (id, TaskDataCategory::All)),
2339            );
2340        }
2341
2342        fn process_output_dependents(
2343            ctx: &mut impl ExecuteContext<'_>,
2344            task_id: TaskId,
2345            dependent_task_id: TaskId,
2346            queue: &mut AggregationUpdateQueue,
2347        ) {
2348            #[cfg(feature = "trace_task_output_dependencies")]
2349            let span = tracing::trace_span!(
2350                "invalidate output dependency",
2351                task = %task_id,
2352                dependent_task = %dependent_task_id,
2353                result = tracing::field::Empty,
2354            )
2355            .entered();
2356            let mut make_stale = true;
2357            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2358            let transient_task_type = dependent.get_transient_task_type();
2359            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
2360                // once tasks are never invalidated
2361                #[cfg(feature = "trace_task_output_dependencies")]
2362                span.record("result", "once task");
2363                return;
2364            }
2365            if dependent.outdated_output_dependencies_contains(&task_id) {
2366                #[cfg(feature = "trace_task_output_dependencies")]
2367                span.record("result", "outdated dependency");
2368                // output dependency is outdated, so it hasn't read the output yet
2369                // and doesn't need to be invalidated
2370                // But importantly we still need to make the task dirty as it should no longer
2371                // be considered as "recomputation".
2372                make_stale = false;
2373            } else if !dependent.output_dependencies_contains(&task_id) {
2374                // output dependency has been removed, so the task doesn't depend on the
2375                // output anymore and doesn't need to be invalidated
2376                #[cfg(feature = "trace_task_output_dependencies")]
2377                span.record("result", "no backward dependency");
2378                return;
2379            }
2380            make_task_dirty_internal(
2381                dependent,
2382                dependent_task_id,
2383                make_stale,
2384                #[cfg(feature = "trace_task_dirty")]
2385                TaskDirtyCause::OutputChange { task_id },
2386                queue,
2387                ctx,
2388            );
2389            #[cfg(feature = "trace_task_output_dependencies")]
2390            span.record("result", "marked dirty");
2391        }
2392
2393        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
2394            let chunk_size = good_chunk_size(output_dependent_tasks.len());
2395            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2396            let _ = scope_and_block(chunks.len(), |scope| {
2397                for chunk in chunks {
2398                    let child_ctx = ctx.child_context();
2399                    scope.spawn(move || {
2400                        let mut ctx = child_ctx.create();
2401                        let mut queue = AggregationUpdateQueue::new();
2402                        for dependent_task_id in chunk {
2403                            process_output_dependents(
2404                                &mut ctx,
2405                                task_id,
2406                                dependent_task_id,
2407                                &mut queue,
2408                            )
2409                        }
2410                        queue.execute(&mut ctx);
2411                    });
2412                }
2413            });
2414        } else {
2415            let mut queue = AggregationUpdateQueue::new();
2416            for dependent_task_id in output_dependent_tasks {
2417                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
2418            }
2419            queue.execute(ctx);
2420        }
2421    }
2422
2423    fn task_execution_completed_unfinished_children_dirty(
2424        &self,
2425        ctx: &mut impl ExecuteContext<'_>,
2426        new_children: &FxHashSet<TaskId>,
2427    ) {
2428        debug_assert!(!new_children.is_empty());
2429
2430        let mut queue = AggregationUpdateQueue::new();
2431        ctx.for_each_task_all(new_children.iter().copied(), |child_task, ctx| {
2432            if !child_task.has_output() {
2433                let child_id = child_task.id();
2434                make_task_dirty_internal(
2435                    child_task,
2436                    child_id,
2437                    false,
2438                    #[cfg(feature = "trace_task_dirty")]
2439                    TaskDirtyCause::InitialDirty,
2440                    &mut queue,
2441                    ctx,
2442                );
2443            }
2444        });
2445
2446        queue.execute(ctx);
2447    }
2448
2449    fn task_execution_completed_connect(
2450        &self,
2451        ctx: &mut impl ExecuteContext<'_>,
2452        task_id: TaskId,
2453        new_children: FxHashSet<TaskId>,
2454    ) -> bool {
2455        debug_assert!(!new_children.is_empty());
2456
2457        let mut task = ctx.task(task_id, TaskDataCategory::All);
2458        let Some(in_progress) = task.get_in_progress() else {
2459            panic!("Task execution completed, but task is not in progress: {task:#?}");
2460        };
2461        if matches!(in_progress, InProgressState::Canceled) {
2462            // Task was canceled in the meantime, so we don't connect the children
2463            return false;
2464        }
2465        let InProgressState::InProgress(box InProgressStateInner {
2466            #[cfg(not(feature = "no_fast_stale"))]
2467            stale,
2468            once_task: is_once_task,
2469            ..
2470        }) = in_progress
2471        else {
2472            panic!("Task execution completed, but task is not in progress: {task:#?}");
2473        };
2474
2475        // If the task is stale, reschedule it
2476        #[cfg(not(feature = "no_fast_stale"))]
2477        if *stale && !is_once_task {
2478            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2479                task.take_in_progress()
2480            else {
2481                unreachable!();
2482            };
2483            let old = task.set_in_progress(InProgressState::Scheduled {
2484                done_event,
2485                reason: TaskExecutionReason::Stale,
2486            });
2487            debug_assert!(old.is_none(), "InProgress already exists");
2488            drop(task);
2489
2490            // All `new_children` are currently hold active with an active count and we need to undo
2491            // that. (We already filtered out the old children from that list)
2492            AggregationUpdateQueue::run(
2493                AggregationUpdateJob::DecreaseActiveCounts {
2494                    task_ids: new_children.into_iter().collect(),
2495                },
2496                ctx,
2497            );
2498            return true;
2499        }
2500
2501        let has_active_count = ctx.should_track_activeness()
2502            && task
2503                .get_activeness()
2504                .is_some_and(|activeness| activeness.active_counter > 0);
2505        connect_children(
2506            ctx,
2507            task_id,
2508            task,
2509            new_children,
2510            has_active_count,
2511            ctx.should_track_activeness(),
2512        );
2513
2514        false
2515    }
2516
2517    fn task_execution_completed_finish(
2518        &self,
2519        ctx: &mut impl ExecuteContext<'_>,
2520        task_id: TaskId,
2521        #[cfg(feature = "verify_determinism")] no_output_set: bool,
2522        new_output: Option<OutputValue>,
2523        is_now_immutable: bool,
2524    ) -> (
2525        bool,
2526        Option<
2527            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2528        >,
2529    ) {
2530        let mut task = ctx.task(task_id, TaskDataCategory::All);
2531        let Some(in_progress) = task.take_in_progress() else {
2532            panic!("Task execution completed, but task is not in progress: {task:#?}");
2533        };
2534        if matches!(in_progress, InProgressState::Canceled) {
2535            // Task was canceled in the meantime, so we don't finish it
2536            return (false, None);
2537        }
2538        let InProgressState::InProgress(box InProgressStateInner {
2539            done_event,
2540            once_task: is_once_task,
2541            stale,
2542            session_dependent,
2543            marked_as_completed: _,
2544            new_children,
2545        }) = in_progress
2546        else {
2547            panic!("Task execution completed, but task is not in progress: {task:#?}");
2548        };
2549        debug_assert!(new_children.is_empty());
2550
2551        // If the task is stale, reschedule it
2552        if stale && !is_once_task {
2553            let old = task.set_in_progress(InProgressState::Scheduled {
2554                done_event,
2555                reason: TaskExecutionReason::Stale,
2556            });
2557            debug_assert!(old.is_none(), "InProgress already exists");
2558            return (true, None);
2559        }
2560
2561        // Set the output if it has changed
2562        let mut old_content = None;
2563        if let Some(value) = new_output {
2564            old_content = task.set_output(value);
2565        }
2566
2567        // If the task has no invalidator and has no mutable dependencies, it does not have a way
2568        // to be invalidated and we can mark it as immutable.
2569        if is_now_immutable {
2570            task.set_immutable(true);
2571        }
2572
2573        // Notify in progress cells and remove all of them
2574        let in_progress_cells = task.take_in_progress_cells();
2575        if let Some(ref cells) = in_progress_cells {
2576            for state in cells.values() {
2577                state.event.notify(usize::MAX);
2578            }
2579        }
2580
2581        // Grab the old dirty state
2582        let old_dirtyness = task.get_dirty().cloned();
2583        let (old_self_dirty, old_current_session_self_clean) = match old_dirtyness {
2584            None => (false, false),
2585            Some(Dirtyness::Dirty(_)) => (true, false),
2586            Some(Dirtyness::SessionDependent) => {
2587                let clean_in_current_session = task.current_session_clean();
2588                (true, clean_in_current_session)
2589            }
2590        };
2591
2592        // Compute the new dirty state
2593        let (new_dirtyness, new_self_dirty, new_current_session_self_clean) = if session_dependent {
2594            (Some(Dirtyness::SessionDependent), true, true)
2595        } else {
2596            (None, false, false)
2597        };
2598
2599        // Update the dirty state
2600        let dirty_changed = old_dirtyness != new_dirtyness;
2601        if dirty_changed {
2602            if let Some(value) = new_dirtyness {
2603                task.set_dirty(value);
2604            } else if old_dirtyness.is_some() {
2605                task.take_dirty();
2606            }
2607        }
2608        if old_current_session_self_clean != new_current_session_self_clean {
2609            if new_current_session_self_clean {
2610                task.set_current_session_clean(true);
2611            } else if old_current_session_self_clean {
2612                task.set_current_session_clean(false);
2613            }
2614        }
2615
2616        // Propagate dirtyness changes
2617        let data_update = if old_self_dirty != new_self_dirty
2618            || old_current_session_self_clean != new_current_session_self_clean
2619        {
2620            let dirty_container_count = task
2621                .get_aggregated_dirty_container_count()
2622                .cloned()
2623                .unwrap_or_default();
2624            let current_session_clean_container_count = task
2625                .get_aggregated_current_session_clean_container_count()
2626                .copied()
2627                .unwrap_or_default();
2628            let result = ComputeDirtyAndCleanUpdate {
2629                old_dirty_container_count: dirty_container_count,
2630                new_dirty_container_count: dirty_container_count,
2631                old_current_session_clean_container_count: current_session_clean_container_count,
2632                new_current_session_clean_container_count: current_session_clean_container_count,
2633                old_self_dirty,
2634                new_self_dirty,
2635                old_current_session_self_clean,
2636                new_current_session_self_clean,
2637            }
2638            .compute();
2639            if result.dirty_count_update - result.current_session_clean_update < 0 {
2640                // The task is clean now
2641                if let Some(activeness_state) = task.get_activeness_mut() {
2642                    activeness_state.all_clean_event.notify(usize::MAX);
2643                    activeness_state.unset_active_until_clean();
2644                    if activeness_state.is_empty() {
2645                        task.take_activeness();
2646                    }
2647                }
2648            }
2649            result
2650                .aggregated_update(task_id)
2651                .and_then(|aggregated_update| {
2652                    AggregationUpdateJob::data_update(&mut task, aggregated_update)
2653                })
2654        } else {
2655            None
2656        };
2657
2658        #[cfg(feature = "verify_determinism")]
2659        let reschedule =
2660            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
2661        #[cfg(not(feature = "verify_determinism"))]
2662        let reschedule = false;
2663        if reschedule {
2664            let old = task.set_in_progress(InProgressState::Scheduled {
2665                done_event,
2666                reason: TaskExecutionReason::Stale,
2667            });
2668            debug_assert!(old.is_none(), "InProgress already exists");
2669            drop(task);
2670        } else {
2671            drop(task);
2672
2673            // Notify dependent tasks that are waiting for this task to finish
2674            done_event.notify(usize::MAX);
2675        }
2676
2677        drop(old_content);
2678
2679        if let Some(data_update) = data_update {
2680            AggregationUpdateQueue::run(data_update, ctx);
2681        }
2682
2683        // We return so the data can be dropped outside of critical sections
2684        (reschedule, in_progress_cells)
2685    }
2686
2687    fn task_execution_completed_cleanup(
2688        &self,
2689        ctx: &mut impl ExecuteContext<'_>,
2690        task_id: TaskId,
2691        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2692        is_error: bool,
2693    ) -> Vec<SharedReference> {
2694        let mut task = ctx.task(task_id, TaskDataCategory::All);
2695        let mut removed_cell_data = Vec::new();
2696        // An error is potentially caused by a eventual consistency, so we avoid updating cells
2697        // after an error as it is likely transient and we want to keep the dependent tasks
2698        // clean to avoid re-executions.
2699        if !is_error {
2700            // Remove no longer existing cells and
2701            // find all outdated data items (removed cells, outdated edges)
2702            // Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
2703            // anyway and we want to avoid needless re-executions. When the cells become
2704            // used again, they are invalidated from the update cell operation.
2705            // Remove cell data for cells that no longer exist
2706            let to_remove_persistent: Vec<_> = task
2707                .iter_persistent_cell_data()
2708                .filter_map(|(cell, _)| {
2709                    cell_counters
2710                        .get(&cell.type_id)
2711                        .is_none_or(|start_index| cell.index >= *start_index)
2712                        .then_some(*cell)
2713                })
2714                .collect();
2715
2716            // Remove transient cell data for cells that no longer exist
2717            let to_remove_transient: Vec<_> = task
2718                .iter_transient_cell_data()
2719                .filter_map(|(cell, _)| {
2720                    cell_counters
2721                        .get(&cell.type_id)
2722                        .is_none_or(|start_index| cell.index >= *start_index)
2723                        .then_some(*cell)
2724                })
2725                .collect();
2726            removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
2727            for cell in to_remove_persistent {
2728                if let Some(data) = task.remove_persistent_cell_data(&cell) {
2729                    removed_cell_data.push(data.into_untyped());
2730                }
2731            }
2732            for cell in to_remove_transient {
2733                if let Some(data) = task.remove_transient_cell_data(&cell) {
2734                    removed_cell_data.push(data);
2735                }
2736            }
2737        }
2738
2739        // Clean up task storage after execution:
2740        // - Shrink collections marked with shrink_on_completion
2741        // - Drop dependency fields for immutable tasks (they'll never re-execute)
2742        task.cleanup_after_execution();
2743
2744        drop(task);
2745
2746        // Return so we can drop outside of critical sections
2747        removed_cell_data
2748    }
2749
2750    fn run_backend_job<'a>(
2751        self: &'a Arc<Self>,
2752        job: TurboTasksBackendJob,
2753        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2754    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2755        Box::pin(async move {
2756            match job {
2757                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
2758                    debug_assert!(self.should_persist());
2759
2760                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
2761                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
2762                    let mut idle_start_listener = self.idle_start_event.listen();
2763                    let mut idle_end_listener = self.idle_end_event.listen();
2764                    let mut fresh_idle = true;
2765                    loop {
2766                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2767                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2768                        let idle_timeout = *IDLE_TIMEOUT;
2769                        let (time, mut reason) =
2770                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
2771                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2772                            } else {
2773                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
2774                            };
2775
2776                        let until = last_snapshot + time;
2777                        if until > Instant::now() {
2778                            let mut stop_listener = self.stopping_event.listen();
2779                            if self.stopping.load(Ordering::Acquire) {
2780                                return;
2781                            }
2782                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2783                                Instant::now() + idle_timeout
2784                            } else {
2785                                far_future()
2786                            };
2787                            loop {
2788                                tokio::select! {
2789                                    _ = &mut stop_listener => {
2790                                        return;
2791                                    },
2792                                    _ = &mut idle_start_listener => {
2793                                        fresh_idle = true;
2794                                        idle_time = Instant::now() + idle_timeout;
2795                                        idle_start_listener = self.idle_start_event.listen()
2796                                    },
2797                                    _ = &mut idle_end_listener => {
2798                                        idle_time = until + idle_timeout;
2799                                        idle_end_listener = self.idle_end_event.listen()
2800                                    },
2801                                    _ = tokio::time::sleep_until(until) => {
2802                                        break;
2803                                    },
2804                                    _ = tokio::time::sleep_until(idle_time) => {
2805                                        if turbo_tasks.is_idle() {
2806                                            reason = "idle timeout";
2807                                            break;
2808                                        }
2809                                    },
2810                                }
2811                            }
2812                        }
2813
2814                        let this = self.clone();
2815                        let snapshot = this.snapshot_and_persist(None, reason, turbo_tasks);
2816                        if let Some((snapshot_start, new_data)) = snapshot {
2817                            last_snapshot = snapshot_start;
2818                            if !new_data {
2819                                fresh_idle = false;
2820                                continue;
2821                            }
2822                            let last_snapshot = last_snapshot.duration_since(self.start_time);
2823                            self.last_snapshot.store(
2824                                last_snapshot.as_millis().try_into().unwrap(),
2825                                Ordering::Relaxed,
2826                            );
2827
2828                            turbo_tasks.schedule_backend_background_job(
2829                                TurboTasksBackendJob::FollowUpSnapshot,
2830                            );
2831                            return;
2832                        }
2833                    }
2834                }
2835            }
2836        })
2837    }
2838
2839    fn try_read_own_task_cell(
2840        &self,
2841        task_id: TaskId,
2842        cell: CellId,
2843        options: ReadCellOptions,
2844        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2845    ) -> Result<TypedCellContent> {
2846        let mut ctx = self.execute_context(turbo_tasks);
2847        let task = ctx.task(task_id, TaskDataCategory::Data);
2848        if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2849            debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2850            Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2851        } else {
2852            Ok(CellContent(None).into_typed(cell.type_id))
2853        }
2854    }
2855
2856    fn read_task_collectibles(
2857        &self,
2858        task_id: TaskId,
2859        collectible_type: TraitTypeId,
2860        reader_id: Option<TaskId>,
2861        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2862    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2863        let mut ctx = self.execute_context(turbo_tasks);
2864        let mut collectibles = AutoMap::default();
2865        {
2866            let mut task = ctx.task(task_id, TaskDataCategory::All);
2867            // Ensure it's an root node
2868            loop {
2869                let aggregation_number = get_aggregation_number(&task);
2870                if is_root_node(aggregation_number) {
2871                    break;
2872                }
2873                drop(task);
2874                AggregationUpdateQueue::run(
2875                    AggregationUpdateJob::UpdateAggregationNumber {
2876                        task_id,
2877                        base_aggregation_number: u32::MAX,
2878                        distance: None,
2879                    },
2880                    &mut ctx,
2881                );
2882                task = ctx.task(task_id, TaskDataCategory::All);
2883            }
2884            for (collectible, count) in task.iter_aggregated_collectibles() {
2885                if *count > 0 && collectible.collectible_type == collectible_type {
2886                    *collectibles
2887                        .entry(RawVc::TaskCell(
2888                            collectible.cell.task,
2889                            collectible.cell.cell,
2890                        ))
2891                        .or_insert(0) += 1;
2892                }
2893            }
2894            for (&collectible, &count) in task.iter_collectibles() {
2895                if collectible.collectible_type == collectible_type {
2896                    *collectibles
2897                        .entry(RawVc::TaskCell(
2898                            collectible.cell.task,
2899                            collectible.cell.cell,
2900                        ))
2901                        .or_insert(0) += count;
2902                }
2903            }
2904            if let Some(reader_id) = reader_id {
2905                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
2906            }
2907        }
2908        if let Some(reader_id) = reader_id {
2909            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
2910            let target = CollectiblesRef {
2911                task: task_id,
2912                collectible_type,
2913            };
2914            if !reader.remove_outdated_collectibles_dependencies(&target) {
2915                let _ = reader.add_collectibles_dependencies(target);
2916            }
2917        }
2918        collectibles
2919    }
2920
2921    fn emit_collectible(
2922        &self,
2923        collectible_type: TraitTypeId,
2924        collectible: RawVc,
2925        task_id: TaskId,
2926        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2927    ) {
2928        self.assert_valid_collectible(task_id, collectible);
2929
2930        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2931            panic!("Collectibles need to be resolved");
2932        };
2933        let cell = CellRef {
2934            task: collectible_task,
2935            cell,
2936        };
2937        operation::UpdateCollectibleOperation::run(
2938            task_id,
2939            CollectibleRef {
2940                collectible_type,
2941                cell,
2942            },
2943            1,
2944            self.execute_context(turbo_tasks),
2945        );
2946    }
2947
2948    fn unemit_collectible(
2949        &self,
2950        collectible_type: TraitTypeId,
2951        collectible: RawVc,
2952        count: u32,
2953        task_id: TaskId,
2954        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2955    ) {
2956        self.assert_valid_collectible(task_id, collectible);
2957
2958        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2959            panic!("Collectibles need to be resolved");
2960        };
2961        let cell = CellRef {
2962            task: collectible_task,
2963            cell,
2964        };
2965        operation::UpdateCollectibleOperation::run(
2966            task_id,
2967            CollectibleRef {
2968                collectible_type,
2969                cell,
2970            },
2971            -(i32::try_from(count).unwrap()),
2972            self.execute_context(turbo_tasks),
2973        );
2974    }
2975
2976    fn update_task_cell(
2977        &self,
2978        task_id: TaskId,
2979        cell: CellId,
2980        is_serializable_cell_content: bool,
2981        content: CellContent,
2982        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
2983        verification_mode: VerificationMode,
2984        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2985    ) {
2986        operation::UpdateCellOperation::run(
2987            task_id,
2988            cell,
2989            content,
2990            is_serializable_cell_content,
2991            updated_key_hashes,
2992            verification_mode,
2993            self.execute_context(turbo_tasks),
2994        );
2995    }
2996
2997    fn mark_own_task_as_session_dependent(
2998        &self,
2999        task_id: TaskId,
3000        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3001    ) {
3002        if !self.should_track_dependencies() {
3003            // Without dependency tracking we don't need session dependent tasks
3004            return;
3005        }
3006        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
3007        let mut ctx = self.execute_context(turbo_tasks);
3008        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
3009        let aggregation_number = get_aggregation_number(&task);
3010        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
3011            drop(task);
3012            // We want to use a high aggregation number to avoid large aggregation chains for
3013            // session dependent tasks (which change on every run)
3014            AggregationUpdateQueue::run(
3015                AggregationUpdateJob::UpdateAggregationNumber {
3016                    task_id,
3017                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
3018                    distance: None,
3019                },
3020                &mut ctx,
3021            );
3022            task = ctx.task(task_id, TaskDataCategory::Meta);
3023        }
3024        if let Some(InProgressState::InProgress(box InProgressStateInner {
3025            session_dependent,
3026            ..
3027        })) = task.get_in_progress_mut()
3028        {
3029            *session_dependent = true;
3030        }
3031    }
3032
3033    fn mark_own_task_as_finished(
3034        &self,
3035        task: TaskId,
3036        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3037    ) {
3038        let mut ctx = self.execute_context(turbo_tasks);
3039        let mut task = ctx.task(task, TaskDataCategory::Data);
3040        if let Some(InProgressState::InProgress(box InProgressStateInner {
3041            marked_as_completed,
3042            ..
3043        })) = task.get_in_progress_mut()
3044        {
3045            *marked_as_completed = true;
3046            // TODO this should remove the dirty state (also check session_dependent)
3047            // but this would break some assumptions for strongly consistent reads.
3048            // Client tasks are not connected yet, so we wouldn't wait for them.
3049            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
3050        }
3051    }
3052
3053    fn connect_task(
3054        &self,
3055        task: TaskId,
3056        parent_task: Option<TaskId>,
3057        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3058    ) {
3059        self.assert_not_persistent_calling_transient(parent_task, task, None);
3060        ConnectChildOperation::run(parent_task, task, None, self.execute_context(turbo_tasks));
3061    }
3062
3063    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3064        let task_id = self.transient_task_id_factory.get();
3065        {
3066            let mut task = self.storage.access_mut(task_id);
3067            task.init_transient_task(task_id, task_type, self.should_track_activeness());
3068        }
3069        #[cfg(feature = "verify_aggregation_graph")]
3070        self.root_tasks.lock().insert(task_id);
3071        task_id
3072    }
3073
3074    fn dispose_root_task(
3075        &self,
3076        task_id: TaskId,
3077        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3078    ) {
3079        #[cfg(feature = "verify_aggregation_graph")]
3080        self.root_tasks.lock().remove(&task_id);
3081
3082        let mut ctx = self.execute_context(turbo_tasks);
3083        let mut task = ctx.task(task_id, TaskDataCategory::All);
3084        let is_dirty = task.is_dirty();
3085        let has_dirty_containers = task.has_dirty_containers();
3086        if is_dirty.is_some() || has_dirty_containers {
3087            if let Some(activeness_state) = task.get_activeness_mut() {
3088                // We will finish the task, but it would be removed after the task is done
3089                activeness_state.unset_root_type();
3090                activeness_state.set_active_until_clean();
3091            };
3092        } else if let Some(activeness_state) = task.take_activeness() {
3093            // Technically nobody should be listening to this event, but just in case
3094            // we notify it anyway
3095            activeness_state.all_clean_event.notify(usize::MAX);
3096        }
3097    }
3098
3099    #[cfg(feature = "verify_aggregation_graph")]
3100    fn verify_aggregation_graph(
3101        &self,
3102        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3103        idle: bool,
3104    ) {
3105        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3106            return;
3107        }
3108        use std::{collections::VecDeque, env, io::stdout};
3109
3110        use crate::backend::operation::{get_uppers, is_aggregating_node};
3111
3112        let mut ctx = self.execute_context(turbo_tasks);
3113        let root_tasks = self.root_tasks.lock().clone();
3114
3115        for task_id in root_tasks.into_iter() {
3116            let mut queue = VecDeque::new();
3117            let mut visited = FxHashSet::default();
3118            let mut aggregated_nodes = FxHashSet::default();
3119            let mut collectibles = FxHashMap::default();
3120            let root_task_id = task_id;
3121            visited.insert(task_id);
3122            aggregated_nodes.insert(task_id);
3123            queue.push_back(task_id);
3124            let mut counter = 0;
3125            while let Some(task_id) = queue.pop_front() {
3126                counter += 1;
3127                if counter % 100000 == 0 {
3128                    println!(
3129                        "queue={}, visited={}, aggregated_nodes={}",
3130                        queue.len(),
3131                        visited.len(),
3132                        aggregated_nodes.len()
3133                    );
3134                }
3135                let task = ctx.task(task_id, TaskDataCategory::All);
3136                if idle && !self.is_idle.load(Ordering::Relaxed) {
3137                    return;
3138                }
3139
3140                let uppers = get_uppers(&task);
3141                if task_id != root_task_id
3142                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3143                {
3144                    panic!(
3145                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3146                         {:?})",
3147                        task_id,
3148                        task.get_task_description(),
3149                        uppers
3150                    );
3151                }
3152
3153                for (collectible, _) in task.iter_aggregated_collectibles() {
3154                    collectibles
3155                        .entry(*collectible)
3156                        .or_insert_with(|| (false, Vec::new()))
3157                        .1
3158                        .push(task_id);
3159                }
3160
3161                for (&collectible, &value) in task.iter_collectibles() {
3162                    if value > 0 {
3163                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3164                            *flag = true
3165                        } else {
3166                            panic!(
3167                                "Task {} has a collectible {:?} that is not in any upper task",
3168                                task_id, collectible
3169                            );
3170                        }
3171                    }
3172                }
3173
3174                let is_dirty = task.has_dirty();
3175                let has_dirty_container = task.has_dirty_containers();
3176                let should_be_in_upper = is_dirty || has_dirty_container;
3177
3178                let aggregation_number = get_aggregation_number(&task);
3179                if is_aggregating_node(aggregation_number) {
3180                    aggregated_nodes.insert(task_id);
3181                }
3182                // println!(
3183                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
3184                //     ctx.get_task_description(task_id),
3185                //     uppers
3186                // );
3187
3188                for child_id in task.iter_children() {
3189                    // println!("{task_id}: child -> {child_id}");
3190                    if visited.insert(child_id) {
3191                        queue.push_back(child_id);
3192                    }
3193                }
3194                drop(task);
3195
3196                if should_be_in_upper {
3197                    for upper_id in uppers {
3198                        let upper = ctx.task(upper_id, TaskDataCategory::All);
3199                        let in_upper = upper
3200                            .get_aggregated_dirty_containers(&task_id)
3201                            .is_some_and(|&dirty| dirty > 0);
3202                        if !in_upper {
3203                            let containers: Vec<_> = upper
3204                                .iter_aggregated_dirty_containers()
3205                                .map(|(&k, &v)| (k, v))
3206                                .collect();
3207                            let upper_task_desc = upper.get_task_description();
3208                            drop(upper);
3209                            panic!(
3210                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
3211                                 ({})\nThese dirty containers are present:\n{:#?}",
3212                                task_id,
3213                                ctx.task(task_id, TaskDataCategory::Data)
3214                                    .get_task_description(),
3215                                upper_id,
3216                                upper_task_desc,
3217                                containers,
3218                            );
3219                        }
3220                    }
3221                }
3222            }
3223
3224            for (collectible, (flag, task_ids)) in collectibles {
3225                if !flag {
3226                    use std::io::Write;
3227                    let mut stdout = stdout().lock();
3228                    writeln!(
3229                        stdout,
3230                        "{:?} that is not emitted in any child task but in these aggregated \
3231                         tasks: {:#?}",
3232                        collectible,
3233                        task_ids
3234                            .iter()
3235                            .map(|t| format!(
3236                                "{t} {}",
3237                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
3238                            ))
3239                            .collect::<Vec<_>>()
3240                    )
3241                    .unwrap();
3242
3243                    let task_id = collectible.cell.task;
3244                    let mut queue = {
3245                        let task = ctx.task(task_id, TaskDataCategory::All);
3246                        get_uppers(&task)
3247                    };
3248                    let mut visited = FxHashSet::default();
3249                    for &upper_id in queue.iter() {
3250                        visited.insert(upper_id);
3251                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3252                    }
3253                    while let Some(task_id) = queue.pop() {
3254                        let task = ctx.task(task_id, TaskDataCategory::All);
3255                        let desc = task.get_task_description();
3256                        let aggregated_collectible = task
3257                            .get_aggregated_collectibles(&collectible)
3258                            .copied()
3259                            .unwrap_or_default();
3260                        let uppers = get_uppers(&task);
3261                        drop(task);
3262                        writeln!(
3263                            stdout,
3264                            "upper {task_id} {desc} collectible={aggregated_collectible}"
3265                        )
3266                        .unwrap();
3267                        if task_ids.contains(&task_id) {
3268                            writeln!(
3269                                stdout,
3270                                "Task has an upper connection to an aggregated task that doesn't \
3271                                 reference it. Upper connection is invalid!"
3272                            )
3273                            .unwrap();
3274                        }
3275                        for upper_id in uppers {
3276                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3277                            if !visited.contains(&upper_id) {
3278                                queue.push(upper_id);
3279                            }
3280                        }
3281                    }
3282                    panic!("See stdout for more details");
3283                }
3284            }
3285        }
3286    }
3287
3288    fn assert_not_persistent_calling_transient(
3289        &self,
3290        parent_id: Option<TaskId>,
3291        child_id: TaskId,
3292        cell_id: Option<CellId>,
3293    ) {
3294        if let Some(parent_id) = parent_id
3295            && !parent_id.is_transient()
3296            && child_id.is_transient()
3297        {
3298            self.panic_persistent_calling_transient(
3299                self.debug_get_task_description(parent_id),
3300                self.debug_get_cached_task_type(child_id).as_deref(),
3301                cell_id,
3302            );
3303        }
3304    }
3305
3306    fn panic_persistent_calling_transient(
3307        &self,
3308        parent: String,
3309        child: Option<&CachedTaskType>,
3310        cell_id: Option<CellId>,
3311    ) {
3312        let transient_reason = if let Some(child) = child {
3313            Cow::Owned(format!(
3314                " The callee is transient because it depends on:\n{}",
3315                self.debug_trace_transient_task(child, cell_id),
3316            ))
3317        } else {
3318            Cow::Borrowed("")
3319        };
3320        panic!(
3321            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3322            parent,
3323            child.map_or("unknown", |t| t.get_name()),
3324            transient_reason,
3325        );
3326    }
3327
3328    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3329        // these checks occur in a potentially hot codepath, but they're cheap
3330        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3331            // This should never happen: The collectible APIs use ResolvedVc
3332            let task_info = if let Some(col_task_ty) = collectible
3333                .try_get_task_id()
3334                .map(|t| self.debug_get_task_description(t))
3335            {
3336                Cow::Owned(format!(" (return type of {col_task_ty})"))
3337            } else {
3338                Cow::Borrowed("")
3339            };
3340            panic!("Collectible{task_info} must be a ResolvedVc")
3341        };
3342        if col_task_id.is_transient() && !task_id.is_transient() {
3343            let transient_reason =
3344                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3345                    Cow::Owned(format!(
3346                        ". The collectible is transient because it depends on:\n{}",
3347                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3348                    ))
3349                } else {
3350                    Cow::Borrowed("")
3351                };
3352            // this should never happen: How would a persistent function get a transient Vc?
3353            panic!(
3354                "Collectible is transient, transient collectibles cannot be emitted from \
3355                 persistent tasks{transient_reason}",
3356            )
3357        }
3358    }
3359}
3360
3361impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
3362    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3363        self.0.startup(turbo_tasks);
3364    }
3365
3366    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3367        self.0.stopping();
3368    }
3369
3370    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3371        self.0.stop(turbo_tasks);
3372    }
3373
3374    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3375        self.0.idle_start(turbo_tasks);
3376    }
3377
3378    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3379        self.0.idle_end();
3380    }
3381
3382    fn get_or_create_persistent_task(
3383        &self,
3384        task_type: CachedTaskType,
3385        parent_task: Option<TaskId>,
3386        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3387    ) -> TaskId {
3388        self.0
3389            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
3390    }
3391
3392    fn get_or_create_transient_task(
3393        &self,
3394        task_type: CachedTaskType,
3395        parent_task: Option<TaskId>,
3396        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3397    ) -> TaskId {
3398        self.0
3399            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
3400    }
3401
3402    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3403        self.0.invalidate_task(task_id, turbo_tasks);
3404    }
3405
3406    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3407        self.0.invalidate_tasks(tasks, turbo_tasks);
3408    }
3409
3410    fn invalidate_tasks_set(
3411        &self,
3412        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3413        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3414    ) {
3415        self.0.invalidate_tasks_set(tasks, turbo_tasks);
3416    }
3417
3418    fn invalidate_serialization(
3419        &self,
3420        task_id: TaskId,
3421        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3422    ) {
3423        self.0.invalidate_serialization(task_id, turbo_tasks);
3424    }
3425
3426    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3427        self.0.task_execution_canceled(task, turbo_tasks)
3428    }
3429
3430    fn try_start_task_execution(
3431        &self,
3432        task_id: TaskId,
3433        priority: TaskPriority,
3434        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3435    ) -> Option<TaskExecutionSpec<'_>> {
3436        self.0
3437            .try_start_task_execution(task_id, priority, turbo_tasks)
3438    }
3439
3440    fn task_execution_completed(
3441        &self,
3442        task_id: TaskId,
3443        result: Result<RawVc, TurboTasksExecutionError>,
3444        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3445        #[cfg(feature = "verify_determinism")] stateful: bool,
3446        has_invalidator: bool,
3447        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3448    ) -> bool {
3449        self.0.task_execution_completed(
3450            task_id,
3451            result,
3452            cell_counters,
3453            #[cfg(feature = "verify_determinism")]
3454            stateful,
3455            has_invalidator,
3456            turbo_tasks,
3457        )
3458    }
3459
3460    type BackendJob = TurboTasksBackendJob;
3461
3462    fn run_backend_job<'a>(
3463        &'a self,
3464        job: Self::BackendJob,
3465        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3466    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3467        self.0.run_backend_job(job, turbo_tasks)
3468    }
3469
3470    fn try_read_task_output(
3471        &self,
3472        task_id: TaskId,
3473        reader: Option<TaskId>,
3474        options: ReadOutputOptions,
3475        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3476    ) -> Result<Result<RawVc, EventListener>> {
3477        self.0
3478            .try_read_task_output(task_id, reader, options, turbo_tasks)
3479    }
3480
3481    fn try_read_task_cell(
3482        &self,
3483        task_id: TaskId,
3484        cell: CellId,
3485        reader: Option<TaskId>,
3486        options: ReadCellOptions,
3487        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3488    ) -> Result<Result<TypedCellContent, EventListener>> {
3489        self.0
3490            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3491    }
3492
3493    fn try_read_own_task_cell(
3494        &self,
3495        task_id: TaskId,
3496        cell: CellId,
3497        options: ReadCellOptions,
3498        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3499    ) -> Result<TypedCellContent> {
3500        self.0
3501            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
3502    }
3503
3504    fn read_task_collectibles(
3505        &self,
3506        task_id: TaskId,
3507        collectible_type: TraitTypeId,
3508        reader: Option<TaskId>,
3509        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3510    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3511        self.0
3512            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3513    }
3514
3515    fn emit_collectible(
3516        &self,
3517        collectible_type: TraitTypeId,
3518        collectible: RawVc,
3519        task_id: TaskId,
3520        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3521    ) {
3522        self.0
3523            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3524    }
3525
3526    fn unemit_collectible(
3527        &self,
3528        collectible_type: TraitTypeId,
3529        collectible: RawVc,
3530        count: u32,
3531        task_id: TaskId,
3532        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3533    ) {
3534        self.0
3535            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3536    }
3537
3538    fn update_task_cell(
3539        &self,
3540        task_id: TaskId,
3541        cell: CellId,
3542        is_serializable_cell_content: bool,
3543        content: CellContent,
3544        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3545        verification_mode: VerificationMode,
3546        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3547    ) {
3548        self.0.update_task_cell(
3549            task_id,
3550            cell,
3551            is_serializable_cell_content,
3552            content,
3553            updated_key_hashes,
3554            verification_mode,
3555            turbo_tasks,
3556        );
3557    }
3558
3559    fn mark_own_task_as_finished(
3560        &self,
3561        task_id: TaskId,
3562        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3563    ) {
3564        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3565    }
3566
3567    fn mark_own_task_as_session_dependent(
3568        &self,
3569        task: TaskId,
3570        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3571    ) {
3572        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
3573    }
3574
3575    fn connect_task(
3576        &self,
3577        task: TaskId,
3578        parent_task: Option<TaskId>,
3579        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3580    ) {
3581        self.0.connect_task(task, parent_task, turbo_tasks);
3582    }
3583
3584    fn create_transient_task(
3585        &self,
3586        task_type: TransientTaskType,
3587        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3588    ) -> TaskId {
3589        self.0.create_transient_task(task_type)
3590    }
3591
3592    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3593        self.0.dispose_root_task(task_id, turbo_tasks);
3594    }
3595
3596    fn task_statistics(&self) -> &TaskStatisticsApi {
3597        &self.0.task_statistics
3598    }
3599
3600    fn is_tracking_dependencies(&self) -> bool {
3601        self.0.options.dependency_tracking
3602    }
3603
3604    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
3605        self.0.get_task_name(task, turbo_tasks)
3606    }
3607}
3608
3609enum DebugTraceTransientTask {
3610    Cached {
3611        task_name: &'static str,
3612        cell_type_id: Option<ValueTypeId>,
3613        cause_self: Option<Box<DebugTraceTransientTask>>,
3614        cause_args: Vec<DebugTraceTransientTask>,
3615    },
3616    /// This representation is used when this task is a duplicate of one previously shown
3617    Collapsed {
3618        task_name: &'static str,
3619        cell_type_id: Option<ValueTypeId>,
3620    },
3621    Uncached {
3622        cell_type_id: Option<ValueTypeId>,
3623    },
3624}
3625
3626impl DebugTraceTransientTask {
3627    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3628        let indent = "    ".repeat(level);
3629        f.write_str(&indent)?;
3630
3631        fn fmt_cell_type_id(
3632            f: &mut fmt::Formatter<'_>,
3633            cell_type_id: Option<ValueTypeId>,
3634        ) -> fmt::Result {
3635            if let Some(ty) = cell_type_id {
3636                write!(f, " (read cell of type {})", get_value_type(ty).global_name)
3637            } else {
3638                Ok(())
3639            }
3640        }
3641
3642        // write the name and type
3643        match self {
3644            Self::Cached {
3645                task_name,
3646                cell_type_id,
3647                ..
3648            }
3649            | Self::Collapsed {
3650                task_name,
3651                cell_type_id,
3652                ..
3653            } => {
3654                f.write_str(task_name)?;
3655                fmt_cell_type_id(f, *cell_type_id)?;
3656                if matches!(self, Self::Collapsed { .. }) {
3657                    f.write_str(" (collapsed)")?;
3658                }
3659            }
3660            Self::Uncached { cell_type_id } => {
3661                f.write_str("unknown transient task")?;
3662                fmt_cell_type_id(f, *cell_type_id)?;
3663            }
3664        }
3665        f.write_char('\n')?;
3666
3667        // write any extra "cause" information we might have
3668        if let Self::Cached {
3669            cause_self,
3670            cause_args,
3671            ..
3672        } = self
3673        {
3674            if let Some(c) = cause_self {
3675                writeln!(f, "{indent}  self:")?;
3676                c.fmt_indented(f, level + 1)?;
3677            }
3678            if !cause_args.is_empty() {
3679                writeln!(f, "{indent}  args:")?;
3680                for c in cause_args {
3681                    c.fmt_indented(f, level + 1)?;
3682                }
3683            }
3684        }
3685        Ok(())
3686    }
3687}
3688
3689impl fmt::Display for DebugTraceTransientTask {
3690    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3691        self.fmt_indented(f, 0)
3692    }
3693}
3694
3695// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
3696fn far_future() -> Instant {
3697    // Roughly 30 years from now.
3698    // API does not provide a way to obtain max `Instant`
3699    // or convert specific date in the future to instant.
3700    // 1000 years overflows on macOS, 100 years overflows on FreeBSD.
3701    Instant::now() + Duration::from_secs(86400 * 365 * 30)
3702}
3703
3704/// Encodes task data, using the provided buffer as a scratch space.  Returns a new exactly sized
3705/// buffer.
3706/// This allows reusing the buffer across multiple encode calls to optimize allocations and
3707/// resulting buffer sizes.
3708fn encode_task_data(
3709    task: TaskId,
3710    data: &TaskStorage,
3711    category: SpecificTaskDataCategory,
3712    scratch_buffer: &mut TurboBincodeBuffer,
3713) -> Result<TurboBincodeBuffer> {
3714    scratch_buffer.clear();
3715    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3716    data.encode(category, &mut encoder)?;
3717
3718    if cfg!(feature = "verify_serialization") {
3719        TaskStorage::new()
3720            .decode(
3721                category,
3722                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3723            )
3724            .with_context(|| {
3725                format!(
3726                    "expected to be able to decode serialized data for '{category:?}' information \
3727                     for {task}"
3728                )
3729            })?;
3730    }
3731    Ok(SmallVec::from_slice(scratch_buffer))
3732}