Skip to main content

turbo_tasks_backend/backend/
mod.rs

1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7    borrow::Cow,
8    fmt::{self, Write},
9    future::Future,
10    hash::BuildHasherDefault,
11    mem::take,
12    pin::Pin,
13    sync::{
14        Arc, LazyLock,
15        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16    },
17};
18
19use anyhow::{Context, Result, bail};
20use auto_hash_map::{AutoMap, AutoSet};
21use indexmap::IndexSet;
22use parking_lot::{Condvar, Mutex};
23use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
24use smallvec::{SmallVec, smallvec};
25use tokio::time::{Duration, Instant};
26use tracing::{Span, trace_span};
27use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
28use turbo_tasks::{
29    CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
30    ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
31    TaskId, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic, ValueTypeId,
32    backend::{
33        Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskType,
34        TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
35        TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
36        VerificationMode,
37    },
38    event::{Event, EventDescription, EventListener},
39    message_queue::TimingEvent,
40    registry::get_value_type,
41    scope::scope_and_block,
42    task_statistics::TaskStatisticsApi,
43    trace::TraceRawVcs,
44    util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
45};
46
47pub use self::{
48    operation::AnyOperation,
49    storage::{SpecificTaskDataCategory, TaskDataCategory},
50};
51#[cfg(feature = "trace_task_dirty")]
52use crate::backend::operation::TaskDirtyCause;
53use crate::{
54    backend::{
55        operation::{
56            AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
57            CleanupOldEdgesOperation, ComputeDirtyAndCleanUpdate, ConnectChildOperation,
58            ExecuteContext, ExecuteContextImpl, LeafDistanceUpdateQueue, Operation, OutdatedEdge,
59            TaskGuard, TaskType, connect_children, get_aggregation_number, get_uppers,
60            is_root_node, make_task_dirty_internal, prepare_new_children,
61        },
62        storage::Storage,
63        storage_schema::{TaskStorage, TaskStorageAccessors},
64    },
65    backing_storage::BackingStorage,
66    data::{
67        ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
68        InProgressState, InProgressStateInner, OutputValue, TransientTask,
69    },
70    error::TaskError,
71    utils::{
72        arc_or_owned::ArcOrOwned,
73        chunked_vec::ChunkedVec,
74        dash_map_drop_contents::drop_contents,
75        dash_map_raw_entry::{RawEntry, raw_entry},
76        ptr_eq_arc::PtrEqArc,
77        shard_amount::compute_shard_amount,
78        sharded::Sharded,
79        swap_retain,
80    },
81};
82
83/// Threshold for parallelizing making dependent tasks dirty.
84/// If the number of dependent tasks exceeds this threshold,
85/// the operation will be parallelized.
86const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;
87
88const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
89
90/// Configurable idle timeout for snapshot persistence.
91/// Defaults to 2 seconds if not set or if the value is invalid.
92static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
93    std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
94        .ok()
95        .and_then(|v| v.parse::<u64>().ok())
96        .map(Duration::from_millis)
97        .unwrap_or(Duration::from_secs(2))
98});
99
100struct SnapshotRequest {
101    snapshot_requested: bool,
102    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
103}
104
105impl SnapshotRequest {
106    fn new() -> Self {
107        Self {
108            snapshot_requested: false,
109            suspended_operations: FxHashSet::default(),
110        }
111    }
112}
113
114pub enum StorageMode {
115    /// Queries the storage for cache entries that don't exist locally.
116    ReadOnly,
117    /// Queries the storage for cache entries that don't exist locally.
118    /// Regularly pushes changes to the backing storage.
119    ReadWrite,
120    /// Queries the storage for cache entries that don't exist locally.
121    /// On shutdown, pushes all changes to the backing storage.
122    ReadWriteOnShutdown,
123}
124
125pub struct BackendOptions {
126    /// Enables dependency tracking.
127    ///
128    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
129    /// forever.
130    pub dependency_tracking: bool,
131
132    /// Enables active tracking.
133    ///
134    /// Automatically disabled when `dependency_tracking` is disabled.
135    ///
136    /// When disabled: All tasks are considered as active.
137    pub active_tracking: bool,
138
139    /// Enables the backing storage.
140    pub storage_mode: Option<StorageMode>,
141
142    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
143    /// datastructures. If `None`, it will use the available parallelism.
144    pub num_workers: Option<usize>,
145
146    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
147    pub small_preallocation: bool,
148}
149
150impl Default for BackendOptions {
151    fn default() -> Self {
152        Self {
153            dependency_tracking: true,
154            active_tracking: true,
155            storage_mode: Some(StorageMode::ReadWrite),
156            num_workers: None,
157            small_preallocation: false,
158        }
159    }
160}
161
162pub enum TurboTasksBackendJob {
163    InitialSnapshot,
164    FollowUpSnapshot,
165}
166
167pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
168
169type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
170
171struct TurboTasksBackendInner<B: BackingStorage> {
172    options: BackendOptions,
173
174    start_time: Instant,
175
176    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
177    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
178
179    persisted_task_cache_log: Option<TaskCacheLog>,
180    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,
181
182    storage: Storage,
183
184    /// When true, the backing_storage has data that is not in the local storage.
185    local_is_partial: AtomicBool,
186
187    /// Number of executing operations + Highest bit is set when snapshot is
188    /// requested. When that bit is set, operations should pause until the
189    /// snapshot is completed. When the bit is set and in progress counter
190    /// reaches zero, `operations_completed_when_snapshot_requested` is
191    /// triggered.
192    in_progress_operations: AtomicUsize,
193
194    snapshot_request: Mutex<SnapshotRequest>,
195    /// Condition Variable that is triggered when `in_progress_operations`
196    /// reaches zero while snapshot is requested. All operations are either
197    /// completed or suspended.
198    operations_suspended: Condvar,
199    /// Condition Variable that is triggered when a snapshot is completed and
200    /// operations can continue.
201    snapshot_completed: Condvar,
202    /// The timestamp of the last started snapshot since [`Self::start_time`].
203    last_snapshot: AtomicU64,
204
205    stopping: AtomicBool,
206    stopping_event: Event,
207    idle_start_event: Event,
208    idle_end_event: Event,
209    #[cfg(feature = "verify_aggregation_graph")]
210    is_idle: AtomicBool,
211
212    task_statistics: TaskStatisticsApi,
213
214    backing_storage: B,
215
216    #[cfg(feature = "verify_aggregation_graph")]
217    root_tasks: Mutex<FxHashSet<TaskId>>,
218}
219
220impl<B: BackingStorage> TurboTasksBackend<B> {
221    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
222        Self(Arc::new(TurboTasksBackendInner::new(
223            options,
224            backing_storage,
225        )))
226    }
227
228    pub fn backing_storage(&self) -> &B {
229        &self.0.backing_storage
230    }
231}
232
233impl<B: BackingStorage> TurboTasksBackendInner<B> {
234    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
235        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
236        let need_log = matches!(
237            options.storage_mode,
238            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
239        );
240        if !options.dependency_tracking {
241            options.active_tracking = false;
242        }
243        let small_preallocation = options.small_preallocation;
244        let next_task_id = backing_storage
245            .next_free_task_id()
246            .expect("Failed to get task id");
247        Self {
248            options,
249            start_time: Instant::now(),
250            persisted_task_id_factory: IdFactoryWithReuse::new(
251                next_task_id,
252                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
253            ),
254            transient_task_id_factory: IdFactoryWithReuse::new(
255                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
256                TaskId::MAX,
257            ),
258            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
259            task_cache: FxDashMap::default(),
260            local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
261            storage: Storage::new(shard_amount, small_preallocation),
262            in_progress_operations: AtomicUsize::new(0),
263            snapshot_request: Mutex::new(SnapshotRequest::new()),
264            operations_suspended: Condvar::new(),
265            snapshot_completed: Condvar::new(),
266            last_snapshot: AtomicU64::new(0),
267            stopping: AtomicBool::new(false),
268            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
269            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
270            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
271            #[cfg(feature = "verify_aggregation_graph")]
272            is_idle: AtomicBool::new(false),
273            task_statistics: TaskStatisticsApi::default(),
274            backing_storage,
275            #[cfg(feature = "verify_aggregation_graph")]
276            root_tasks: Default::default(),
277        }
278    }
279
280    fn execute_context<'a>(
281        &'a self,
282        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
283    ) -> impl ExecuteContext<'a> {
284        ExecuteContextImpl::new(self, turbo_tasks)
285    }
286
287    /// # Safety
288    ///
289    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
290    unsafe fn execute_context_with_tx<'e, 'tx>(
291        &'e self,
292        tx: Option<&'e B::ReadTransaction<'tx>>,
293        turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
294    ) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
295    where
296        'tx: 'e,
297    {
298        // Safety: `tx` is from `self`.
299        unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
300    }
301
302    fn suspending_requested(&self) -> bool {
303        self.should_persist()
304            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
305    }
306
307    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
308        #[cold]
309        fn operation_suspend_point_cold<B: BackingStorage>(
310            this: &TurboTasksBackendInner<B>,
311            suspend: impl FnOnce() -> AnyOperation,
312        ) {
313            let operation = Arc::new(suspend());
314            let mut snapshot_request = this.snapshot_request.lock();
315            if snapshot_request.snapshot_requested {
316                snapshot_request
317                    .suspended_operations
318                    .insert(operation.clone().into());
319                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
320                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
321                if value == SNAPSHOT_REQUESTED_BIT {
322                    this.operations_suspended.notify_all();
323                }
324                this.snapshot_completed
325                    .wait_while(&mut snapshot_request, |snapshot_request| {
326                        snapshot_request.snapshot_requested
327                    });
328                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
329                snapshot_request
330                    .suspended_operations
331                    .remove(&operation.into());
332            }
333        }
334
335        if self.suspending_requested() {
336            operation_suspend_point_cold(self, suspend);
337        }
338    }
339
340    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
341        if !self.should_persist() {
342            return OperationGuard { backend: None };
343        }
344        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
345        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
346            let mut snapshot_request = self.snapshot_request.lock();
347            if snapshot_request.snapshot_requested {
348                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
349                if value == SNAPSHOT_REQUESTED_BIT {
350                    self.operations_suspended.notify_all();
351                }
352                self.snapshot_completed
353                    .wait_while(&mut snapshot_request, |snapshot_request| {
354                        snapshot_request.snapshot_requested
355                    });
356                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
357            }
358        }
359        OperationGuard {
360            backend: Some(self),
361        }
362    }
363
364    fn should_persist(&self) -> bool {
365        matches!(
366            self.options.storage_mode,
367            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
368        )
369    }
370
371    fn should_restore(&self) -> bool {
372        self.options.storage_mode.is_some()
373    }
374
375    fn should_track_dependencies(&self) -> bool {
376        self.options.dependency_tracking
377    }
378
379    fn should_track_activeness(&self) -> bool {
380        self.options.active_tracking
381    }
382
383    fn track_cache_hit(&self, task_type: &CachedTaskType) {
384        self.task_statistics
385            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
386    }
387
388    fn track_cache_miss(&self, task_type: &CachedTaskType) {
389        self.task_statistics
390            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
391    }
392
393    /// Reconstructs a full [`TurboTasksExecutionError`] from the compact [`TaskError`] storage
394    /// representation. For [`TaskError::TaskChain`], this looks up the source error from the last
395    /// task's output and rebuilds the nested `TaskContext` wrappers with `TurboTasksCallApi`
396    /// references for lazy name resolution.
397    fn task_error_to_turbo_tasks_execution_error(
398        &self,
399        error: &TaskError,
400        ctx: &mut impl ExecuteContext<'_>,
401    ) -> TurboTasksExecutionError {
402        match error {
403            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
404            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
405                message: item.message.clone(),
406                source: item
407                    .source
408                    .as_ref()
409                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
410            })),
411            TaskError::LocalTaskContext(local_task_context) => {
412                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
413                    name: local_task_context.name.clone(),
414                    source: local_task_context
415                        .source
416                        .as_ref()
417                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
418                }))
419            }
420            TaskError::TaskChain(chain) => {
421                let task_id = chain.last().unwrap();
422                let error = {
423                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
424                    if let Some(OutputValue::Error(error)) = task.get_output() {
425                        Some(error.clone())
426                    } else {
427                        None
428                    }
429                };
430                let error = error.map_or_else(
431                    || {
432                        // Eventual consistency will cause errors to no longer be available
433                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
434                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
435                                "Error no longer available",
436                            )),
437                            location: None,
438                        }))
439                    },
440                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
441                );
442                let mut current_error = error;
443                for &task_id in chain.iter().rev() {
444                    current_error =
445                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
446                            task_id,
447                            source: Some(current_error),
448                            turbo_tasks: ctx.turbo_tasks(),
449                        }));
450                }
451                current_error
452            }
453        }
454    }
455}
456
457pub(crate) struct OperationGuard<'a, B: BackingStorage> {
458    backend: Option<&'a TurboTasksBackendInner<B>>,
459}
460
461impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
462    fn drop(&mut self) {
463        if let Some(backend) = self.backend {
464            let fetch_sub = backend
465                .in_progress_operations
466                .fetch_sub(1, Ordering::AcqRel);
467            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
468                backend.operations_suspended.notify_all();
469            }
470        }
471    }
472}
473
474/// Intermediate result of step 1 of task execution completion.
475struct TaskExecutionCompletePrepareResult {
476    pub new_children: FxHashSet<TaskId>,
477    pub is_now_immutable: bool,
478    #[cfg(feature = "verify_determinism")]
479    pub no_output_set: bool,
480    pub new_output: Option<OutputValue>,
481    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
482}
483
484// Operations
485impl<B: BackingStorage> TurboTasksBackendInner<B> {
486    /// # Safety
487    ///
488    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
489    unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
490        &'l self,
491        tx: Option<&'l B::ReadTransaction<'tx>>,
492        parent_task: Option<TaskId>,
493        child_task: TaskId,
494        task_type: Option<ArcOrOwned<CachedTaskType>>,
495        turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
496    ) {
497        operation::ConnectChildOperation::run(parent_task, child_task, task_type, unsafe {
498            self.execute_context_with_tx(tx, turbo_tasks)
499        });
500    }
501
502    fn connect_child(
503        &self,
504        parent_task: Option<TaskId>,
505        child_task: TaskId,
506        task_type: Option<ArcOrOwned<CachedTaskType>>,
507        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
508    ) {
509        operation::ConnectChildOperation::run(
510            parent_task,
511            child_task,
512            task_type,
513            self.execute_context(turbo_tasks),
514        );
515    }
516
517    fn try_read_task_output(
518        self: &Arc<Self>,
519        task_id: TaskId,
520        reader: Option<TaskId>,
521        options: ReadOutputOptions,
522        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
523    ) -> Result<Result<RawVc, EventListener>> {
524        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
525
526        let mut ctx = self.execute_context(turbo_tasks);
527        let need_reader_task = if self.should_track_dependencies()
528            && !matches!(options.tracking, ReadTracking::Untracked)
529            && reader.is_some_and(|reader_id| reader_id != task_id)
530            && let Some(reader_id) = reader
531            && reader_id != task_id
532        {
533            Some(reader_id)
534        } else {
535            None
536        };
537        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
538            // Having a task_pair here is not optimal, but otherwise this would lead to a race
539            // condition. See below.
540            // TODO(sokra): solve that in a more performant way.
541            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
542            (task, Some(reader))
543        } else {
544            (ctx.task(task_id, TaskDataCategory::All), None)
545        };
546
547        fn listen_to_done_event(
548            reader_description: Option<EventDescription>,
549            tracking: ReadTracking,
550            done_event: &Event,
551        ) -> EventListener {
552            done_event.listen_with_note(move || {
553                move || {
554                    if let Some(reader_description) = reader_description.as_ref() {
555                        format!(
556                            "try_read_task_output from {} ({})",
557                            reader_description, tracking
558                        )
559                    } else {
560                        format!("try_read_task_output ({})", tracking)
561                    }
562                }
563            })
564        }
565
566        fn check_in_progress(
567            task: &impl TaskGuard,
568            reader_description: Option<EventDescription>,
569            tracking: ReadTracking,
570        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
571        {
572            match task.get_in_progress() {
573                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
574                    listen_to_done_event(reader_description, tracking, done_event),
575                ))),
576                Some(InProgressState::InProgress(box InProgressStateInner {
577                    done_event, ..
578                })) => Some(Ok(Err(listen_to_done_event(
579                    reader_description,
580                    tracking,
581                    done_event,
582                )))),
583                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
584                    "{} was canceled",
585                    task.get_task_description()
586                ))),
587                None => None,
588            }
589        }
590
591        if matches!(options.consistency, ReadConsistency::Strong) {
592            // Ensure it's an root node
593            loop {
594                let aggregation_number = get_aggregation_number(&task);
595                if is_root_node(aggregation_number) {
596                    break;
597                }
598                drop(task);
599                drop(reader_task);
600                {
601                    let _span = tracing::trace_span!(
602                        "make root node for strongly consistent read",
603                        %task_id
604                    )
605                    .entered();
606                    AggregationUpdateQueue::run(
607                        AggregationUpdateJob::UpdateAggregationNumber {
608                            task_id,
609                            base_aggregation_number: u32::MAX,
610                            distance: None,
611                        },
612                        &mut ctx,
613                    );
614                }
615                (task, reader_task) = if let Some(reader_id) = need_reader_task {
616                    // TODO(sokra): see comment above
617                    let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
618                    (task, Some(reader))
619                } else {
620                    (ctx.task(task_id, TaskDataCategory::All), None)
621                }
622            }
623
624            let is_dirty = task.is_dirty();
625
626            // Check the dirty count of the root node
627            let has_dirty_containers = task.has_dirty_containers();
628            if has_dirty_containers || is_dirty.is_some() {
629                let activeness = task.get_activeness_mut();
630                let mut task_ids_to_schedule: Vec<_> = Vec::new();
631                // When there are dirty task, subscribe to the all_clean_event
632                let activeness = if let Some(activeness) = activeness {
633                    // This makes sure all tasks stay active and this task won't stale.
634                    // active_until_clean is automatically removed when this
635                    // task is clean.
636                    activeness.set_active_until_clean();
637                    activeness
638                } else {
639                    // If we don't have a root state, add one. This also makes sure all tasks stay
640                    // active and this task won't stale. active_until_clean
641                    // is automatically removed when this task is clean.
642                    if ctx.should_track_activeness() {
643                        // A newly added Activeness need to make sure to schedule the tasks
644                        task_ids_to_schedule = task.dirty_containers().collect();
645                        task_ids_to_schedule.push(task_id);
646                    }
647                    let activeness =
648                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
649                    activeness.set_active_until_clean();
650                    activeness
651                };
652                let listener = activeness.all_clean_event.listen_with_note(move || {
653                    let this = self.clone();
654                    let tt = turbo_tasks.pin();
655                    move || {
656                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
657                        let mut ctx = this.execute_context(tt);
658                        let mut visited = FxHashSet::default();
659                        fn indent(s: &str) -> String {
660                            s.split_inclusive('\n')
661                                .flat_map(|line: &str| ["  ", line].into_iter())
662                                .collect::<String>()
663                        }
664                        fn get_info(
665                            ctx: &mut impl ExecuteContext<'_>,
666                            task_id: TaskId,
667                            parent_and_count: Option<(TaskId, i32)>,
668                            visited: &mut FxHashSet<TaskId>,
669                        ) -> String {
670                            let task = ctx.task(task_id, TaskDataCategory::All);
671                            let is_dirty = task.is_dirty();
672                            let in_progress =
673                                task.get_in_progress()
674                                    .map_or("not in progress", |p| match p {
675                                        InProgressState::InProgress(_) => "in progress",
676                                        InProgressState::Scheduled { .. } => "scheduled",
677                                        InProgressState::Canceled => "canceled",
678                                    });
679                            let activeness = task.get_activeness().map_or_else(
680                                || "not active".to_string(),
681                                |activeness| format!("{activeness:?}"),
682                            );
683                            let aggregation_number = get_aggregation_number(&task);
684                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
685                            {
686                                let uppers = get_uppers(&task);
687                                !uppers.contains(&parent_task_id)
688                            } else {
689                                false
690                            };
691
692                            // Check the dirty count of the root node
693                            let has_dirty_containers = task.has_dirty_containers();
694
695                            let task_description = task.get_task_description();
696                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
697                                format!(", dirty({parent_priority})")
698                            } else {
699                                String::new()
700                            };
701                            let has_dirty_containers_label = if has_dirty_containers {
702                                ", dirty containers"
703                            } else {
704                                ""
705                            };
706                            let count = if let Some((_, count)) = parent_and_count {
707                                format!(" {count}")
708                            } else {
709                                String::new()
710                            };
711                            let mut info = format!(
712                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
713                                 {in_progress}, \
714                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
715                            );
716                            let children: Vec<_> = task.dirty_containers_with_count().collect();
717                            drop(task);
718
719                            if missing_upper {
720                                info.push_str("\n  ERROR: missing upper connection");
721                            }
722
723                            if has_dirty_containers || !children.is_empty() {
724                                writeln!(info, "\n  dirty tasks:").unwrap();
725
726                                for (child_task_id, count) in children {
727                                    let task_description = ctx
728                                        .task(child_task_id, TaskDataCategory::Data)
729                                        .get_task_description();
730                                    if visited.insert(child_task_id) {
731                                        let child_info = get_info(
732                                            ctx,
733                                            child_task_id,
734                                            Some((task_id, count)),
735                                            visited,
736                                        );
737                                        info.push_str(&indent(&child_info));
738                                        if !info.ends_with('\n') {
739                                            info.push('\n');
740                                        }
741                                    } else {
742                                        writeln!(
743                                            info,
744                                            "  {child_task_id} {task_description} {count} \
745                                             (already visited)"
746                                        )
747                                        .unwrap();
748                                    }
749                                }
750                            }
751                            info
752                        }
753                        let info = get_info(&mut ctx, task_id, None, &mut visited);
754                        format!(
755                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
756                        )
757                    }
758                });
759                drop(reader_task);
760                drop(task);
761                if !task_ids_to_schedule.is_empty() {
762                    let mut queue = AggregationUpdateQueue::new();
763                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
764                    queue.execute(&mut ctx);
765                }
766
767                return Ok(Err(listener));
768            }
769        }
770
771        let reader_description = reader_task
772            .as_ref()
773            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
774        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
775        {
776            return value;
777        }
778
779        if let Some(output) = task.get_output() {
780            let result = match output {
781                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
782                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
783                OutputValue::Error(error) => Err(error.clone()),
784            };
785            if let Some(mut reader_task) = reader_task.take()
786                && options.tracking.should_track(result.is_err())
787                && (!task.immutable() || cfg!(feature = "verify_immutable"))
788            {
789                #[cfg(feature = "trace_task_output_dependencies")]
790                let _span = tracing::trace_span!(
791                    "add output dependency",
792                    task = %task_id,
793                    dependent_task = ?reader
794                )
795                .entered();
796                let mut queue = LeafDistanceUpdateQueue::new();
797                let reader = reader.unwrap();
798                if task.add_output_dependent(reader) {
799                    // Ensure that dependent leaf distance is strictly monotonic increasing
800                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
801                    let reader_leaf_distance =
802                        reader_task.get_leaf_distance().copied().unwrap_or_default();
803                    if reader_leaf_distance.distance <= leaf_distance.distance {
804                        queue.push(
805                            reader,
806                            leaf_distance.distance,
807                            leaf_distance.max_distance_in_buffer,
808                        );
809                    }
810                }
811
812                drop(task);
813
814                // Note: We use `task_pair` earlier to lock the task and its reader at the same
815                // time. If we didn't and just locked the reader here, an invalidation could occur
816                // between grabbing the locks. If that happened, and if the task is "outdated" or
817                // doesn't have the dependency edge yet, the invalidation would be lost.
818
819                if !reader_task.remove_outdated_output_dependencies(&task_id) {
820                    let _ = reader_task.add_output_dependencies(task_id);
821                }
822                drop(reader_task);
823
824                queue.execute(&mut ctx);
825            } else {
826                drop(task);
827            }
828
829            return result.map_err(|error| {
830                self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
831                    .with_task_context(task_id, turbo_tasks.pin())
832                    .into()
833            });
834        }
835        drop(reader_task);
836
837        let note = EventDescription::new(|| {
838            move || {
839                if let Some(reader) = reader_description.as_ref() {
840                    format!("try_read_task_output (recompute) from {reader}",)
841                } else {
842                    "try_read_task_output (recompute, untracked)".to_string()
843                }
844            }
845        });
846
847        // Output doesn't exist. We need to schedule the task to compute it.
848        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
849            TaskExecutionReason::OutputNotAvailable,
850            EventDescription::new(|| task.get_task_desc_fn()),
851            note,
852        );
853
854        // It's not possible that the task is InProgress at this point. If it is InProgress {
855        // done: true } it must have Output and would early return.
856        let old = task.set_in_progress(in_progress_state);
857        debug_assert!(old.is_none(), "InProgress already exists");
858        ctx.schedule_task(task, TaskPriority::Initial);
859
860        Ok(Err(listener))
861    }
862
863    fn try_read_task_cell(
864        &self,
865        task_id: TaskId,
866        reader: Option<TaskId>,
867        cell: CellId,
868        options: ReadCellOptions,
869        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
870    ) -> Result<Result<TypedCellContent, EventListener>> {
871        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
872
873        fn add_cell_dependency(
874            task_id: TaskId,
875            mut task: impl TaskGuard,
876            reader: Option<TaskId>,
877            reader_task: Option<impl TaskGuard>,
878            cell: CellId,
879            key: Option<u64>,
880        ) {
881            if let Some(mut reader_task) = reader_task
882                && (!task.immutable() || cfg!(feature = "verify_immutable"))
883            {
884                let reader = reader.unwrap();
885                let _ = task.add_cell_dependents((cell, key, reader));
886                drop(task);
887
888                // Note: We use `task_pair` earlier to lock the task and its reader at the same
889                // time. If we didn't and just locked the reader here, an invalidation could occur
890                // between grabbing the locks. If that happened, and if the task is "outdated" or
891                // doesn't have the dependency edge yet, the invalidation would be lost.
892
893                let target = CellRef {
894                    task: task_id,
895                    cell,
896                };
897                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
898                    let _ = reader_task.add_cell_dependencies((target, key));
899                }
900                drop(reader_task);
901            }
902        }
903
904        let ReadCellOptions {
905            is_serializable_cell_content,
906            tracking,
907            final_read_hint,
908        } = options;
909
910        let mut ctx = self.execute_context(turbo_tasks);
911        let (mut task, reader_task) = if self.should_track_dependencies()
912            && !matches!(tracking, ReadCellTracking::Untracked)
913            && let Some(reader_id) = reader
914            && reader_id != task_id
915        {
916            // Having a task_pair here is not optimal, but otherwise this would lead to a race
917            // condition. See below.
918            // TODO(sokra): solve that in a more performant way.
919            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
920            (task, Some(reader))
921        } else {
922            (ctx.task(task_id, TaskDataCategory::All), None)
923        };
924
925        let content = if final_read_hint {
926            task.remove_cell_data(is_serializable_cell_content, cell)
927        } else {
928            task.get_cell_data(is_serializable_cell_content, cell)
929        };
930        if let Some(content) = content {
931            if tracking.should_track(false) {
932                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
933            }
934            return Ok(Ok(TypedCellContent(
935                cell.type_id,
936                CellContent(Some(content.reference)),
937            )));
938        }
939
940        let in_progress = task.get_in_progress();
941        if matches!(
942            in_progress,
943            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
944        ) {
945            return Ok(Err(self
946                .listen_to_cell(&mut task, task_id, &reader_task, cell)
947                .0));
948        }
949        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
950
951        // Check cell index range (cell might not exist at all)
952        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
953        let Some(max_id) = max_id else {
954            let task_desc = task.get_task_description();
955            if tracking.should_track(true) {
956                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
957            }
958            bail!(
959                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
960            );
961        };
962        if cell.index >= max_id {
963            let task_desc = task.get_task_description();
964            if tracking.should_track(true) {
965                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
966            }
967            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
968        }
969
970        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
971        // task the get the cell content.
972
973        // Listen to the cell and potentially schedule the task
974        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
975        drop(reader_task);
976        if !new_listener {
977            return Ok(Err(listener));
978        }
979
980        let _span = tracing::trace_span!(
981            "recomputation",
982            cell_type = get_value_type(cell.type_id).global_name,
983            cell_index = cell.index
984        )
985        .entered();
986
987        // Schedule the task, if not already scheduled
988        if is_cancelled {
989            bail!("{} was canceled", task.get_task_description());
990        }
991
992        let _ = task.add_scheduled(
993            TaskExecutionReason::CellNotAvailable,
994            EventDescription::new(|| task.get_task_desc_fn()),
995        );
996        ctx.schedule_task(task, TaskPriority::Initial);
997
998        Ok(Err(listener))
999    }
1000
1001    fn listen_to_cell(
1002        &self,
1003        task: &mut impl TaskGuard,
1004        task_id: TaskId,
1005        reader_task: &Option<impl TaskGuard>,
1006        cell: CellId,
1007    ) -> (EventListener, bool) {
1008        let note = || {
1009            let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
1010            move || {
1011                if let Some(reader_desc) = reader_desc.as_ref() {
1012                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
1013                } else {
1014                    "try_read_task_cell (in progress, untracked)".to_string()
1015                }
1016            }
1017        };
1018        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
1019            // Someone else is already computing the cell
1020            let listener = in_progress.event.listen_with_note(note);
1021            return (listener, false);
1022        }
1023        let in_progress = InProgressCellState::new(task_id, cell);
1024        let listener = in_progress.event.listen_with_note(note);
1025        let old = task.insert_in_progress_cells(cell, in_progress);
1026        debug_assert!(old.is_none(), "InProgressCell already exists");
1027        (listener, true)
1028    }
1029
1030    fn snapshot_and_persist(
1031        &self,
1032        parent_span: Option<tracing::Id>,
1033        reason: &str,
1034        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1035    ) -> Option<(Instant, bool)> {
1036        let snapshot_span =
1037            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
1038                .entered();
1039        let start = Instant::now();
1040        debug_assert!(self.should_persist());
1041
1042        let suspended_operations;
1043        {
1044            let _span = tracing::info_span!("blocking").entered();
1045            let mut snapshot_request = self.snapshot_request.lock();
1046            snapshot_request.snapshot_requested = true;
1047            let active_operations = self
1048                .in_progress_operations
1049                .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1050            if active_operations != 0 {
1051                self.operations_suspended
1052                    .wait_while(&mut snapshot_request, |_| {
1053                        self.in_progress_operations.load(Ordering::Relaxed)
1054                            != SNAPSHOT_REQUESTED_BIT
1055                    });
1056            }
1057            suspended_operations = snapshot_request
1058                .suspended_operations
1059                .iter()
1060                .map(|op| op.arc().clone())
1061                .collect::<Vec<_>>();
1062        }
1063        self.storage.start_snapshot();
1064        let mut persisted_task_cache_log = self
1065            .persisted_task_cache_log
1066            .as_ref()
1067            .map(|l| l.take(|i| i))
1068            .unwrap_or_default();
1069        let mut snapshot_request = self.snapshot_request.lock();
1070        snapshot_request.snapshot_requested = false;
1071        self.in_progress_operations
1072            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1073        self.snapshot_completed.notify_all();
1074        let snapshot_time = Instant::now();
1075        drop(snapshot_request);
1076
1077        #[cfg(feature = "print_cache_item_size")]
1078        #[derive(Default)]
1079        struct TaskCacheStats {
1080            data: usize,
1081            data_compressed: usize,
1082            data_count: usize,
1083            meta: usize,
1084            meta_compressed: usize,
1085            meta_count: usize,
1086            upper_count: usize,
1087            collectibles_count: usize,
1088            aggregated_collectibles_count: usize,
1089            children_count: usize,
1090            followers_count: usize,
1091            collectibles_dependents_count: usize,
1092            aggregated_dirty_containers_count: usize,
1093            output_size: usize,
1094        }
1095        #[cfg(feature = "print_cache_item_size")]
1096        impl TaskCacheStats {
1097            fn compressed_size(data: &[u8]) -> Result<usize> {
1098                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1099                    data,
1100                    &mut Vec::new(),
1101                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
1102                )?)
1103            }
1104
1105            fn add_data(&mut self, data: &[u8]) {
1106                self.data += data.len();
1107                self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1108                self.data_count += 1;
1109            }
1110
1111            fn add_meta(&mut self, data: &[u8]) {
1112                self.meta += data.len();
1113                self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1114                self.meta_count += 1;
1115            }
1116
1117            fn add_counts(&mut self, storage: &TaskStorage) {
1118                let counts = storage.meta_counts();
1119                self.upper_count += counts.upper;
1120                self.collectibles_count += counts.collectibles;
1121                self.aggregated_collectibles_count += counts.aggregated_collectibles;
1122                self.children_count += counts.children;
1123                self.followers_count += counts.followers;
1124                self.collectibles_dependents_count += counts.collectibles_dependents;
1125                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
1126                if let Some(output) = storage.get_output() {
1127                    use turbo_bincode::turbo_bincode_encode;
1128
1129                    self.output_size += turbo_bincode_encode(&output)
1130                        .map(|data| data.len())
1131                        .unwrap_or(0);
1132                }
1133            }
1134        }
1135        #[cfg(feature = "print_cache_item_size")]
1136        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1137            Mutex::new(FxHashMap::default());
1138
1139        let preprocess = |task_id: TaskId, inner: &TaskStorage| {
1140            if task_id.is_transient() {
1141                return (None, None);
1142            }
1143
1144            let meta_restored = inner.flags.meta_restored();
1145            let data_restored = inner.flags.data_restored();
1146
1147            // Encode meta/data directly from TaskStorage
1148            let meta = meta_restored.then(|| inner.clone_meta_snapshot());
1149            let data = data_restored.then(|| inner.clone_data_snapshot());
1150
1151            (meta, data)
1152        };
1153        let process = |task_id: TaskId,
1154                       (meta, data): (Option<TaskStorage>, Option<TaskStorage>),
1155                       buffer: &mut TurboBincodeBuffer| {
1156            #[cfg(feature = "print_cache_item_size")]
1157            if let Some(ref m) = meta {
1158                task_cache_stats
1159                    .lock()
1160                    .entry(self.get_task_name(task_id, turbo_tasks))
1161                    .or_default()
1162                    .add_counts(m);
1163            }
1164            (
1165                task_id,
1166                meta.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Meta, buffer)),
1167                data.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Data, buffer)),
1168            )
1169        };
1170        let process_snapshot =
1171            |task_id: TaskId, inner: Box<TaskStorage>, buffer: &mut TurboBincodeBuffer| {
1172                if task_id.is_transient() {
1173                    return (task_id, None, None);
1174                }
1175
1176                #[cfg(feature = "print_cache_item_size")]
1177                if inner.flags.meta_modified() {
1178                    task_cache_stats
1179                        .lock()
1180                        .entry(self.get_task_name(task_id, turbo_tasks))
1181                        .or_default()
1182                        .add_counts(&inner);
1183                }
1184
1185                // Encode meta/data directly from TaskStorage snapshot
1186                (
1187                    task_id,
1188                    inner.flags.meta_modified().then(|| {
1189                        encode_task_data(task_id, &inner, SpecificTaskDataCategory::Meta, buffer)
1190                    }),
1191                    inner.flags.data_modified().then(|| {
1192                        encode_task_data(task_id, &inner, SpecificTaskDataCategory::Data, buffer)
1193                    }),
1194                )
1195            };
1196
1197        let snapshot = self
1198            .storage
1199            .take_snapshot(&preprocess, &process, &process_snapshot);
1200
1201        let task_snapshots = snapshot
1202            .into_iter()
1203            .filter_map(|iter| {
1204                let mut iter = iter
1205                    .filter_map(
1206                        |(task_id, meta, data): (
1207                            _,
1208                            Option<Result<SmallVec<_>>>,
1209                            Option<Result<SmallVec<_>>>,
1210                        )| {
1211                            let meta = match meta {
1212                                Some(Ok(meta)) => {
1213                                    #[cfg(feature = "print_cache_item_size")]
1214                                    task_cache_stats
1215                                        .lock()
1216                                        .entry(self.get_task_name(task_id, turbo_tasks))
1217                                        .or_default()
1218                                        .add_meta(&meta);
1219                                    Some(meta)
1220                                }
1221                                None => None,
1222                                Some(Err(err)) => {
1223                                    println!(
1224                                        "Serializing task {} failed (meta): {:?}",
1225                                        self.debug_get_task_description(task_id),
1226                                        err
1227                                    );
1228                                    None
1229                                }
1230                            };
1231                            let data = match data {
1232                                Some(Ok(data)) => {
1233                                    #[cfg(feature = "print_cache_item_size")]
1234                                    task_cache_stats
1235                                        .lock()
1236                                        .entry(self.get_task_name(task_id, turbo_tasks))
1237                                        .or_default()
1238                                        .add_data(&data);
1239                                    Some(data)
1240                                }
1241                                None => None,
1242                                Some(Err(err)) => {
1243                                    println!(
1244                                        "Serializing task {} failed (data): {:?}",
1245                                        self.debug_get_task_description(task_id),
1246                                        err
1247                                    );
1248                                    None
1249                                }
1250                            };
1251                            (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
1252                        },
1253                    )
1254                    .peekable();
1255                iter.peek().is_some().then_some(iter)
1256            })
1257            .collect::<Vec<_>>();
1258
1259        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
1260
1261        drop(snapshot_span);
1262
1263        if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
1264            return Some((snapshot_time, false));
1265        }
1266
1267        let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1268        {
1269            if let Err(err) = self.backing_storage.save_snapshot(
1270                suspended_operations,
1271                persisted_task_cache_log,
1272                task_snapshots,
1273            ) {
1274                eprintln!("Persisting failed: {err:?}");
1275                return None;
1276            }
1277            #[cfg(feature = "print_cache_item_size")]
1278            {
1279                let mut task_cache_stats = task_cache_stats
1280                    .into_inner()
1281                    .into_iter()
1282                    .collect::<Vec<_>>();
1283                if !task_cache_stats.is_empty() {
1284                    use turbo_tasks::util::FormatBytes;
1285
1286                    use crate::utils::markdown_table::print_markdown_table;
1287
1288                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1289                        (stats_b.data_compressed + stats_b.meta_compressed, key_b)
1290                            .cmp(&(stats_a.data_compressed + stats_a.meta_compressed, key_a))
1291                    });
1292                    println!(
1293                        "Task cache stats: {} ({})",
1294                        FormatBytes(
1295                            task_cache_stats
1296                                .iter()
1297                                .map(|(_, s)| s.data_compressed + s.meta_compressed)
1298                                .sum::<usize>()
1299                        ),
1300                        FormatBytes(
1301                            task_cache_stats
1302                                .iter()
1303                                .map(|(_, s)| s.data + s.meta)
1304                                .sum::<usize>()
1305                        )
1306                    );
1307
1308                    print_markdown_table(
1309                        [
1310                            "Task",
1311                            " Total Size",
1312                            " Data Size",
1313                            " Data Count x Avg",
1314                            " Data Count x Avg",
1315                            " Meta Size",
1316                            " Meta Count x Avg",
1317                            " Meta Count x Avg",
1318                            " Uppers",
1319                            " Coll",
1320                            " Agg Coll",
1321                            " Children",
1322                            " Followers",
1323                            " Coll Deps",
1324                            " Agg Dirty",
1325                            " Output Size",
1326                        ],
1327                        task_cache_stats.iter(),
1328                        |(task_desc, stats)| {
1329                            [
1330                                task_desc.to_string(),
1331                                format!(
1332                                    " {} ({})",
1333                                    FormatBytes(stats.data_compressed + stats.meta_compressed),
1334                                    FormatBytes(stats.data + stats.meta)
1335                                ),
1336                                format!(
1337                                    " {} ({})",
1338                                    FormatBytes(stats.data_compressed),
1339                                    FormatBytes(stats.data)
1340                                ),
1341                                format!(" {} x", stats.data_count,),
1342                                format!(
1343                                    "{} ({})",
1344                                    FormatBytes(
1345                                        stats
1346                                            .data_compressed
1347                                            .checked_div(stats.data_count)
1348                                            .unwrap_or(0)
1349                                    ),
1350                                    FormatBytes(
1351                                        stats.data.checked_div(stats.data_count).unwrap_or(0)
1352                                    ),
1353                                ),
1354                                format!(
1355                                    " {} ({})",
1356                                    FormatBytes(stats.meta_compressed),
1357                                    FormatBytes(stats.meta)
1358                                ),
1359                                format!(" {} x", stats.meta_count,),
1360                                format!(
1361                                    "{} ({})",
1362                                    FormatBytes(
1363                                        stats
1364                                            .meta_compressed
1365                                            .checked_div(stats.meta_count)
1366                                            .unwrap_or(0)
1367                                    ),
1368                                    FormatBytes(
1369                                        stats.meta.checked_div(stats.meta_count).unwrap_or(0)
1370                                    ),
1371                                ),
1372                                format!(" {}", stats.upper_count),
1373                                format!(" {}", stats.collectibles_count),
1374                                format!(" {}", stats.aggregated_collectibles_count),
1375                                format!(" {}", stats.children_count),
1376                                format!(" {}", stats.followers_count),
1377                                format!(" {}", stats.collectibles_dependents_count),
1378                                format!(" {}", stats.aggregated_dirty_containers_count),
1379                                format!(" {}", FormatBytes(stats.output_size)),
1380                            ]
1381                        },
1382                    );
1383                }
1384            }
1385        }
1386
1387        let elapsed = start.elapsed();
1388        // avoid spamming the event queue with information about fast operations
1389        if elapsed > Duration::from_secs(10) {
1390            turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1391                "Finished writing to filesystem cache".to_string(),
1392                elapsed,
1393            )));
1394        }
1395
1396        Some((snapshot_time, true))
1397    }
1398
1399    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1400        if self.should_restore() {
1401            // Continue all uncompleted operations
1402            // They can't be interrupted by a snapshot since the snapshotting job has not been
1403            // scheduled yet.
1404            let uncompleted_operations = self
1405                .backing_storage
1406                .uncompleted_operations()
1407                .expect("Failed to get uncompleted operations");
1408            if !uncompleted_operations.is_empty() {
1409                let mut ctx = self.execute_context(turbo_tasks);
1410                for op in uncompleted_operations {
1411                    op.execute(&mut ctx);
1412                }
1413            }
1414        }
1415
1416        // Only when it should write regularly to the storage, we schedule the initial snapshot
1417        // job.
1418        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1419            // Schedule the snapshot job
1420            let _span = trace_span!("persisting background job").entered();
1421            let _span = tracing::info_span!("thread").entered();
1422            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1423        }
1424    }
1425
1426    fn stopping(&self) {
1427        self.stopping.store(true, Ordering::Release);
1428        self.stopping_event.notify(usize::MAX);
1429    }
1430
1431    #[allow(unused_variables)]
1432    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1433        #[cfg(feature = "verify_aggregation_graph")]
1434        {
1435            self.is_idle.store(false, Ordering::Release);
1436            self.verify_aggregation_graph(turbo_tasks, false);
1437        }
1438        if self.should_persist() {
1439            self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks);
1440        }
1441        drop_contents(&self.task_cache);
1442        self.storage.drop_contents();
1443        if let Err(err) = self.backing_storage.shutdown() {
1444            println!("Shutting down failed: {err}");
1445        }
1446    }
1447
1448    #[allow(unused_variables)]
1449    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1450        self.idle_start_event.notify(usize::MAX);
1451
1452        #[cfg(feature = "verify_aggregation_graph")]
1453        {
1454            use tokio::select;
1455
1456            self.is_idle.store(true, Ordering::Release);
1457            let this = self.clone();
1458            let turbo_tasks = turbo_tasks.pin();
1459            tokio::task::spawn(async move {
1460                select! {
1461                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1462                        // do nothing
1463                    }
1464                    _ = this.idle_end_event.listen() => {
1465                        return;
1466                    }
1467                }
1468                if !this.is_idle.load(Ordering::Relaxed) {
1469                    return;
1470                }
1471                this.verify_aggregation_graph(&*turbo_tasks, true);
1472            });
1473        }
1474    }
1475
1476    fn idle_end(&self) {
1477        #[cfg(feature = "verify_aggregation_graph")]
1478        self.is_idle.store(false, Ordering::Release);
1479        self.idle_end_event.notify(usize::MAX);
1480    }
1481
1482    fn get_or_create_persistent_task(
1483        &self,
1484        task_type: CachedTaskType,
1485        parent_task: Option<TaskId>,
1486        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1487    ) -> TaskId {
1488        let is_root = task_type.native_fn.is_root;
1489
1490        // First check if the task exists in the cache which only uses a read lock
1491        if let Some(task_id) = self.task_cache.get(&task_type) {
1492            let task_id = *task_id;
1493            self.track_cache_hit(&task_type);
1494            self.connect_child(
1495                parent_task,
1496                task_id,
1497                Some(ArcOrOwned::Owned(task_type)),
1498                turbo_tasks,
1499            );
1500            return task_id;
1501        }
1502
1503        let check_backing_storage =
1504            self.should_restore() && self.local_is_partial.load(Ordering::Acquire);
1505        let tx = check_backing_storage
1506            .then(|| self.backing_storage.start_read_transaction())
1507            .flatten();
1508        let mut is_new = false;
1509        let (task_id, task_type) = {
1510            // Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
1511            if let Some(task_id) = unsafe {
1512                check_backing_storage
1513                    .then(|| {
1514                        self.backing_storage
1515                            .forward_lookup_task_cache(tx.as_ref(), &task_type)
1516                            .expect("Failed to lookup task id")
1517                    })
1518                    .flatten()
1519            } {
1520                // Task exists in backing storage
1521                // So we only need to insert it into the in-memory cache
1522                self.track_cache_hit(&task_type);
1523                let task_type = match raw_entry(&self.task_cache, &task_type) {
1524                    RawEntry::Occupied(_) => ArcOrOwned::Owned(task_type),
1525                    RawEntry::Vacant(e) => {
1526                        let task_type = Arc::new(task_type);
1527                        e.insert(task_type.clone(), task_id);
1528                        ArcOrOwned::Arc(task_type)
1529                    }
1530                };
1531                (task_id, task_type)
1532            } else {
1533                // Task doesn't exist in memory cache or backing storage
1534                // So we might need to create a new task
1535                let (task_id, mut task_type) = match raw_entry(&self.task_cache, &task_type) {
1536                    RawEntry::Occupied(e) => {
1537                        let task_id = *e.get();
1538                        drop(e);
1539                        self.track_cache_hit(&task_type);
1540                        (task_id, ArcOrOwned::Owned(task_type))
1541                    }
1542                    RawEntry::Vacant(e) => {
1543                        let task_type = Arc::new(task_type);
1544                        let task_id = self.persisted_task_id_factory.get();
1545                        e.insert(task_type.clone(), task_id);
1546                        self.track_cache_miss(&task_type);
1547                        is_new = true;
1548                        (task_id, ArcOrOwned::Arc(task_type))
1549                    }
1550                };
1551                if let Some(log) = &self.persisted_task_cache_log {
1552                    let task_type_arc: Arc<CachedTaskType> = Arc::from(task_type);
1553                    log.lock(task_id).push((task_type_arc.clone(), task_id));
1554                    task_type = ArcOrOwned::Arc(task_type_arc);
1555                }
1556                (task_id, task_type)
1557            }
1558        };
1559
1560        if is_new && is_root {
1561            let mut ctx = self.execute_context(turbo_tasks);
1562            AggregationUpdateQueue::run(
1563                AggregationUpdateJob::UpdateAggregationNumber {
1564                    task_id,
1565                    base_aggregation_number: u32::MAX,
1566                    distance: None,
1567                },
1568                &mut ctx,
1569            );
1570        }
1571
1572        // Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
1573        unsafe {
1574            self.connect_child_with_tx(
1575                tx.as_ref(),
1576                parent_task,
1577                task_id,
1578                Some(task_type),
1579                turbo_tasks,
1580            )
1581        };
1582
1583        task_id
1584    }
1585
1586    fn get_or_create_transient_task(
1587        &self,
1588        task_type: CachedTaskType,
1589        parent_task: Option<TaskId>,
1590        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1591    ) -> TaskId {
1592        let is_root = task_type.native_fn.is_root;
1593
1594        if let Some(parent_task) = parent_task
1595            && !parent_task.is_transient()
1596        {
1597            self.panic_persistent_calling_transient(
1598                self.debug_get_task_description(parent_task),
1599                Some(&task_type),
1600                /* cell_id */ None,
1601            );
1602        }
1603        // First check if the task exists in the cache which only uses a read lock
1604        if let Some(task_id) = self.task_cache.get(&task_type) {
1605            let task_id = *task_id;
1606            self.track_cache_hit(&task_type);
1607            self.connect_child(
1608                parent_task,
1609                task_id,
1610                Some(ArcOrOwned::Owned(task_type)),
1611                turbo_tasks,
1612            );
1613            return task_id;
1614        }
1615        // If not, acquire a write lock and double check / insert
1616        match raw_entry(&self.task_cache, &task_type) {
1617            RawEntry::Occupied(e) => {
1618                let task_id = *e.get();
1619                drop(e);
1620                self.track_cache_hit(&task_type);
1621                self.connect_child(
1622                    parent_task,
1623                    task_id,
1624                    Some(ArcOrOwned::Owned(task_type)),
1625                    turbo_tasks,
1626                );
1627                task_id
1628            }
1629            RawEntry::Vacant(e) => {
1630                let task_type = Arc::new(task_type);
1631                let task_id = self.transient_task_id_factory.get();
1632                e.insert(task_type.clone(), task_id);
1633                self.track_cache_miss(&task_type);
1634
1635                if is_root {
1636                    let mut ctx = self.execute_context(turbo_tasks);
1637                    AggregationUpdateQueue::run(
1638                        AggregationUpdateJob::UpdateAggregationNumber {
1639                            task_id,
1640                            base_aggregation_number: u32::MAX,
1641                            distance: None,
1642                        },
1643                        &mut ctx,
1644                    );
1645                }
1646
1647                self.connect_child(
1648                    parent_task,
1649                    task_id,
1650                    Some(ArcOrOwned::Arc(task_type)),
1651                    turbo_tasks,
1652                );
1653
1654                task_id
1655            }
1656        }
1657    }
1658
1659    /// Generate an object that implements [`fmt::Display`] explaining why the given
1660    /// [`CachedTaskType`] is transient.
1661    fn debug_trace_transient_task(
1662        &self,
1663        task_type: &CachedTaskType,
1664        cell_id: Option<CellId>,
1665    ) -> DebugTraceTransientTask {
1666        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1667        // from tracing the same task many times, so use a visited_set
1668        fn inner_id(
1669            backend: &TurboTasksBackendInner<impl BackingStorage>,
1670            task_id: TaskId,
1671            cell_type_id: Option<ValueTypeId>,
1672            visited_set: &mut FxHashSet<TaskId>,
1673        ) -> DebugTraceTransientTask {
1674            if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1675                if visited_set.contains(&task_id) {
1676                    let task_name = task_type.get_name();
1677                    DebugTraceTransientTask::Collapsed {
1678                        task_name,
1679                        cell_type_id,
1680                    }
1681                } else {
1682                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1683                }
1684            } else {
1685                DebugTraceTransientTask::Uncached { cell_type_id }
1686            }
1687        }
1688        fn inner_cached(
1689            backend: &TurboTasksBackendInner<impl BackingStorage>,
1690            task_type: &CachedTaskType,
1691            cell_type_id: Option<ValueTypeId>,
1692            visited_set: &mut FxHashSet<TaskId>,
1693        ) -> DebugTraceTransientTask {
1694            let task_name = task_type.get_name();
1695
1696            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1697                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1698                    // `task_id` should never be `None` at this point, as that would imply a
1699                    // non-local task is returning a local `Vc`...
1700                    // Just ignore if it happens, as we're likely already panicking.
1701                    return None;
1702                };
1703                if task_id.is_transient() {
1704                    Some(Box::new(inner_id(
1705                        backend,
1706                        task_id,
1707                        cause_self_raw_vc.try_get_type_id(),
1708                        visited_set,
1709                    )))
1710                } else {
1711                    None
1712                }
1713            });
1714            let cause_args = task_type
1715                .arg
1716                .get_raw_vcs()
1717                .into_iter()
1718                .filter_map(|raw_vc| {
1719                    let Some(task_id) = raw_vc.try_get_task_id() else {
1720                        // `task_id` should never be `None` (see comment above)
1721                        return None;
1722                    };
1723                    if !task_id.is_transient() {
1724                        return None;
1725                    }
1726                    Some((task_id, raw_vc.try_get_type_id()))
1727                })
1728                .collect::<IndexSet<_>>() // dedupe
1729                .into_iter()
1730                .map(|(task_id, cell_type_id)| {
1731                    inner_id(backend, task_id, cell_type_id, visited_set)
1732                })
1733                .collect();
1734
1735            DebugTraceTransientTask::Cached {
1736                task_name,
1737                cell_type_id,
1738                cause_self,
1739                cause_args,
1740            }
1741        }
1742        inner_cached(
1743            self,
1744            task_type,
1745            cell_id.map(|c| c.type_id),
1746            &mut FxHashSet::default(),
1747        )
1748    }
1749
1750    fn invalidate_task(
1751        &self,
1752        task_id: TaskId,
1753        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1754    ) {
1755        if !self.should_track_dependencies() {
1756            panic!("Dependency tracking is disabled so invalidation is not allowed");
1757        }
1758        operation::InvalidateOperation::run(
1759            smallvec![task_id],
1760            #[cfg(feature = "trace_task_dirty")]
1761            TaskDirtyCause::Invalidator,
1762            self.execute_context(turbo_tasks),
1763        );
1764    }
1765
1766    fn invalidate_tasks(
1767        &self,
1768        tasks: &[TaskId],
1769        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1770    ) {
1771        if !self.should_track_dependencies() {
1772            panic!("Dependency tracking is disabled so invalidation is not allowed");
1773        }
1774        operation::InvalidateOperation::run(
1775            tasks.iter().copied().collect(),
1776            #[cfg(feature = "trace_task_dirty")]
1777            TaskDirtyCause::Unknown,
1778            self.execute_context(turbo_tasks),
1779        );
1780    }
1781
1782    fn invalidate_tasks_set(
1783        &self,
1784        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1785        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1786    ) {
1787        if !self.should_track_dependencies() {
1788            panic!("Dependency tracking is disabled so invalidation is not allowed");
1789        }
1790        operation::InvalidateOperation::run(
1791            tasks.iter().copied().collect(),
1792            #[cfg(feature = "trace_task_dirty")]
1793            TaskDirtyCause::Unknown,
1794            self.execute_context(turbo_tasks),
1795        );
1796    }
1797
1798    fn invalidate_serialization(
1799        &self,
1800        task_id: TaskId,
1801        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1802    ) {
1803        if task_id.is_transient() {
1804            return;
1805        }
1806        let mut ctx = self.execute_context(turbo_tasks);
1807        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1808        task.invalidate_serialization();
1809    }
1810
1811    fn debug_get_task_description(&self, task_id: TaskId) -> String {
1812        let task = self.storage.access_mut(task_id);
1813        if let Some(value) = task.get_persistent_task_type() {
1814            format!("{task_id:?} {}", value)
1815        } else if let Some(value) = task.get_transient_task_type() {
1816            format!("{task_id:?} {}", value)
1817        } else {
1818            format!("{task_id:?} unknown")
1819        }
1820    }
1821
1822    fn get_task_name(
1823        &self,
1824        task_id: TaskId,
1825        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1826    ) -> String {
1827        let mut ctx = self.execute_context(turbo_tasks);
1828        let task = ctx.task(task_id, TaskDataCategory::Data);
1829        if let Some(value) = task.get_persistent_task_type() {
1830            value.to_string()
1831        } else if let Some(value) = task.get_transient_task_type() {
1832            value.to_string()
1833        } else {
1834            "unknown".to_string()
1835        }
1836    }
1837
1838    fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1839        let task = self.storage.access_mut(task_id);
1840        task.get_persistent_task_type().cloned()
1841    }
1842
1843    fn task_execution_canceled(
1844        &self,
1845        task_id: TaskId,
1846        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1847    ) {
1848        let mut ctx = self.execute_context(turbo_tasks);
1849        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1850        if let Some(in_progress) = task.take_in_progress() {
1851            match in_progress {
1852                InProgressState::Scheduled {
1853                    done_event,
1854                    reason: _,
1855                } => done_event.notify(usize::MAX),
1856                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1857                    done_event.notify(usize::MAX)
1858                }
1859                InProgressState::Canceled => {}
1860            }
1861        }
1862        let old = task.set_in_progress(InProgressState::Canceled);
1863        debug_assert!(old.is_none(), "InProgress already exists");
1864    }
1865
1866    fn try_start_task_execution(
1867        &self,
1868        task_id: TaskId,
1869        priority: TaskPriority,
1870        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1871    ) -> Option<TaskExecutionSpec<'_>> {
1872        let execution_reason;
1873        let task_type;
1874        {
1875            let mut ctx = self.execute_context(turbo_tasks);
1876            let mut task = ctx.task(task_id, TaskDataCategory::All);
1877            task_type = task.get_task_type().to_owned();
1878            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
1879            if let Some(tasks) = task.prefetch() {
1880                drop(task);
1881                ctx.prepare_tasks(tasks);
1882                task = ctx.task(task_id, TaskDataCategory::All);
1883            }
1884            let in_progress = task.take_in_progress()?;
1885            let InProgressState::Scheduled { done_event, reason } = in_progress else {
1886                let old = task.set_in_progress(in_progress);
1887                debug_assert!(old.is_none(), "InProgress already exists");
1888                return None;
1889            };
1890            execution_reason = reason;
1891            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
1892                InProgressStateInner {
1893                    stale: false,
1894                    once_task,
1895                    done_event,
1896                    session_dependent: false,
1897                    marked_as_completed: false,
1898                    new_children: Default::default(),
1899                },
1900            )));
1901            debug_assert!(old.is_none(), "InProgress already exists");
1902
1903            // Make all current collectibles outdated (remove left-over outdated collectibles)
1904            enum Collectible {
1905                Current(CollectibleRef, i32),
1906                Outdated(CollectibleRef),
1907            }
1908            let collectibles = task
1909                .iter_collectibles()
1910                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
1911                .chain(
1912                    task.iter_outdated_collectibles()
1913                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
1914                )
1915                .collect::<Vec<_>>();
1916            for collectible in collectibles {
1917                match collectible {
1918                    Collectible::Current(collectible, value) => {
1919                        let _ = task.insert_outdated_collectible(collectible, value);
1920                    }
1921                    Collectible::Outdated(collectible) => {
1922                        if task
1923                            .collectibles()
1924                            .is_none_or(|m| m.get(&collectible).is_none())
1925                        {
1926                            task.remove_outdated_collectibles(&collectible);
1927                        }
1928                    }
1929                }
1930            }
1931
1932            if self.should_track_dependencies() {
1933                // Make all dependencies outdated
1934                let cell_dependencies = task.iter_cell_dependencies().collect();
1935                task.set_outdated_cell_dependencies(cell_dependencies);
1936
1937                let outdated_output_dependencies = task.iter_output_dependencies().collect();
1938                task.set_outdated_output_dependencies(outdated_output_dependencies);
1939            }
1940        }
1941
1942        let (span, future) = match task_type {
1943            TaskType::Cached(task_type) => {
1944                let CachedTaskType {
1945                    native_fn,
1946                    this,
1947                    arg,
1948                } = &*task_type;
1949                (
1950                    native_fn.span(task_id.persistence(), execution_reason, priority),
1951                    native_fn.execute(*this, &**arg),
1952                )
1953            }
1954            TaskType::Transient(task_type) => {
1955                let span = tracing::trace_span!("turbo_tasks::root_task");
1956                let future = match &*task_type {
1957                    TransientTask::Root(f) => f(),
1958                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
1959                };
1960                (span, future)
1961            }
1962        };
1963        Some(TaskExecutionSpec { future, span })
1964    }
1965
1966    fn task_execution_completed(
1967        &self,
1968        task_id: TaskId,
1969        result: Result<RawVc, TurboTasksExecutionError>,
1970        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1971        #[cfg(feature = "verify_determinism")] stateful: bool,
1972        has_invalidator: bool,
1973        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1974    ) -> bool {
1975        // Task completion is a 4 step process:
1976        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
1977        //    aggregation number of the task and the new children.
1978        // 2. Connect the new children to the task (and do the relevant aggregation updates).
1979        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
1980        // 4. Shrink the task memory to reduce footprint of the task.
1981
1982        // Due to persistence it is possible that the process is cancelled after any step. This is
1983        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
1984        // in-memory representation.
1985
1986        // The task might be invalidated during this process, so we need to check the stale flag
1987        // at the start of every step.
1988
1989        #[cfg(not(feature = "trace_task_details"))]
1990        let span = tracing::trace_span!(
1991            "task execution completed",
1992            new_children = tracing::field::Empty
1993        )
1994        .entered();
1995        #[cfg(feature = "trace_task_details")]
1996        let span = tracing::trace_span!(
1997            "task execution completed",
1998            task_id = display(task_id),
1999            result = match result.as_ref() {
2000                Ok(value) => display(either::Either::Left(value)),
2001                Err(err) => display(either::Either::Right(err)),
2002            },
2003            new_children = tracing::field::Empty,
2004            immutable = tracing::field::Empty,
2005            new_output = tracing::field::Empty,
2006            output_dependents = tracing::field::Empty,
2007            stale = tracing::field::Empty,
2008        )
2009        .entered();
2010
2011        let is_error = result.is_err();
2012
2013        let mut ctx = self.execute_context(turbo_tasks);
2014
2015        let Some(TaskExecutionCompletePrepareResult {
2016            new_children,
2017            is_now_immutable,
2018            #[cfg(feature = "verify_determinism")]
2019            no_output_set,
2020            new_output,
2021            output_dependent_tasks,
2022        }) = self.task_execution_completed_prepare(
2023            &mut ctx,
2024            #[cfg(feature = "trace_task_details")]
2025            &span,
2026            task_id,
2027            result,
2028            cell_counters,
2029            #[cfg(feature = "verify_determinism")]
2030            stateful,
2031            has_invalidator,
2032        )
2033        else {
2034            // Task was stale and has been rescheduled
2035            #[cfg(feature = "trace_task_details")]
2036            span.record("stale", "prepare");
2037            return true;
2038        };
2039
2040        #[cfg(feature = "trace_task_details")]
2041        span.record("new_output", new_output.is_some());
2042        #[cfg(feature = "trace_task_details")]
2043        span.record("output_dependents", output_dependent_tasks.len());
2044
2045        // When restoring from filesystem cache the following might not be executed (since we can
2046        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
2047        // would be executed again.
2048
2049        if !output_dependent_tasks.is_empty() {
2050            self.task_execution_completed_invalidate_output_dependent(
2051                &mut ctx,
2052                task_id,
2053                output_dependent_tasks,
2054            );
2055        }
2056
2057        let has_new_children = !new_children.is_empty();
2058        span.record("new_children", new_children.len());
2059
2060        if has_new_children {
2061            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
2062        }
2063
2064        if has_new_children
2065            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
2066        {
2067            // Task was stale and has been rescheduled
2068            #[cfg(feature = "trace_task_details")]
2069            span.record("stale", "connect");
2070            return true;
2071        }
2072
2073        let (stale, in_progress_cells) = self.task_execution_completed_finish(
2074            &mut ctx,
2075            task_id,
2076            #[cfg(feature = "verify_determinism")]
2077            no_output_set,
2078            new_output,
2079            is_now_immutable,
2080        );
2081        if stale {
2082            // Task was stale and has been rescheduled
2083            #[cfg(feature = "trace_task_details")]
2084            span.record("stale", "finish");
2085            return true;
2086        }
2087
2088        let removed_data =
2089            self.task_execution_completed_cleanup(&mut ctx, task_id, cell_counters, is_error);
2090
2091        // Drop data outside of critical sections
2092        drop(removed_data);
2093        drop(in_progress_cells);
2094
2095        false
2096    }
2097
2098    fn task_execution_completed_prepare(
2099        &self,
2100        ctx: &mut impl ExecuteContext<'_>,
2101        #[cfg(feature = "trace_task_details")] span: &Span,
2102        task_id: TaskId,
2103        result: Result<RawVc, TurboTasksExecutionError>,
2104        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2105        #[cfg(feature = "verify_determinism")] stateful: bool,
2106        has_invalidator: bool,
2107    ) -> Option<TaskExecutionCompletePrepareResult> {
2108        let mut task = ctx.task(task_id, TaskDataCategory::All);
2109        let Some(in_progress) = task.get_in_progress_mut() else {
2110            panic!("Task execution completed, but task is not in progress: {task:#?}");
2111        };
2112        if matches!(in_progress, InProgressState::Canceled) {
2113            return Some(TaskExecutionCompletePrepareResult {
2114                new_children: Default::default(),
2115                is_now_immutable: false,
2116                #[cfg(feature = "verify_determinism")]
2117                no_output_set: false,
2118                new_output: None,
2119                output_dependent_tasks: Default::default(),
2120            });
2121        }
2122        let &mut InProgressState::InProgress(box InProgressStateInner {
2123            stale,
2124            ref mut new_children,
2125            session_dependent,
2126            once_task: is_once_task,
2127            ..
2128        }) = in_progress
2129        else {
2130            panic!("Task execution completed, but task is not in progress: {task:#?}");
2131        };
2132
2133        // If the task is stale, reschedule it
2134        #[cfg(not(feature = "no_fast_stale"))]
2135        if stale && !is_once_task {
2136            let Some(InProgressState::InProgress(box InProgressStateInner {
2137                done_event,
2138                mut new_children,
2139                ..
2140            })) = task.take_in_progress()
2141            else {
2142                unreachable!();
2143            };
2144            let old = task.set_in_progress(InProgressState::Scheduled {
2145                done_event,
2146                reason: TaskExecutionReason::Stale,
2147            });
2148            debug_assert!(old.is_none(), "InProgress already exists");
2149            // Remove old children from new_children to leave only the children that had their
2150            // active count increased
2151            for task in task.iter_children() {
2152                new_children.remove(&task);
2153            }
2154            drop(task);
2155
2156            // We need to undo the active count increase for the children since we throw away the
2157            // new_children list now.
2158            AggregationUpdateQueue::run(
2159                AggregationUpdateJob::DecreaseActiveCounts {
2160                    task_ids: new_children.into_iter().collect(),
2161                },
2162                ctx,
2163            );
2164            return None;
2165        }
2166
2167        // take the children from the task to process them
2168        let mut new_children = take(new_children);
2169
2170        // handle stateful (only tracked when verify_determinism is enabled)
2171        #[cfg(feature = "verify_determinism")]
2172        if stateful {
2173            task.set_stateful(true);
2174        }
2175
2176        // handle has_invalidator
2177        if has_invalidator {
2178            task.set_invalidator(true);
2179        }
2180
2181        // handle cell counters: update max index and remove cells that are no longer used
2182        let old_counters: FxHashMap<_, _> = task
2183            .iter_cell_type_max_index()
2184            .map(|(&k, &v)| (k, v))
2185            .collect();
2186        let mut counters_to_remove = old_counters.clone();
2187
2188        for (&cell_type, &max_index) in cell_counters.iter() {
2189            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2190                if old_max_index != max_index {
2191                    task.insert_cell_type_max_index(cell_type, max_index);
2192                }
2193            } else {
2194                task.insert_cell_type_max_index(cell_type, max_index);
2195            }
2196        }
2197        for (cell_type, _) in counters_to_remove {
2198            task.remove_cell_type_max_index(&cell_type);
2199        }
2200
2201        let mut queue = AggregationUpdateQueue::new();
2202
2203        let mut old_edges = Vec::new();
2204
2205        let has_children = !new_children.is_empty();
2206        let is_immutable = task.immutable();
2207        let task_dependencies_for_immutable =
2208            // Task was previously marked as immutable
2209            if !is_immutable
2210            // Task is not session dependent (session dependent tasks can change between sessions)
2211            && !session_dependent
2212            // Task has no invalidator
2213            && !task.invalidator()
2214            // Task has no dependencies on collectibles
2215            && task.is_collectibles_dependencies_empty()
2216        {
2217            Some(
2218                // Collect all dependencies on tasks to check if all dependencies are immutable
2219                task.iter_output_dependencies()
2220                    .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
2221                    .collect::<FxHashSet<_>>(),
2222            )
2223        } else {
2224            None
2225        };
2226
2227        if has_children {
2228            // Prepare all new children
2229            prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2230
2231            // Filter actual new children
2232            old_edges.extend(
2233                task.iter_children()
2234                    .filter(|task| !new_children.remove(task))
2235                    .map(OutdatedEdge::Child),
2236            );
2237        } else {
2238            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2239        }
2240
2241        old_edges.extend(
2242            task.iter_outdated_collectibles()
2243                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2244        );
2245
2246        if self.should_track_dependencies() {
2247            // IMPORTANT: Use iter_outdated_* here, NOT iter_* (active dependencies).
2248            // At execution start, active deps are copied to outdated as a "before" snapshot.
2249            // During execution, new deps are added to active.
2250            // Here at completion, we clean up only the OUTDATED deps (the "before" snapshot).
2251            // Using iter_* (active) instead would incorrectly clean up deps that are still valid,
2252            // breaking dependency tracking.
2253            old_edges.extend(
2254                task.iter_outdated_cell_dependencies()
2255                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
2256            );
2257            old_edges.extend(
2258                task.iter_outdated_output_dependencies()
2259                    .map(OutdatedEdge::OutputDependency),
2260            );
2261        }
2262
2263        // Check if output need to be updated
2264        let current_output = task.get_output();
2265        #[cfg(feature = "verify_determinism")]
2266        let no_output_set = current_output.is_none();
2267        let new_output = match result {
2268            Ok(RawVc::TaskOutput(output_task_id)) => {
2269                if let Some(OutputValue::Output(current_task_id)) = current_output
2270                    && *current_task_id == output_task_id
2271                {
2272                    None
2273                } else {
2274                    Some(OutputValue::Output(output_task_id))
2275                }
2276            }
2277            Ok(RawVc::TaskCell(output_task_id, cell)) => {
2278                if let Some(OutputValue::Cell(CellRef {
2279                    task: current_task_id,
2280                    cell: current_cell,
2281                })) = current_output
2282                    && *current_task_id == output_task_id
2283                    && *current_cell == cell
2284                {
2285                    None
2286                } else {
2287                    Some(OutputValue::Cell(CellRef {
2288                        task: output_task_id,
2289                        cell,
2290                    }))
2291                }
2292            }
2293            Ok(RawVc::LocalOutput(..)) => {
2294                panic!("Non-local tasks must not return a local Vc");
2295            }
2296            Err(err) => {
2297                if let Some(OutputValue::Error(old_error)) = current_output
2298                    && **old_error == err
2299                {
2300                    None
2301                } else {
2302                    Some(OutputValue::Error(Arc::new((&err).into())))
2303                }
2304            }
2305        };
2306        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2307        // When output has changed, grab the dependent tasks
2308        if new_output.is_some() && ctx.should_track_dependencies() {
2309            output_dependent_tasks = task.iter_output_dependent().collect();
2310        }
2311
2312        drop(task);
2313
2314        // Check if the task can be marked as immutable
2315        let mut is_now_immutable = false;
2316        if let Some(dependencies) = task_dependencies_for_immutable
2317            && dependencies
2318                .iter()
2319                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2320        {
2321            is_now_immutable = true;
2322        }
2323        #[cfg(feature = "trace_task_details")]
2324        span.record("immutable", is_immutable || is_now_immutable);
2325
2326        if !queue.is_empty() || !old_edges.is_empty() {
2327            #[cfg(feature = "trace_task_completion")]
2328            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
2329            // Remove outdated edges first, before removing in_progress+dirty flag.
2330            // We need to make sure all outdated edges are removed before the task can potentially
2331            // be scheduled and executed again
2332            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2333        }
2334
2335        Some(TaskExecutionCompletePrepareResult {
2336            new_children,
2337            is_now_immutable,
2338            #[cfg(feature = "verify_determinism")]
2339            no_output_set,
2340            new_output,
2341            output_dependent_tasks,
2342        })
2343    }
2344
2345    fn task_execution_completed_invalidate_output_dependent(
2346        &self,
2347        ctx: &mut impl ExecuteContext<'_>,
2348        task_id: TaskId,
2349        output_dependent_tasks: SmallVec<[TaskId; 4]>,
2350    ) {
2351        debug_assert!(!output_dependent_tasks.is_empty());
2352
2353        if output_dependent_tasks.len() > 1 {
2354            ctx.prepare_tasks(
2355                output_dependent_tasks
2356                    .iter()
2357                    .map(|&id| (id, TaskDataCategory::All)),
2358            );
2359        }
2360
2361        fn process_output_dependents(
2362            ctx: &mut impl ExecuteContext<'_>,
2363            task_id: TaskId,
2364            dependent_task_id: TaskId,
2365            queue: &mut AggregationUpdateQueue,
2366        ) {
2367            #[cfg(feature = "trace_task_output_dependencies")]
2368            let span = tracing::trace_span!(
2369                "invalidate output dependency",
2370                task = %task_id,
2371                dependent_task = %dependent_task_id,
2372                result = tracing::field::Empty,
2373            )
2374            .entered();
2375            let mut make_stale = true;
2376            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2377            let transient_task_type = dependent.get_transient_task_type();
2378            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
2379                // once tasks are never invalidated
2380                #[cfg(feature = "trace_task_output_dependencies")]
2381                span.record("result", "once task");
2382                return;
2383            }
2384            if dependent.outdated_output_dependencies_contains(&task_id) {
2385                #[cfg(feature = "trace_task_output_dependencies")]
2386                span.record("result", "outdated dependency");
2387                // output dependency is outdated, so it hasn't read the output yet
2388                // and doesn't need to be invalidated
2389                // But importantly we still need to make the task dirty as it should no longer
2390                // be considered as "recomputation".
2391                make_stale = false;
2392            } else if !dependent.output_dependencies_contains(&task_id) {
2393                // output dependency has been removed, so the task doesn't depend on the
2394                // output anymore and doesn't need to be invalidated
2395                #[cfg(feature = "trace_task_output_dependencies")]
2396                span.record("result", "no backward dependency");
2397                return;
2398            }
2399            make_task_dirty_internal(
2400                dependent,
2401                dependent_task_id,
2402                make_stale,
2403                #[cfg(feature = "trace_task_dirty")]
2404                TaskDirtyCause::OutputChange { task_id },
2405                queue,
2406                ctx,
2407            );
2408            #[cfg(feature = "trace_task_output_dependencies")]
2409            span.record("result", "marked dirty");
2410        }
2411
2412        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
2413            let chunk_size = good_chunk_size(output_dependent_tasks.len());
2414            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2415            let _ = scope_and_block(chunks.len(), |scope| {
2416                for chunk in chunks {
2417                    let child_ctx = ctx.child_context();
2418                    scope.spawn(move || {
2419                        let mut ctx = child_ctx.create();
2420                        let mut queue = AggregationUpdateQueue::new();
2421                        for dependent_task_id in chunk {
2422                            process_output_dependents(
2423                                &mut ctx,
2424                                task_id,
2425                                dependent_task_id,
2426                                &mut queue,
2427                            )
2428                        }
2429                        queue.execute(&mut ctx);
2430                    });
2431                }
2432            });
2433        } else {
2434            let mut queue = AggregationUpdateQueue::new();
2435            for dependent_task_id in output_dependent_tasks {
2436                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
2437            }
2438            queue.execute(ctx);
2439        }
2440    }
2441
2442    fn task_execution_completed_unfinished_children_dirty(
2443        &self,
2444        ctx: &mut impl ExecuteContext<'_>,
2445        new_children: &FxHashSet<TaskId>,
2446    ) {
2447        debug_assert!(!new_children.is_empty());
2448
2449        let mut queue = AggregationUpdateQueue::new();
2450        ctx.for_each_task_all(new_children.iter().copied(), |child_task, ctx| {
2451            if !child_task.has_output() {
2452                let child_id = child_task.id();
2453                make_task_dirty_internal(
2454                    child_task,
2455                    child_id,
2456                    false,
2457                    #[cfg(feature = "trace_task_dirty")]
2458                    TaskDirtyCause::InitialDirty,
2459                    &mut queue,
2460                    ctx,
2461                );
2462            }
2463        });
2464
2465        queue.execute(ctx);
2466    }
2467
2468    fn task_execution_completed_connect(
2469        &self,
2470        ctx: &mut impl ExecuteContext<'_>,
2471        task_id: TaskId,
2472        new_children: FxHashSet<TaskId>,
2473    ) -> bool {
2474        debug_assert!(!new_children.is_empty());
2475
2476        let mut task = ctx.task(task_id, TaskDataCategory::All);
2477        let Some(in_progress) = task.get_in_progress() else {
2478            panic!("Task execution completed, but task is not in progress: {task:#?}");
2479        };
2480        if matches!(in_progress, InProgressState::Canceled) {
2481            // Task was canceled in the meantime, so we don't connect the children
2482            return false;
2483        }
2484        let InProgressState::InProgress(box InProgressStateInner {
2485            #[cfg(not(feature = "no_fast_stale"))]
2486            stale,
2487            once_task: is_once_task,
2488            ..
2489        }) = in_progress
2490        else {
2491            panic!("Task execution completed, but task is not in progress: {task:#?}");
2492        };
2493
2494        // If the task is stale, reschedule it
2495        #[cfg(not(feature = "no_fast_stale"))]
2496        if *stale && !is_once_task {
2497            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2498                task.take_in_progress()
2499            else {
2500                unreachable!();
2501            };
2502            let old = task.set_in_progress(InProgressState::Scheduled {
2503                done_event,
2504                reason: TaskExecutionReason::Stale,
2505            });
2506            debug_assert!(old.is_none(), "InProgress already exists");
2507            drop(task);
2508
2509            // All `new_children` are currently hold active with an active count and we need to undo
2510            // that. (We already filtered out the old children from that list)
2511            AggregationUpdateQueue::run(
2512                AggregationUpdateJob::DecreaseActiveCounts {
2513                    task_ids: new_children.into_iter().collect(),
2514                },
2515                ctx,
2516            );
2517            return true;
2518        }
2519
2520        let has_active_count = ctx.should_track_activeness()
2521            && task
2522                .get_activeness()
2523                .is_some_and(|activeness| activeness.active_counter > 0);
2524        connect_children(
2525            ctx,
2526            task_id,
2527            task,
2528            new_children,
2529            has_active_count,
2530            ctx.should_track_activeness(),
2531        );
2532
2533        false
2534    }
2535
2536    fn task_execution_completed_finish(
2537        &self,
2538        ctx: &mut impl ExecuteContext<'_>,
2539        task_id: TaskId,
2540        #[cfg(feature = "verify_determinism")] no_output_set: bool,
2541        new_output: Option<OutputValue>,
2542        is_now_immutable: bool,
2543    ) -> (
2544        bool,
2545        Option<
2546            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2547        >,
2548    ) {
2549        let mut task = ctx.task(task_id, TaskDataCategory::All);
2550        let Some(in_progress) = task.take_in_progress() else {
2551            panic!("Task execution completed, but task is not in progress: {task:#?}");
2552        };
2553        if matches!(in_progress, InProgressState::Canceled) {
2554            // Task was canceled in the meantime, so we don't finish it
2555            return (false, None);
2556        }
2557        let InProgressState::InProgress(box InProgressStateInner {
2558            done_event,
2559            once_task: is_once_task,
2560            stale,
2561            session_dependent,
2562            marked_as_completed: _,
2563            new_children,
2564        }) = in_progress
2565        else {
2566            panic!("Task execution completed, but task is not in progress: {task:#?}");
2567        };
2568        debug_assert!(new_children.is_empty());
2569
2570        // If the task is stale, reschedule it
2571        if stale && !is_once_task {
2572            let old = task.set_in_progress(InProgressState::Scheduled {
2573                done_event,
2574                reason: TaskExecutionReason::Stale,
2575            });
2576            debug_assert!(old.is_none(), "InProgress already exists");
2577            return (true, None);
2578        }
2579
2580        // Set the output if it has changed
2581        let mut old_content = None;
2582        if let Some(value) = new_output {
2583            old_content = task.set_output(value);
2584        }
2585
2586        // If the task has no invalidator and has no mutable dependencies, it does not have a way
2587        // to be invalidated and we can mark it as immutable.
2588        if is_now_immutable {
2589            task.set_immutable(true);
2590        }
2591
2592        // Notify in progress cells and remove all of them
2593        let in_progress_cells = task.take_in_progress_cells();
2594        if let Some(ref cells) = in_progress_cells {
2595            for state in cells.values() {
2596                state.event.notify(usize::MAX);
2597            }
2598        }
2599
2600        // Grab the old dirty state
2601        let old_dirtyness = task.get_dirty().cloned();
2602        let (old_self_dirty, old_current_session_self_clean) = match old_dirtyness {
2603            None => (false, false),
2604            Some(Dirtyness::Dirty(_)) => (true, false),
2605            Some(Dirtyness::SessionDependent) => {
2606                let clean_in_current_session = task.current_session_clean();
2607                (true, clean_in_current_session)
2608            }
2609        };
2610
2611        // Compute the new dirty state
2612        let (new_dirtyness, new_self_dirty, new_current_session_self_clean) = if session_dependent {
2613            (Some(Dirtyness::SessionDependent), true, true)
2614        } else {
2615            (None, false, false)
2616        };
2617
2618        // Update the dirty state
2619        let dirty_changed = old_dirtyness != new_dirtyness;
2620        if dirty_changed {
2621            if let Some(value) = new_dirtyness {
2622                task.set_dirty(value);
2623            } else if old_dirtyness.is_some() {
2624                task.take_dirty();
2625            }
2626        }
2627        if old_current_session_self_clean != new_current_session_self_clean {
2628            if new_current_session_self_clean {
2629                task.set_current_session_clean(true);
2630            } else if old_current_session_self_clean {
2631                task.set_current_session_clean(false);
2632            }
2633        }
2634
2635        // Propagate dirtyness changes
2636        let data_update = if old_self_dirty != new_self_dirty
2637            || old_current_session_self_clean != new_current_session_self_clean
2638        {
2639            let dirty_container_count = task
2640                .get_aggregated_dirty_container_count()
2641                .cloned()
2642                .unwrap_or_default();
2643            let current_session_clean_container_count = task
2644                .get_aggregated_current_session_clean_container_count()
2645                .copied()
2646                .unwrap_or_default();
2647            let result = ComputeDirtyAndCleanUpdate {
2648                old_dirty_container_count: dirty_container_count,
2649                new_dirty_container_count: dirty_container_count,
2650                old_current_session_clean_container_count: current_session_clean_container_count,
2651                new_current_session_clean_container_count: current_session_clean_container_count,
2652                old_self_dirty,
2653                new_self_dirty,
2654                old_current_session_self_clean,
2655                new_current_session_self_clean,
2656            }
2657            .compute();
2658            if result.dirty_count_update - result.current_session_clean_update < 0 {
2659                // The task is clean now
2660                if let Some(activeness_state) = task.get_activeness_mut() {
2661                    activeness_state.all_clean_event.notify(usize::MAX);
2662                    activeness_state.unset_active_until_clean();
2663                    if activeness_state.is_empty() {
2664                        task.take_activeness();
2665                    }
2666                }
2667            }
2668            result
2669                .aggregated_update(task_id)
2670                .and_then(|aggregated_update| {
2671                    AggregationUpdateJob::data_update(&mut task, aggregated_update)
2672                })
2673        } else {
2674            None
2675        };
2676
2677        #[cfg(feature = "verify_determinism")]
2678        let reschedule =
2679            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
2680        #[cfg(not(feature = "verify_determinism"))]
2681        let reschedule = false;
2682        if reschedule {
2683            let old = task.set_in_progress(InProgressState::Scheduled {
2684                done_event,
2685                reason: TaskExecutionReason::Stale,
2686            });
2687            debug_assert!(old.is_none(), "InProgress already exists");
2688            drop(task);
2689        } else {
2690            drop(task);
2691
2692            // Notify dependent tasks that are waiting for this task to finish
2693            done_event.notify(usize::MAX);
2694        }
2695
2696        drop(old_content);
2697
2698        if let Some(data_update) = data_update {
2699            AggregationUpdateQueue::run(data_update, ctx);
2700        }
2701
2702        // We return so the data can be dropped outside of critical sections
2703        (reschedule, in_progress_cells)
2704    }
2705
2706    fn task_execution_completed_cleanup(
2707        &self,
2708        ctx: &mut impl ExecuteContext<'_>,
2709        task_id: TaskId,
2710        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2711        is_error: bool,
2712    ) -> Vec<SharedReference> {
2713        let mut task = ctx.task(task_id, TaskDataCategory::All);
2714        let mut removed_cell_data = Vec::new();
2715        // An error is potentially caused by a eventual consistency, so we avoid updating cells
2716        // after an error as it is likely transient and we want to keep the dependent tasks
2717        // clean to avoid re-executions.
2718        if !is_error {
2719            // Remove no longer existing cells and
2720            // find all outdated data items (removed cells, outdated edges)
2721            // Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
2722            // anyway and we want to avoid needless re-executions. When the cells become
2723            // used again, they are invalidated from the update cell operation.
2724            // Remove cell data for cells that no longer exist
2725            let to_remove_persistent: Vec<_> = task
2726                .iter_persistent_cell_data()
2727                .filter_map(|(cell, _)| {
2728                    cell_counters
2729                        .get(&cell.type_id)
2730                        .is_none_or(|start_index| cell.index >= *start_index)
2731                        .then_some(*cell)
2732                })
2733                .collect();
2734
2735            // Remove transient cell data for cells that no longer exist
2736            let to_remove_transient: Vec<_> = task
2737                .iter_transient_cell_data()
2738                .filter_map(|(cell, _)| {
2739                    cell_counters
2740                        .get(&cell.type_id)
2741                        .is_none_or(|start_index| cell.index >= *start_index)
2742                        .then_some(*cell)
2743                })
2744                .collect();
2745            removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
2746            for cell in to_remove_persistent {
2747                if let Some(data) = task.remove_persistent_cell_data(&cell) {
2748                    removed_cell_data.push(data.into_untyped());
2749                }
2750            }
2751            for cell in to_remove_transient {
2752                if let Some(data) = task.remove_transient_cell_data(&cell) {
2753                    removed_cell_data.push(data);
2754                }
2755            }
2756        }
2757
2758        // Clean up task storage after execution:
2759        // - Shrink collections marked with shrink_on_completion
2760        // - Drop dependency fields for immutable tasks (they'll never re-execute)
2761        task.cleanup_after_execution();
2762
2763        drop(task);
2764
2765        // Return so we can drop outside of critical sections
2766        removed_cell_data
2767    }
2768
2769    fn run_backend_job<'a>(
2770        self: &'a Arc<Self>,
2771        job: TurboTasksBackendJob,
2772        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2773    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2774        Box::pin(async move {
2775            match job {
2776                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
2777                    debug_assert!(self.should_persist());
2778
2779                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
2780                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
2781                    let mut idle_start_listener = self.idle_start_event.listen();
2782                    let mut idle_end_listener = self.idle_end_event.listen();
2783                    let mut fresh_idle = true;
2784                    loop {
2785                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2786                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2787                        let idle_timeout = *IDLE_TIMEOUT;
2788                        let (time, mut reason) =
2789                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
2790                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2791                            } else {
2792                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
2793                            };
2794
2795                        let until = last_snapshot + time;
2796                        if until > Instant::now() {
2797                            let mut stop_listener = self.stopping_event.listen();
2798                            if self.stopping.load(Ordering::Acquire) {
2799                                return;
2800                            }
2801                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2802                                Instant::now() + idle_timeout
2803                            } else {
2804                                far_future()
2805                            };
2806                            loop {
2807                                tokio::select! {
2808                                    _ = &mut stop_listener => {
2809                                        return;
2810                                    },
2811                                    _ = &mut idle_start_listener => {
2812                                        fresh_idle = true;
2813                                        idle_time = Instant::now() + idle_timeout;
2814                                        idle_start_listener = self.idle_start_event.listen()
2815                                    },
2816                                    _ = &mut idle_end_listener => {
2817                                        idle_time = until + idle_timeout;
2818                                        idle_end_listener = self.idle_end_event.listen()
2819                                    },
2820                                    _ = tokio::time::sleep_until(until) => {
2821                                        break;
2822                                    },
2823                                    _ = tokio::time::sleep_until(idle_time) => {
2824                                        if turbo_tasks.is_idle() {
2825                                            reason = "idle timeout";
2826                                            break;
2827                                        }
2828                                    },
2829                                }
2830                            }
2831                        }
2832
2833                        let this = self.clone();
2834                        let snapshot = this.snapshot_and_persist(None, reason, turbo_tasks);
2835                        if let Some((snapshot_start, new_data)) = snapshot {
2836                            last_snapshot = snapshot_start;
2837                            if !new_data {
2838                                fresh_idle = false;
2839                                continue;
2840                            }
2841                            let last_snapshot = last_snapshot.duration_since(self.start_time);
2842                            self.last_snapshot.store(
2843                                last_snapshot.as_millis().try_into().unwrap(),
2844                                Ordering::Relaxed,
2845                            );
2846
2847                            turbo_tasks.schedule_backend_background_job(
2848                                TurboTasksBackendJob::FollowUpSnapshot,
2849                            );
2850                            return;
2851                        }
2852                    }
2853                }
2854            }
2855        })
2856    }
2857
2858    fn try_read_own_task_cell(
2859        &self,
2860        task_id: TaskId,
2861        cell: CellId,
2862        options: ReadCellOptions,
2863        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2864    ) -> Result<TypedCellContent> {
2865        let mut ctx = self.execute_context(turbo_tasks);
2866        let task = ctx.task(task_id, TaskDataCategory::Data);
2867        if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2868            debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2869            Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2870        } else {
2871            Ok(CellContent(None).into_typed(cell.type_id))
2872        }
2873    }
2874
2875    fn read_task_collectibles(
2876        &self,
2877        task_id: TaskId,
2878        collectible_type: TraitTypeId,
2879        reader_id: Option<TaskId>,
2880        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2881    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2882        let mut ctx = self.execute_context(turbo_tasks);
2883        let mut collectibles = AutoMap::default();
2884        {
2885            let mut task = ctx.task(task_id, TaskDataCategory::All);
2886            // Ensure it's an root node
2887            loop {
2888                let aggregation_number = get_aggregation_number(&task);
2889                if is_root_node(aggregation_number) {
2890                    break;
2891                }
2892                drop(task);
2893                AggregationUpdateQueue::run(
2894                    AggregationUpdateJob::UpdateAggregationNumber {
2895                        task_id,
2896                        base_aggregation_number: u32::MAX,
2897                        distance: None,
2898                    },
2899                    &mut ctx,
2900                );
2901                task = ctx.task(task_id, TaskDataCategory::All);
2902            }
2903            for (collectible, count) in task.iter_aggregated_collectibles() {
2904                if *count > 0 && collectible.collectible_type == collectible_type {
2905                    *collectibles
2906                        .entry(RawVc::TaskCell(
2907                            collectible.cell.task,
2908                            collectible.cell.cell,
2909                        ))
2910                        .or_insert(0) += 1;
2911                }
2912            }
2913            for (&collectible, &count) in task.iter_collectibles() {
2914                if collectible.collectible_type == collectible_type {
2915                    *collectibles
2916                        .entry(RawVc::TaskCell(
2917                            collectible.cell.task,
2918                            collectible.cell.cell,
2919                        ))
2920                        .or_insert(0) += count;
2921                }
2922            }
2923            if let Some(reader_id) = reader_id {
2924                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
2925            }
2926        }
2927        if let Some(reader_id) = reader_id {
2928            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
2929            let target = CollectiblesRef {
2930                task: task_id,
2931                collectible_type,
2932            };
2933            if !reader.remove_outdated_collectibles_dependencies(&target) {
2934                let _ = reader.add_collectibles_dependencies(target);
2935            }
2936        }
2937        collectibles
2938    }
2939
2940    fn emit_collectible(
2941        &self,
2942        collectible_type: TraitTypeId,
2943        collectible: RawVc,
2944        task_id: TaskId,
2945        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2946    ) {
2947        self.assert_valid_collectible(task_id, collectible);
2948
2949        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2950            panic!("Collectibles need to be resolved");
2951        };
2952        let cell = CellRef {
2953            task: collectible_task,
2954            cell,
2955        };
2956        operation::UpdateCollectibleOperation::run(
2957            task_id,
2958            CollectibleRef {
2959                collectible_type,
2960                cell,
2961            },
2962            1,
2963            self.execute_context(turbo_tasks),
2964        );
2965    }
2966
2967    fn unemit_collectible(
2968        &self,
2969        collectible_type: TraitTypeId,
2970        collectible: RawVc,
2971        count: u32,
2972        task_id: TaskId,
2973        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2974    ) {
2975        self.assert_valid_collectible(task_id, collectible);
2976
2977        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2978            panic!("Collectibles need to be resolved");
2979        };
2980        let cell = CellRef {
2981            task: collectible_task,
2982            cell,
2983        };
2984        operation::UpdateCollectibleOperation::run(
2985            task_id,
2986            CollectibleRef {
2987                collectible_type,
2988                cell,
2989            },
2990            -(i32::try_from(count).unwrap()),
2991            self.execute_context(turbo_tasks),
2992        );
2993    }
2994
2995    fn update_task_cell(
2996        &self,
2997        task_id: TaskId,
2998        cell: CellId,
2999        is_serializable_cell_content: bool,
3000        content: CellContent,
3001        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3002        verification_mode: VerificationMode,
3003        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3004    ) {
3005        operation::UpdateCellOperation::run(
3006            task_id,
3007            cell,
3008            content,
3009            is_serializable_cell_content,
3010            updated_key_hashes,
3011            verification_mode,
3012            self.execute_context(turbo_tasks),
3013        );
3014    }
3015
3016    fn mark_own_task_as_session_dependent(
3017        &self,
3018        task_id: TaskId,
3019        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3020    ) {
3021        if !self.should_track_dependencies() {
3022            // Without dependency tracking we don't need session dependent tasks
3023            return;
3024        }
3025        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
3026        let mut ctx = self.execute_context(turbo_tasks);
3027        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
3028        let aggregation_number = get_aggregation_number(&task);
3029        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
3030            drop(task);
3031            // We want to use a high aggregation number to avoid large aggregation chains for
3032            // session dependent tasks (which change on every run)
3033            AggregationUpdateQueue::run(
3034                AggregationUpdateJob::UpdateAggregationNumber {
3035                    task_id,
3036                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
3037                    distance: None,
3038                },
3039                &mut ctx,
3040            );
3041            task = ctx.task(task_id, TaskDataCategory::Meta);
3042        }
3043        if let Some(InProgressState::InProgress(box InProgressStateInner {
3044            session_dependent,
3045            ..
3046        })) = task.get_in_progress_mut()
3047        {
3048            *session_dependent = true;
3049        }
3050    }
3051
3052    fn mark_own_task_as_finished(
3053        &self,
3054        task: TaskId,
3055        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3056    ) {
3057        let mut ctx = self.execute_context(turbo_tasks);
3058        let mut task = ctx.task(task, TaskDataCategory::Data);
3059        if let Some(InProgressState::InProgress(box InProgressStateInner {
3060            marked_as_completed,
3061            ..
3062        })) = task.get_in_progress_mut()
3063        {
3064            *marked_as_completed = true;
3065            // TODO this should remove the dirty state (also check session_dependent)
3066            // but this would break some assumptions for strongly consistent reads.
3067            // Client tasks are not connected yet, so we wouldn't wait for them.
3068            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
3069        }
3070    }
3071
3072    fn connect_task(
3073        &self,
3074        task: TaskId,
3075        parent_task: Option<TaskId>,
3076        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3077    ) {
3078        self.assert_not_persistent_calling_transient(parent_task, task, None);
3079        ConnectChildOperation::run(parent_task, task, None, self.execute_context(turbo_tasks));
3080    }
3081
3082    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3083        let task_id = self.transient_task_id_factory.get();
3084        {
3085            let mut task = self.storage.access_mut(task_id);
3086            task.init_transient_task(task_id, task_type, self.should_track_activeness());
3087        }
3088        #[cfg(feature = "verify_aggregation_graph")]
3089        self.root_tasks.lock().insert(task_id);
3090        task_id
3091    }
3092
3093    fn dispose_root_task(
3094        &self,
3095        task_id: TaskId,
3096        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3097    ) {
3098        #[cfg(feature = "verify_aggregation_graph")]
3099        self.root_tasks.lock().remove(&task_id);
3100
3101        let mut ctx = self.execute_context(turbo_tasks);
3102        let mut task = ctx.task(task_id, TaskDataCategory::All);
3103        let is_dirty = task.is_dirty();
3104        let has_dirty_containers = task.has_dirty_containers();
3105        if is_dirty.is_some() || has_dirty_containers {
3106            if let Some(activeness_state) = task.get_activeness_mut() {
3107                // We will finish the task, but it would be removed after the task is done
3108                activeness_state.unset_root_type();
3109                activeness_state.set_active_until_clean();
3110            };
3111        } else if let Some(activeness_state) = task.take_activeness() {
3112            // Technically nobody should be listening to this event, but just in case
3113            // we notify it anyway
3114            activeness_state.all_clean_event.notify(usize::MAX);
3115        }
3116    }
3117
3118    #[cfg(feature = "verify_aggregation_graph")]
3119    fn verify_aggregation_graph(
3120        &self,
3121        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3122        idle: bool,
3123    ) {
3124        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3125            return;
3126        }
3127        use std::{collections::VecDeque, env, io::stdout};
3128
3129        use crate::backend::operation::{get_uppers, is_aggregating_node};
3130
3131        let mut ctx = self.execute_context(turbo_tasks);
3132        let root_tasks = self.root_tasks.lock().clone();
3133
3134        for task_id in root_tasks.into_iter() {
3135            let mut queue = VecDeque::new();
3136            let mut visited = FxHashSet::default();
3137            let mut aggregated_nodes = FxHashSet::default();
3138            let mut collectibles = FxHashMap::default();
3139            let root_task_id = task_id;
3140            visited.insert(task_id);
3141            aggregated_nodes.insert(task_id);
3142            queue.push_back(task_id);
3143            let mut counter = 0;
3144            while let Some(task_id) = queue.pop_front() {
3145                counter += 1;
3146                if counter % 100000 == 0 {
3147                    println!(
3148                        "queue={}, visited={}, aggregated_nodes={}",
3149                        queue.len(),
3150                        visited.len(),
3151                        aggregated_nodes.len()
3152                    );
3153                }
3154                let task = ctx.task(task_id, TaskDataCategory::All);
3155                if idle && !self.is_idle.load(Ordering::Relaxed) {
3156                    return;
3157                }
3158
3159                let uppers = get_uppers(&task);
3160                if task_id != root_task_id
3161                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3162                {
3163                    panic!(
3164                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3165                         {:?})",
3166                        task_id,
3167                        task.get_task_description(),
3168                        uppers
3169                    );
3170                }
3171
3172                for (collectible, _) in task.iter_aggregated_collectibles() {
3173                    collectibles
3174                        .entry(*collectible)
3175                        .or_insert_with(|| (false, Vec::new()))
3176                        .1
3177                        .push(task_id);
3178                }
3179
3180                for (&collectible, &value) in task.iter_collectibles() {
3181                    if value > 0 {
3182                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3183                            *flag = true
3184                        } else {
3185                            panic!(
3186                                "Task {} has a collectible {:?} that is not in any upper task",
3187                                task_id, collectible
3188                            );
3189                        }
3190                    }
3191                }
3192
3193                let is_dirty = task.has_dirty();
3194                let has_dirty_container = task.has_dirty_containers();
3195                let should_be_in_upper = is_dirty || has_dirty_container;
3196
3197                let aggregation_number = get_aggregation_number(&task);
3198                if is_aggregating_node(aggregation_number) {
3199                    aggregated_nodes.insert(task_id);
3200                }
3201                // println!(
3202                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
3203                //     ctx.get_task_description(task_id),
3204                //     uppers
3205                // );
3206
3207                for child_id in task.iter_children() {
3208                    // println!("{task_id}: child -> {child_id}");
3209                    if visited.insert(child_id) {
3210                        queue.push_back(child_id);
3211                    }
3212                }
3213                drop(task);
3214
3215                if should_be_in_upper {
3216                    for upper_id in uppers {
3217                        let upper = ctx.task(upper_id, TaskDataCategory::All);
3218                        let in_upper = upper
3219                            .get_aggregated_dirty_containers(&task_id)
3220                            .is_some_and(|&dirty| dirty > 0);
3221                        if !in_upper {
3222                            let containers: Vec<_> = upper
3223                                .iter_aggregated_dirty_containers()
3224                                .map(|(&k, &v)| (k, v))
3225                                .collect();
3226                            let upper_task_desc = upper.get_task_description();
3227                            drop(upper);
3228                            panic!(
3229                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
3230                                 ({})\nThese dirty containers are present:\n{:#?}",
3231                                task_id,
3232                                ctx.task(task_id, TaskDataCategory::Data)
3233                                    .get_task_description(),
3234                                upper_id,
3235                                upper_task_desc,
3236                                containers,
3237                            );
3238                        }
3239                    }
3240                }
3241            }
3242
3243            for (collectible, (flag, task_ids)) in collectibles {
3244                if !flag {
3245                    use std::io::Write;
3246                    let mut stdout = stdout().lock();
3247                    writeln!(
3248                        stdout,
3249                        "{:?} that is not emitted in any child task but in these aggregated \
3250                         tasks: {:#?}",
3251                        collectible,
3252                        task_ids
3253                            .iter()
3254                            .map(|t| format!(
3255                                "{t} {}",
3256                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
3257                            ))
3258                            .collect::<Vec<_>>()
3259                    )
3260                    .unwrap();
3261
3262                    let task_id = collectible.cell.task;
3263                    let mut queue = {
3264                        let task = ctx.task(task_id, TaskDataCategory::All);
3265                        get_uppers(&task)
3266                    };
3267                    let mut visited = FxHashSet::default();
3268                    for &upper_id in queue.iter() {
3269                        visited.insert(upper_id);
3270                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3271                    }
3272                    while let Some(task_id) = queue.pop() {
3273                        let task = ctx.task(task_id, TaskDataCategory::All);
3274                        let desc = task.get_task_description();
3275                        let aggregated_collectible = task
3276                            .get_aggregated_collectibles(&collectible)
3277                            .copied()
3278                            .unwrap_or_default();
3279                        let uppers = get_uppers(&task);
3280                        drop(task);
3281                        writeln!(
3282                            stdout,
3283                            "upper {task_id} {desc} collectible={aggregated_collectible}"
3284                        )
3285                        .unwrap();
3286                        if task_ids.contains(&task_id) {
3287                            writeln!(
3288                                stdout,
3289                                "Task has an upper connection to an aggregated task that doesn't \
3290                                 reference it. Upper connection is invalid!"
3291                            )
3292                            .unwrap();
3293                        }
3294                        for upper_id in uppers {
3295                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3296                            if !visited.contains(&upper_id) {
3297                                queue.push(upper_id);
3298                            }
3299                        }
3300                    }
3301                    panic!("See stdout for more details");
3302                }
3303            }
3304        }
3305    }
3306
3307    fn assert_not_persistent_calling_transient(
3308        &self,
3309        parent_id: Option<TaskId>,
3310        child_id: TaskId,
3311        cell_id: Option<CellId>,
3312    ) {
3313        if let Some(parent_id) = parent_id
3314            && !parent_id.is_transient()
3315            && child_id.is_transient()
3316        {
3317            self.panic_persistent_calling_transient(
3318                self.debug_get_task_description(parent_id),
3319                self.debug_get_cached_task_type(child_id).as_deref(),
3320                cell_id,
3321            );
3322        }
3323    }
3324
3325    fn panic_persistent_calling_transient(
3326        &self,
3327        parent: String,
3328        child: Option<&CachedTaskType>,
3329        cell_id: Option<CellId>,
3330    ) {
3331        let transient_reason = if let Some(child) = child {
3332            Cow::Owned(format!(
3333                " The callee is transient because it depends on:\n{}",
3334                self.debug_trace_transient_task(child, cell_id),
3335            ))
3336        } else {
3337            Cow::Borrowed("")
3338        };
3339        panic!(
3340            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3341            parent,
3342            child.map_or("unknown", |t| t.get_name()),
3343            transient_reason,
3344        );
3345    }
3346
3347    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3348        // these checks occur in a potentially hot codepath, but they're cheap
3349        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3350            // This should never happen: The collectible APIs use ResolvedVc
3351            let task_info = if let Some(col_task_ty) = collectible
3352                .try_get_task_id()
3353                .map(|t| self.debug_get_task_description(t))
3354            {
3355                Cow::Owned(format!(" (return type of {col_task_ty})"))
3356            } else {
3357                Cow::Borrowed("")
3358            };
3359            panic!("Collectible{task_info} must be a ResolvedVc")
3360        };
3361        if col_task_id.is_transient() && !task_id.is_transient() {
3362            let transient_reason =
3363                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3364                    Cow::Owned(format!(
3365                        ". The collectible is transient because it depends on:\n{}",
3366                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3367                    ))
3368                } else {
3369                    Cow::Borrowed("")
3370                };
3371            // this should never happen: How would a persistent function get a transient Vc?
3372            panic!(
3373                "Collectible is transient, transient collectibles cannot be emitted from \
3374                 persistent tasks{transient_reason}",
3375            )
3376        }
3377    }
3378}
3379
3380impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
3381    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3382        self.0.startup(turbo_tasks);
3383    }
3384
3385    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3386        self.0.stopping();
3387    }
3388
3389    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3390        self.0.stop(turbo_tasks);
3391    }
3392
3393    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3394        self.0.idle_start(turbo_tasks);
3395    }
3396
3397    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3398        self.0.idle_end();
3399    }
3400
3401    fn get_or_create_persistent_task(
3402        &self,
3403        task_type: CachedTaskType,
3404        parent_task: Option<TaskId>,
3405        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3406    ) -> TaskId {
3407        self.0
3408            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
3409    }
3410
3411    fn get_or_create_transient_task(
3412        &self,
3413        task_type: CachedTaskType,
3414        parent_task: Option<TaskId>,
3415        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3416    ) -> TaskId {
3417        self.0
3418            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
3419    }
3420
3421    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3422        self.0.invalidate_task(task_id, turbo_tasks);
3423    }
3424
3425    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3426        self.0.invalidate_tasks(tasks, turbo_tasks);
3427    }
3428
3429    fn invalidate_tasks_set(
3430        &self,
3431        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3432        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3433    ) {
3434        self.0.invalidate_tasks_set(tasks, turbo_tasks);
3435    }
3436
3437    fn invalidate_serialization(
3438        &self,
3439        task_id: TaskId,
3440        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3441    ) {
3442        self.0.invalidate_serialization(task_id, turbo_tasks);
3443    }
3444
3445    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3446        self.0.task_execution_canceled(task, turbo_tasks)
3447    }
3448
3449    fn try_start_task_execution(
3450        &self,
3451        task_id: TaskId,
3452        priority: TaskPriority,
3453        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3454    ) -> Option<TaskExecutionSpec<'_>> {
3455        self.0
3456            .try_start_task_execution(task_id, priority, turbo_tasks)
3457    }
3458
3459    fn task_execution_completed(
3460        &self,
3461        task_id: TaskId,
3462        result: Result<RawVc, TurboTasksExecutionError>,
3463        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3464        #[cfg(feature = "verify_determinism")] stateful: bool,
3465        has_invalidator: bool,
3466        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3467    ) -> bool {
3468        self.0.task_execution_completed(
3469            task_id,
3470            result,
3471            cell_counters,
3472            #[cfg(feature = "verify_determinism")]
3473            stateful,
3474            has_invalidator,
3475            turbo_tasks,
3476        )
3477    }
3478
3479    type BackendJob = TurboTasksBackendJob;
3480
3481    fn run_backend_job<'a>(
3482        &'a self,
3483        job: Self::BackendJob,
3484        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3485    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3486        self.0.run_backend_job(job, turbo_tasks)
3487    }
3488
3489    fn try_read_task_output(
3490        &self,
3491        task_id: TaskId,
3492        reader: Option<TaskId>,
3493        options: ReadOutputOptions,
3494        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3495    ) -> Result<Result<RawVc, EventListener>> {
3496        self.0
3497            .try_read_task_output(task_id, reader, options, turbo_tasks)
3498    }
3499
3500    fn try_read_task_cell(
3501        &self,
3502        task_id: TaskId,
3503        cell: CellId,
3504        reader: Option<TaskId>,
3505        options: ReadCellOptions,
3506        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3507    ) -> Result<Result<TypedCellContent, EventListener>> {
3508        self.0
3509            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3510    }
3511
3512    fn try_read_own_task_cell(
3513        &self,
3514        task_id: TaskId,
3515        cell: CellId,
3516        options: ReadCellOptions,
3517        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3518    ) -> Result<TypedCellContent> {
3519        self.0
3520            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
3521    }
3522
3523    fn read_task_collectibles(
3524        &self,
3525        task_id: TaskId,
3526        collectible_type: TraitTypeId,
3527        reader: Option<TaskId>,
3528        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3529    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3530        self.0
3531            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3532    }
3533
3534    fn emit_collectible(
3535        &self,
3536        collectible_type: TraitTypeId,
3537        collectible: RawVc,
3538        task_id: TaskId,
3539        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3540    ) {
3541        self.0
3542            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3543    }
3544
3545    fn unemit_collectible(
3546        &self,
3547        collectible_type: TraitTypeId,
3548        collectible: RawVc,
3549        count: u32,
3550        task_id: TaskId,
3551        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3552    ) {
3553        self.0
3554            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3555    }
3556
3557    fn update_task_cell(
3558        &self,
3559        task_id: TaskId,
3560        cell: CellId,
3561        is_serializable_cell_content: bool,
3562        content: CellContent,
3563        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3564        verification_mode: VerificationMode,
3565        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3566    ) {
3567        self.0.update_task_cell(
3568            task_id,
3569            cell,
3570            is_serializable_cell_content,
3571            content,
3572            updated_key_hashes,
3573            verification_mode,
3574            turbo_tasks,
3575        );
3576    }
3577
3578    fn mark_own_task_as_finished(
3579        &self,
3580        task_id: TaskId,
3581        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3582    ) {
3583        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3584    }
3585
3586    fn mark_own_task_as_session_dependent(
3587        &self,
3588        task: TaskId,
3589        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3590    ) {
3591        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
3592    }
3593
3594    fn connect_task(
3595        &self,
3596        task: TaskId,
3597        parent_task: Option<TaskId>,
3598        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3599    ) {
3600        self.0.connect_task(task, parent_task, turbo_tasks);
3601    }
3602
3603    fn create_transient_task(
3604        &self,
3605        task_type: TransientTaskType,
3606        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3607    ) -> TaskId {
3608        self.0.create_transient_task(task_type)
3609    }
3610
3611    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3612        self.0.dispose_root_task(task_id, turbo_tasks);
3613    }
3614
3615    fn task_statistics(&self) -> &TaskStatisticsApi {
3616        &self.0.task_statistics
3617    }
3618
3619    fn is_tracking_dependencies(&self) -> bool {
3620        self.0.options.dependency_tracking
3621    }
3622
3623    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
3624        self.0.get_task_name(task, turbo_tasks)
3625    }
3626}
3627
3628enum DebugTraceTransientTask {
3629    Cached {
3630        task_name: &'static str,
3631        cell_type_id: Option<ValueTypeId>,
3632        cause_self: Option<Box<DebugTraceTransientTask>>,
3633        cause_args: Vec<DebugTraceTransientTask>,
3634    },
3635    /// This representation is used when this task is a duplicate of one previously shown
3636    Collapsed {
3637        task_name: &'static str,
3638        cell_type_id: Option<ValueTypeId>,
3639    },
3640    Uncached {
3641        cell_type_id: Option<ValueTypeId>,
3642    },
3643}
3644
3645impl DebugTraceTransientTask {
3646    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3647        let indent = "    ".repeat(level);
3648        f.write_str(&indent)?;
3649
3650        fn fmt_cell_type_id(
3651            f: &mut fmt::Formatter<'_>,
3652            cell_type_id: Option<ValueTypeId>,
3653        ) -> fmt::Result {
3654            if let Some(ty) = cell_type_id {
3655                write!(f, " (read cell of type {})", get_value_type(ty).global_name)
3656            } else {
3657                Ok(())
3658            }
3659        }
3660
3661        // write the name and type
3662        match self {
3663            Self::Cached {
3664                task_name,
3665                cell_type_id,
3666                ..
3667            }
3668            | Self::Collapsed {
3669                task_name,
3670                cell_type_id,
3671                ..
3672            } => {
3673                f.write_str(task_name)?;
3674                fmt_cell_type_id(f, *cell_type_id)?;
3675                if matches!(self, Self::Collapsed { .. }) {
3676                    f.write_str(" (collapsed)")?;
3677                }
3678            }
3679            Self::Uncached { cell_type_id } => {
3680                f.write_str("unknown transient task")?;
3681                fmt_cell_type_id(f, *cell_type_id)?;
3682            }
3683        }
3684        f.write_char('\n')?;
3685
3686        // write any extra "cause" information we might have
3687        if let Self::Cached {
3688            cause_self,
3689            cause_args,
3690            ..
3691        } = self
3692        {
3693            if let Some(c) = cause_self {
3694                writeln!(f, "{indent}  self:")?;
3695                c.fmt_indented(f, level + 1)?;
3696            }
3697            if !cause_args.is_empty() {
3698                writeln!(f, "{indent}  args:")?;
3699                for c in cause_args {
3700                    c.fmt_indented(f, level + 1)?;
3701                }
3702            }
3703        }
3704        Ok(())
3705    }
3706}
3707
3708impl fmt::Display for DebugTraceTransientTask {
3709    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3710        self.fmt_indented(f, 0)
3711    }
3712}
3713
3714// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
3715fn far_future() -> Instant {
3716    // Roughly 30 years from now.
3717    // API does not provide a way to obtain max `Instant`
3718    // or convert specific date in the future to instant.
3719    // 1000 years overflows on macOS, 100 years overflows on FreeBSD.
3720    Instant::now() + Duration::from_secs(86400 * 365 * 30)
3721}
3722
3723/// Encodes task data, using the provided buffer as a scratch space.  Returns a new exactly sized
3724/// buffer.
3725/// This allows reusing the buffer across multiple encode calls to optimize allocations and
3726/// resulting buffer sizes.
3727fn encode_task_data(
3728    task: TaskId,
3729    data: &TaskStorage,
3730    category: SpecificTaskDataCategory,
3731    scratch_buffer: &mut TurboBincodeBuffer,
3732) -> Result<TurboBincodeBuffer> {
3733    scratch_buffer.clear();
3734    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3735    data.encode(category, &mut encoder)?;
3736
3737    if cfg!(feature = "verify_serialization") {
3738        TaskStorage::new()
3739            .decode(
3740                category,
3741                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3742            )
3743            .with_context(|| {
3744                format!(
3745                    "expected to be able to decode serialized data for '{category:?}' information \
3746                     for {task}"
3747                )
3748            })?;
3749    }
3750    Ok(SmallVec::from_slice(scratch_buffer))
3751}