// turbo_tasks_backend/backend/mod.rs

1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7    borrow::Cow,
8    fmt::{self, Write},
9    future::Future,
10    hash::BuildHasherDefault,
11    mem::take,
12    pin::Pin,
13    sync::{
14        Arc, LazyLock,
15        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16    },
17    time::SystemTime,
18};
19
20use anyhow::{Context, Result, bail};
21use auto_hash_map::{AutoMap, AutoSet};
22use indexmap::IndexSet;
23use parking_lot::{Condvar, Mutex};
24use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
25use smallvec::{SmallVec, smallvec};
26use tokio::time::{Duration, Instant};
27use tracing::{Span, trace_span};
28use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
29use turbo_tasks::{
30    CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
31    ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
32    TaskId, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic, ValueTypeId,
33    backend::{
34        Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskType,
35        TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
36        TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
37        VerificationMode,
38    },
39    event::{Event, EventDescription, EventListener},
40    message_queue::{TimingEvent, TraceEvent},
41    registry::get_value_type,
42    scope::scope_and_block,
43    task_statistics::TaskStatisticsApi,
44    trace::TraceRawVcs,
45    util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
46};
47
48pub use self::{
49    operation::AnyOperation,
50    storage::{SpecificTaskDataCategory, TaskDataCategory},
51};
52#[cfg(feature = "trace_task_dirty")]
53use crate::backend::operation::TaskDirtyCause;
54use crate::{
55    backend::{
56        operation::{
57            AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
58            CleanupOldEdgesOperation, ComputeDirtyAndCleanUpdate, ConnectChildOperation,
59            ExecuteContext, ExecuteContextImpl, LeafDistanceUpdateQueue, Operation, OutdatedEdge,
60            TaskGuard, TaskType, connect_children, get_aggregation_number, get_uppers,
61            is_root_node, make_task_dirty_internal, prepare_new_children,
62        },
63        storage::Storage,
64        storage_schema::{TaskStorage, TaskStorageAccessors},
65    },
66    backing_storage::{BackingStorage, SnapshotItem},
67    data::{
68        ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
69        InProgressState, InProgressStateInner, OutputValue, TransientTask,
70    },
71    error::TaskError,
72    utils::{
73        arc_or_owned::ArcOrOwned,
74        chunked_vec::ChunkedVec,
75        dash_map_drop_contents::drop_contents,
76        dash_map_raw_entry::{RawEntry, raw_entry},
77        ptr_eq_arc::PtrEqArc,
78        shard_amount::compute_shard_amount,
79        sharded::Sharded,
80        swap_retain,
81    },
82};
83
/// Threshold for parallelizing making dependent tasks dirty.
/// If the number of dependent tasks exceeds this threshold,
/// the operation will be parallelized.
// NOTE(review): identifier misspells "parallelization"; renaming would break
// other references in this file, so it is kept as-is.
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;

/// Highest bit of `in_progress_operations`. Set while a snapshot is requested;
/// the remaining bits count currently executing operations.
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
90
/// Configurable idle timeout for snapshot persistence.
/// Defaults to 2 seconds if the env var is not set or its value is invalid.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let fallback = Duration::from_secs(2);
    match std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS") {
        Ok(raw) => raw
            .parse::<u64>()
            .map(Duration::from_millis)
            .unwrap_or(fallback),
        Err(_) => fallback,
    }
});
100
/// State shared between the snapshot code and running operations.
struct SnapshotRequest {
    /// True while a snapshot has been requested and has not completed yet.
    snapshot_requested: bool,
    /// Operations that paused themselves at a suspend point while the snapshot
    /// was requested (see `operation_suspend_point`); they resume once the
    /// snapshot completes.
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}
105
106impl SnapshotRequest {
107    fn new() -> Self {
108        Self {
109            snapshot_requested: false,
110            suspended_operations: FxHashSet::default(),
111        }
112    }
113}
114
/// Controls how the backend uses the backing storage.
pub enum StorageMode {
    /// Queries the storage for cache entries that don't exist locally.
    /// Never writes changes back.
    ReadOnly,
    /// Queries the storage for cache entries that don't exist locally.
    /// Regularly pushes changes to the backing storage.
    ReadWrite,
    /// Queries the storage for cache entries that don't exist locally.
    /// On shutdown, pushes all changes to the backing storage.
    ReadWriteOnShutdown,
}
125
/// Configuration for constructing a [`TurboTasksBackend`].
pub struct BackendOptions {
    /// Enables dependency tracking.
    ///
    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
    /// forever.
    pub dependency_tracking: bool,

    /// Enables active tracking.
    ///
    /// Automatically disabled when `dependency_tracking` is disabled.
    ///
    /// When disabled: All tasks are considered as active.
    pub active_tracking: bool,

    /// Enables the backing storage. `None` disables restoring and persisting entirely.
    pub storage_mode: Option<StorageMode>,

    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
    /// datastructures. If `None`, it will use the available parallelism.
    pub num_workers: Option<usize>,

    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
    pub small_preallocation: bool,
}
150
151impl Default for BackendOptions {
152    fn default() -> Self {
153        Self {
154            dependency_tracking: true,
155            active_tracking: true,
156            storage_mode: Some(StorageMode::ReadWrite),
157            num_workers: None,
158            small_preallocation: false,
159        }
160    }
161}
162
/// Background jobs scheduled by this backend.
pub enum TurboTasksBackendJob {
    /// First snapshot after startup — presumably delayed differently from
    /// follow-ups; confirm at the job-handling site.
    InitialSnapshot,
    /// Subsequent periodic snapshot.
    FollowUpSnapshot,
}
167
/// Public backend type: a newtype over an `Arc` of the inner backend state,
/// generic over the backing storage implementation.
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
169
/// Sharded log of newly cached task types and their assigned ids. Only kept
/// when a write-capable storage mode is configured (see `need_log` in
/// `TurboTasksBackendInner::new`).
type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
171
struct TurboTasksBackendInner<B: BackingStorage> {
    /// Backend configuration (see [`BackendOptions`]).
    options: BackendOptions,

    /// Captured at construction; `last_snapshot` is measured relative to it.
    start_time: Instant,

    /// Allocates ids for persisted tasks (below `TRANSIENT_TASK_BIT`),
    /// starting at the backing storage's next free id.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Allocates ids for transient tasks (`TRANSIENT_TASK_BIT..=TaskId::MAX`).
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    /// Log of new cache entries to persist; `Some` only in write-capable
    /// storage modes.
    persisted_task_cache_log: Option<TaskCacheLog>,
    /// In-memory cache mapping a task type to its assigned task id.
    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

    /// In-memory task data storage.
    storage: Storage,

    /// When true, the backing_storage has data that is not in the local storage.
    /// This is determined once at startup and never changes.
    local_is_partial: bool,

    /// Number of executing operations + Highest bit is set when snapshot is
    /// requested. When that bit is set, operations should pause until the
    /// snapshot is completed. When the bit is set and in progress counter
    /// reaches zero, `operations_completed_when_snapshot_requested` is
    /// triggered.
    in_progress_operations: AtomicUsize,

    /// Snapshot coordination state, guarded by a mutex that also backs the two
    /// condvars below.
    snapshot_request: Mutex<SnapshotRequest>,
    /// Condition Variable that is triggered when `in_progress_operations`
    /// reaches zero while snapshot is requested. All operations are either
    /// completed or suspended.
    operations_suspended: Condvar,
    /// Condition Variable that is triggered when a snapshot is completed and
    /// operations can continue.
    snapshot_completed: Condvar,
    /// The timestamp of the last started snapshot since [`Self::start_time`].
    last_snapshot: AtomicU64,

    /// Set once shutdown begins — NOTE(review): inferred from the name;
    /// confirm at the site that stores `true`.
    stopping: AtomicBool,
    /// Triggered when `stopping` is set.
    stopping_event: Event,
    /// Triggered when the backend becomes idle.
    idle_start_event: Event,
    /// Triggered when the backend leaves the idle state.
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    /// Cache hit/miss statistics (see `track_cache_hit` / `track_cache_miss`).
    task_statistics: TaskStatisticsApi,

    /// Persistent storage backend.
    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
221
222impl<B: BackingStorage> TurboTasksBackend<B> {
223    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
224        Self(Arc::new(TurboTasksBackendInner::new(
225            options,
226            backing_storage,
227        )))
228    }
229
230    pub fn backing_storage(&self) -> &B {
231        &self.0.backing_storage
232    }
233}
234
235impl<B: BackingStorage> TurboTasksBackendInner<B> {
    /// Creates the inner backend state.
    ///
    /// Queries `backing_storage` for the next free persisted task id; a start
    /// id other than `TaskId::MIN` means the storage already contains tasks,
    /// so the local state is marked as partial.
    ///
    /// # Panics
    ///
    /// Panics if the backing storage cannot provide the next free task id.
    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
        // Shard count for parallel data structures, derived from worker count.
        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
        // Only write-capable modes need the task cache log (mirrors `should_persist`).
        let need_log = matches!(
            options.storage_mode,
            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
        );
        // Active tracking requires dependency tracking (see `BackendOptions`).
        if !options.dependency_tracking {
            options.active_tracking = false;
        }
        let small_preallocation = options.small_preallocation;
        let next_task_id = backing_storage
            .next_free_task_id()
            .expect("Failed to get task id");
        Self {
            options,
            start_time: Instant::now(),
            // Persisted ids: continue after the storage's ids, below the transient range.
            persisted_task_id_factory: IdFactoryWithReuse::new(
                next_task_id,
                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
            ),
            // Transient ids: everything at or above TRANSIENT_TASK_BIT.
            transient_task_id_factory: IdFactoryWithReuse::new(
                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                TaskId::MAX,
            ),
            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
            task_cache: FxDashMap::default(),
            local_is_partial: next_task_id != TaskId::MIN,
            storage: Storage::new(shard_amount, small_preallocation),
            in_progress_operations: AtomicUsize::new(0),
            snapshot_request: Mutex::new(SnapshotRequest::new()),
            operations_suspended: Condvar::new(),
            snapshot_completed: Condvar::new(),
            last_snapshot: AtomicU64::new(0),
            stopping: AtomicBool::new(false),
            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
            #[cfg(feature = "verify_aggregation_graph")]
            is_idle: AtomicBool::new(false),
            task_statistics: TaskStatisticsApi::default(),
            backing_storage,
            #[cfg(feature = "verify_aggregation_graph")]
            root_tasks: Default::default(),
        }
    }
281
    /// Creates an [`ExecuteContext`] tying this backend to the given turbo
    /// tasks API for the duration of an operation.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }
288
289    fn suspending_requested(&self) -> bool {
290        self.should_persist()
291            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
292    }
293
    /// Suspend point for a running operation: if a snapshot has been
    /// requested, the operation is captured via `suspend`, registered as
    /// suspended, and this call blocks until the snapshot completes.
    /// Otherwise this is a cheap no-op check.
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        // The slow path is split out and marked #[cold] so the common no-op
        // check stays inlinable.
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            // Re-check under the lock: the snapshot may already have completed.
            if snapshot_request.snapshot_requested {
                // Register ourselves so the snapshot can see (and persist)
                // suspended operations.
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                // Deregister from the active-operation count; if we were the
                // last active operation, wake the snapshot waiter.
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                if value == SNAPSHOT_REQUESTED_BIT {
                    this.operations_suspended.notify_all();
                }
                // Block until the snapshot clears `snapshot_requested`.
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                // Resume: count ourselves as active again and deregister from
                // the suspended set.
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }
326
    /// Registers the start of an operation and returns a guard that
    /// decrements the in-progress counter on drop. When persistence is
    /// disabled, returns a no-op guard.
    ///
    /// If a snapshot is currently requested (high bit set in
    /// `in_progress_operations`), the operation backs out and blocks until
    /// the snapshot completes before registering itself again.
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        if !self.should_persist() {
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            let mut snapshot_request = self.snapshot_request.lock();
            // Re-check under the lock: the snapshot may have completed between
            // the atomic read and acquiring the lock.
            if snapshot_request.snapshot_requested {
                // Undo our increment; if we were the last active operation,
                // wake the snapshot waiting on `operations_suspended`.
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                // Block until the snapshot clears `snapshot_requested`, then
                // count ourselves as active again.
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }
350
351    fn should_persist(&self) -> bool {
352        matches!(
353            self.options.storage_mode,
354            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
355        )
356    }
357
358    fn should_restore(&self) -> bool {
359        self.options.storage_mode.is_some()
360    }
361
    /// Whether dependency tracking is enabled (see
    /// [`BackendOptions::dependency_tracking`]).
    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }
365
366    /// Sets the initial aggregation number for a newly created task. Root tasks get `u32::MAX`
367    /// to stay at the top. Session-dependent tasks get a high (but not max) aggregation number
368    /// because they change on every session restore, behaving like dirty leaf nodes — keeping
369    /// them near the leaves prevents long dirty-propagation chains through intermediate
370    /// aggregated nodes.
371    fn set_initial_aggregation_number(
372        &self,
373        task_id: TaskId,
374        is_root: bool,
375        is_session_dependent: bool,
376        ctx: &mut impl ExecuteContext<'_>,
377    ) {
378        let base_aggregation_number = if is_root {
379            u32::MAX
380        } else if is_session_dependent && self.should_track_dependencies() {
381            const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
382            SESSION_DEPENDENT_AGGREGATION_NUMBER
383        } else {
384            return;
385        };
386
387        AggregationUpdateQueue::run(
388            AggregationUpdateJob::UpdateAggregationNumber {
389                task_id,
390                base_aggregation_number,
391                distance: None,
392            },
393            ctx,
394        );
395    }
396
    /// Whether activeness tracking is enabled (see
    /// [`BackendOptions::active_tracking`]).
    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }
400
    /// Records a task cache hit for the given task type in the statistics API
    /// (no-op when statistics are not enabled).
    fn track_cache_hit(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
    }
405
    /// Records a task cache miss for the given task type in the statistics API
    /// (no-op when statistics are not enabled).
    fn track_cache_miss(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
    }
410
411    /// Reconstructs a full [`TurboTasksExecutionError`] from the compact [`TaskError`] storage
412    /// representation. For [`TaskError::TaskChain`], this looks up the source error from the last
413    /// task's output and rebuilds the nested `TaskContext` wrappers with `TurboTasksCallApi`
414    /// references for lazy name resolution.
415    fn task_error_to_turbo_tasks_execution_error(
416        &self,
417        error: &TaskError,
418        ctx: &mut impl ExecuteContext<'_>,
419    ) -> TurboTasksExecutionError {
420        match error {
421            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
422            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
423                message: item.message.clone(),
424                source: item
425                    .source
426                    .as_ref()
427                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
428            })),
429            TaskError::LocalTaskContext(local_task_context) => {
430                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
431                    name: local_task_context.name.clone(),
432                    source: local_task_context
433                        .source
434                        .as_ref()
435                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
436                }))
437            }
438            TaskError::TaskChain(chain) => {
439                let task_id = chain.last().unwrap();
440                let error = {
441                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
442                    if let Some(OutputValue::Error(error)) = task.get_output() {
443                        Some(error.clone())
444                    } else {
445                        None
446                    }
447                };
448                let error = error.map_or_else(
449                    || {
450                        // Eventual consistency will cause errors to no longer be available
451                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
452                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
453                                "Error no longer available",
454                            )),
455                            location: None,
456                        }))
457                    },
458                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
459                );
460                let mut current_error = error;
461                for &task_id in chain.iter().rev() {
462                    current_error =
463                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
464                            task_id,
465                            source: Some(current_error),
466                            turbo_tasks: ctx.turbo_tasks(),
467                        }));
468                }
469                current_error
470            }
471        }
472    }
473}
474
/// RAII guard for a running operation; created by `start_operation`.
/// `backend` is `None` when persistence is disabled, making drop a no-op.
pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    backend: Option<&'a TurboTasksBackendInner<B>>,
}
478
479impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
480    fn drop(&mut self) {
481        if let Some(backend) = self.backend {
482            let fetch_sub = backend
483                .in_progress_operations
484                .fetch_sub(1, Ordering::AcqRel);
485            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
486                backend.operations_suspended.notify_all();
487            }
488        }
489    }
490}
491
/// Intermediate result of step 1 of task execution completion.
struct TaskExecutionCompletePrepareResult {
    // Children gathered during this execution — presumably to be connected in
    // a later step; confirm at the use site.
    pub new_children: FxHashSet<TaskId>,
    // True when the task was determined to be immutable by this execution.
    pub is_now_immutable: bool,
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    // The output to store, if it changed (`None` presumably means unchanged).
    pub new_output: Option<OutputValue>,
    // Tasks depending on this task's output — likely to be invalidated/notified.
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
}
501
502// Operations
503impl<B: BackingStorage> TurboTasksBackendInner<B> {
504    fn try_read_task_output(
505        self: &Arc<Self>,
506        task_id: TaskId,
507        reader: Option<TaskId>,
508        options: ReadOutputOptions,
509        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
510    ) -> Result<Result<RawVc, EventListener>> {
511        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
512
513        let mut ctx = self.execute_context(turbo_tasks);
514        let need_reader_task = if self.should_track_dependencies()
515            && !matches!(options.tracking, ReadTracking::Untracked)
516            && reader.is_some_and(|reader_id| reader_id != task_id)
517            && let Some(reader_id) = reader
518            && reader_id != task_id
519        {
520            Some(reader_id)
521        } else {
522            None
523        };
524        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
525            // Having a task_pair here is not optimal, but otherwise this would lead to a race
526            // condition. See below.
527            // TODO(sokra): solve that in a more performant way.
528            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
529            (task, Some(reader))
530        } else {
531            (ctx.task(task_id, TaskDataCategory::All), None)
532        };
533
534        fn listen_to_done_event(
535            reader_description: Option<EventDescription>,
536            tracking: ReadTracking,
537            done_event: &Event,
538        ) -> EventListener {
539            done_event.listen_with_note(move || {
540                move || {
541                    if let Some(reader_description) = reader_description.as_ref() {
542                        format!(
543                            "try_read_task_output from {} ({})",
544                            reader_description, tracking
545                        )
546                    } else {
547                        format!("try_read_task_output ({})", tracking)
548                    }
549                }
550            })
551        }
552
553        fn check_in_progress(
554            task: &impl TaskGuard,
555            reader_description: Option<EventDescription>,
556            tracking: ReadTracking,
557        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
558        {
559            match task.get_in_progress() {
560                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
561                    listen_to_done_event(reader_description, tracking, done_event),
562                ))),
563                Some(InProgressState::InProgress(box InProgressStateInner {
564                    done_event, ..
565                })) => Some(Ok(Err(listen_to_done_event(
566                    reader_description,
567                    tracking,
568                    done_event,
569                )))),
570                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
571                    "{} was canceled",
572                    task.get_task_description()
573                ))),
574                None => None,
575            }
576        }
577
578        if matches!(options.consistency, ReadConsistency::Strong) {
579            // Ensure it's an root node
580            loop {
581                let aggregation_number = get_aggregation_number(&task);
582                if is_root_node(aggregation_number) {
583                    break;
584                }
585                drop(task);
586                drop(reader_task);
587                {
588                    let _span = tracing::trace_span!(
589                        "make root node for strongly consistent read",
590                        %task_id
591                    )
592                    .entered();
593                    AggregationUpdateQueue::run(
594                        AggregationUpdateJob::UpdateAggregationNumber {
595                            task_id,
596                            base_aggregation_number: u32::MAX,
597                            distance: None,
598                        },
599                        &mut ctx,
600                    );
601                }
602                (task, reader_task) = if let Some(reader_id) = need_reader_task {
603                    // TODO(sokra): see comment above
604                    let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
605                    (task, Some(reader))
606                } else {
607                    (ctx.task(task_id, TaskDataCategory::All), None)
608                }
609            }
610
611            let is_dirty = task.is_dirty();
612
613            // Check the dirty count of the root node
614            let has_dirty_containers = task.has_dirty_containers();
615            if has_dirty_containers || is_dirty.is_some() {
616                let activeness = task.get_activeness_mut();
617                let mut task_ids_to_schedule: Vec<_> = Vec::new();
618                // When there are dirty task, subscribe to the all_clean_event
619                let activeness = if let Some(activeness) = activeness {
620                    // This makes sure all tasks stay active and this task won't stale.
621                    // active_until_clean is automatically removed when this
622                    // task is clean.
623                    activeness.set_active_until_clean();
624                    activeness
625                } else {
626                    // If we don't have a root state, add one. This also makes sure all tasks stay
627                    // active and this task won't stale. active_until_clean
628                    // is automatically removed when this task is clean.
629                    if ctx.should_track_activeness() {
630                        // A newly added Activeness need to make sure to schedule the tasks
631                        task_ids_to_schedule = task.dirty_containers().collect();
632                        task_ids_to_schedule.push(task_id);
633                    }
634                    let activeness =
635                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
636                    activeness.set_active_until_clean();
637                    activeness
638                };
639                let listener = activeness.all_clean_event.listen_with_note(move || {
640                    let this = self.clone();
641                    let tt = turbo_tasks.pin();
642                    move || {
643                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
644                        let mut ctx = this.execute_context(tt);
645                        let mut visited = FxHashSet::default();
646                        fn indent(s: &str) -> String {
647                            s.split_inclusive('\n')
648                                .flat_map(|line: &str| ["  ", line].into_iter())
649                                .collect::<String>()
650                        }
651                        fn get_info(
652                            ctx: &mut impl ExecuteContext<'_>,
653                            task_id: TaskId,
654                            parent_and_count: Option<(TaskId, i32)>,
655                            visited: &mut FxHashSet<TaskId>,
656                        ) -> String {
657                            let task = ctx.task(task_id, TaskDataCategory::All);
658                            let is_dirty = task.is_dirty();
659                            let in_progress =
660                                task.get_in_progress()
661                                    .map_or("not in progress", |p| match p {
662                                        InProgressState::InProgress(_) => "in progress",
663                                        InProgressState::Scheduled { .. } => "scheduled",
664                                        InProgressState::Canceled => "canceled",
665                                    });
666                            let activeness = task.get_activeness().map_or_else(
667                                || "not active".to_string(),
668                                |activeness| format!("{activeness:?}"),
669                            );
670                            let aggregation_number = get_aggregation_number(&task);
671                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
672                            {
673                                let uppers = get_uppers(&task);
674                                !uppers.contains(&parent_task_id)
675                            } else {
676                                false
677                            };
678
679                            // Check the dirty count of the root node
680                            let has_dirty_containers = task.has_dirty_containers();
681
682                            let task_description = task.get_task_description();
683                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
684                                format!(", dirty({parent_priority})")
685                            } else {
686                                String::new()
687                            };
688                            let has_dirty_containers_label = if has_dirty_containers {
689                                ", dirty containers"
690                            } else {
691                                ""
692                            };
693                            let count = if let Some((_, count)) = parent_and_count {
694                                format!(" {count}")
695                            } else {
696                                String::new()
697                            };
698                            let mut info = format!(
699                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
700                                 {in_progress}, \
701                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
702                            );
703                            let children: Vec<_> = task.dirty_containers_with_count().collect();
704                            drop(task);
705
706                            if missing_upper {
707                                info.push_str("\n  ERROR: missing upper connection");
708                            }
709
710                            if has_dirty_containers || !children.is_empty() {
711                                writeln!(info, "\n  dirty tasks:").unwrap();
712
713                                for (child_task_id, count) in children {
714                                    let task_description = ctx
715                                        .task(child_task_id, TaskDataCategory::Data)
716                                        .get_task_description();
717                                    if visited.insert(child_task_id) {
718                                        let child_info = get_info(
719                                            ctx,
720                                            child_task_id,
721                                            Some((task_id, count)),
722                                            visited,
723                                        );
724                                        info.push_str(&indent(&child_info));
725                                        if !info.ends_with('\n') {
726                                            info.push('\n');
727                                        }
728                                    } else {
729                                        writeln!(
730                                            info,
731                                            "  {child_task_id} {task_description} {count} \
732                                             (already visited)"
733                                        )
734                                        .unwrap();
735                                    }
736                                }
737                            }
738                            info
739                        }
740                        let info = get_info(&mut ctx, task_id, None, &mut visited);
741                        format!(
742                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
743                        )
744                    }
745                });
746                drop(reader_task);
747                drop(task);
748                if !task_ids_to_schedule.is_empty() {
749                    let mut queue = AggregationUpdateQueue::new();
750                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
751                    queue.execute(&mut ctx);
752                }
753
754                return Ok(Err(listener));
755            }
756        }
757
758        let reader_description = reader_task
759            .as_ref()
760            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
761        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
762        {
763            return value;
764        }
765
766        if let Some(output) = task.get_output() {
767            let result = match output {
768                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
769                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
770                OutputValue::Error(error) => Err(error.clone()),
771            };
772            if let Some(mut reader_task) = reader_task.take()
773                && options.tracking.should_track(result.is_err())
774                && (!task.immutable() || cfg!(feature = "verify_immutable"))
775            {
776                #[cfg(feature = "trace_task_output_dependencies")]
777                let _span = tracing::trace_span!(
778                    "add output dependency",
779                    task = %task_id,
780                    dependent_task = ?reader
781                )
782                .entered();
783                let mut queue = LeafDistanceUpdateQueue::new();
784                let reader = reader.unwrap();
785                if task.add_output_dependent(reader) {
786                    // Ensure that dependent leaf distance is strictly monotonic increasing
787                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
788                    let reader_leaf_distance =
789                        reader_task.get_leaf_distance().copied().unwrap_or_default();
790                    if reader_leaf_distance.distance <= leaf_distance.distance {
791                        queue.push(
792                            reader,
793                            leaf_distance.distance,
794                            leaf_distance.max_distance_in_buffer,
795                        );
796                    }
797                }
798
799                drop(task);
800
801                // Note: We use `task_pair` earlier to lock the task and its reader at the same
802                // time. If we didn't and just locked the reader here, an invalidation could occur
803                // between grabbing the locks. If that happened, and if the task is "outdated" or
804                // doesn't have the dependency edge yet, the invalidation would be lost.
805
806                if !reader_task.remove_outdated_output_dependencies(&task_id) {
807                    let _ = reader_task.add_output_dependencies(task_id);
808                }
809                drop(reader_task);
810
811                queue.execute(&mut ctx);
812            } else {
813                drop(task);
814            }
815
816            return result.map_err(|error| {
817                self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
818                    .with_task_context(task_id, turbo_tasks.pin())
819                    .into()
820            });
821        }
822        drop(reader_task);
823
824        let note = EventDescription::new(|| {
825            move || {
826                if let Some(reader) = reader_description.as_ref() {
827                    format!("try_read_task_output (recompute) from {reader}",)
828                } else {
829                    "try_read_task_output (recompute, untracked)".to_string()
830                }
831            }
832        });
833
834        // Output doesn't exist. We need to schedule the task to compute it.
835        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
836            TaskExecutionReason::OutputNotAvailable,
837            EventDescription::new(|| task.get_task_desc_fn()),
838            note,
839        );
840
841        // It's not possible that the task is InProgress at this point. If it is InProgress {
842        // done: true } it must have Output and would early return.
843        let old = task.set_in_progress(in_progress_state);
844        debug_assert!(old.is_none(), "InProgress already exists");
845        ctx.schedule_task(task, TaskPriority::Initial);
846
847        Ok(Err(listener))
848    }
849
    /// Tries to read the content of `cell` from `task_id`, registering a dependency edge from
    /// `reader` when tracking is enabled.
    ///
    /// Returns `Ok(Ok(content))` when the cell data is available, `Ok(Err(listener))` when the
    /// task has to (re)compute the cell first (the listener fires once the cell is filled), and
    /// `Err(..)` when the cell no longer exists or the task was canceled.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        /// Records the dependency edge `reader -> (task, cell)` so the reader is invalidated
        /// when the cell changes. Both guards must have been locked together (see the
        /// `task_pair` note below). Skipped for immutable tasks unless the
        /// `verify_immutable` feature is enabled.
        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
            key: Option<u64>,
        ) {
            if let Some(mut reader_task) = reader_task
                && (!task.immutable() || cfg!(feature = "verify_immutable"))
            {
                // `reader_task` is only `Some` when `reader` is `Some` (see the lock setup
                // below), so this unwrap can't fail.
                let reader = reader.unwrap();
                let _ = task.add_cell_dependents((cell, key, reader));
                drop(task);

                // Note: We use `task_pair` earlier to lock the task and its reader at the same
                // time. If we didn't and just locked the reader here, an invalidation could occur
                // between grabbing the locks. If that happened, and if the task is "outdated" or
                // doesn't have the dependency edge yet, the invalidation would be lost.

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
                    let _ = reader_task.add_cell_dependencies((target, key));
                }
                drop(reader_task);
            }
        }

        let ReadCellOptions {
            is_serializable_cell_content,
            tracking,
            final_read_hint,
        } = options;

        let mut ctx = self.execute_context(turbo_tasks);
        // Lock both the task and the reader when the read is tracked (and not a self-read).
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(tracking, ReadCellTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            // Having a task_pair here is not optimal, but otherwise this would lead to a race
            // condition. See below.
            // TODO(sokra): solve that in a more performant way.
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        // A final read may remove the data from the cell instead of just reading it.
        let content = if final_read_hint {
            task.remove_cell_data(is_serializable_cell_content, cell)
        } else {
            task.get_cell_data(is_serializable_cell_content, cell)
        };
        if let Some(content) = content {
            if tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.reference)),
            )));
        }

        // Cell data is missing. If the task is already scheduled or running, just wait for it.
        let in_progress = task.get_in_progress();
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self
                .listen_to_cell(&mut task, task_id, &reader_task, cell)
                .0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        // Check cell index range (cell might not exist at all)
        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
        let Some(max_id) = max_id else {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                // Track even on error so the reader is re-run when the cell (re)appears.
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!(
                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
            );
        };
        if cell.index >= max_id {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
        }

        // Cell should exist, but data was dropped or is not serializable. We need to recompute
        // the task to get the cell content.

        // Listen to the cell and potentially schedule the task
        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
        drop(reader_task);
        // Only the first listener schedules the recomputation; later readers just wait.
        if !new_listener {
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).ty.global_name,
            cell_index = cell.index
        )
        .entered();

        // Schedule the task, if not already scheduled
        if is_cancelled {
            bail!("{} was canceled", task.get_task_description());
        }

        let _ = task.add_scheduled(
            TaskExecutionReason::CellNotAvailable,
            EventDescription::new(|| task.get_task_desc_fn()),
        );
        ctx.schedule_task(task, TaskPriority::Initial);

        Ok(Err(listener))
    }
987
988    fn listen_to_cell(
989        &self,
990        task: &mut impl TaskGuard,
991        task_id: TaskId,
992        reader_task: &Option<impl TaskGuard>,
993        cell: CellId,
994    ) -> (EventListener, bool) {
995        let note = || {
996            let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
997            move || {
998                if let Some(reader_desc) = reader_desc.as_ref() {
999                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
1000                } else {
1001                    "try_read_task_cell (in progress, untracked)".to_string()
1002                }
1003            }
1004        };
1005        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
1006            // Someone else is already computing the cell
1007            let listener = in_progress.event.listen_with_note(note);
1008            return (listener, false);
1009        }
1010        let in_progress = InProgressCellState::new(task_id, cell);
1011        let listener = in_progress.event.listen_with_note(note);
1012        let old = task.insert_in_progress_cells(cell, in_progress);
1013        debug_assert!(old.is_none(), "InProgressCell already exists");
1014        (listener, true)
1015    }
1016
    /// Takes a consistent snapshot of all modified task data and persists it to the backing
    /// storage.
    ///
    /// To get a consistent view, all in-progress operations are suspended first (via the
    /// `snapshot_request` lock and `SNAPSHOT_REQUESTED_BIT`). The in-memory snapshot is taken
    /// while operations are paused; they are resumed before the (slower) encoding and disk
    /// persisting happens.
    ///
    /// Returns `Some((snapshot_time, persisted))` where `persisted` indicates whether anything
    /// was actually written, or `None` when saving the snapshot to the backing storage failed.
    fn snapshot_and_persist(
        &self,
        parent_span: Option<tracing::Id>,
        reason: &str,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<(Instant, bool)> {
        let snapshot_span =
            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
                .entered();
        let start = Instant::now();
        // SystemTime for wall-clock timestamps in trace events (milliseconds
        // since epoch). Instant is monotonic but has no defined epoch, so it
        // can't be used for cross-process trace correlation.
        let wall_start = SystemTime::now();
        debug_assert!(self.should_persist());

        // `Arc` handles of the operations that were suspended for this snapshot; they are
        // persisted alongside the task data so they can be resumed after a restart.
        let suspended_operations;
        {
            // Block until every in-progress operation has suspended itself, so the snapshot
            // sees a consistent state.
            let _span = tracing::info_span!("blocking").entered();
            let mut snapshot_request = self.snapshot_request.lock();
            snapshot_request.snapshot_requested = true;
            // Set the request bit; operations observe it and suspend themselves.
            let active_operations = self
                .in_progress_operations
                .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
            if active_operations != 0 {
                // Wait until the counter only contains the request bit, i.e. no operation is
                // still running.
                self.operations_suspended
                    .wait_while(&mut snapshot_request, |_| {
                        self.in_progress_operations.load(Ordering::Relaxed)
                            != SNAPSHOT_REQUESTED_BIT
                    });
            }
            suspended_operations = snapshot_request
                .suspended_operations
                .iter()
                .map(|op| op.arc().clone())
                .collect::<Vec<_>>();
        }
        // Enter snapshot mode on the in-memory storage (see the comment on `process` below:
        // tasks modified during snapshot mode are frozen-copied).
        self.storage.start_snapshot();
        let mut persisted_task_cache_log = self
            .persisted_task_cache_log
            .as_ref()
            .map(|l| l.take(|i| i))
            .unwrap_or_default();
        // Resume the suspended operations: clear the request flag/bit and wake all waiters.
        let mut snapshot_request = self.snapshot_request.lock();
        snapshot_request.snapshot_requested = false;
        self.in_progress_operations
            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        self.snapshot_completed.notify_all();
        let snapshot_time = Instant::now();
        drop(snapshot_request);

        // Per-task-type statistics about serialized cache item sizes, only collected and
        // printed when the `print_cache_item_size` feature is enabled.
        #[cfg(feature = "print_cache_item_size")]
        #[derive(Default)]
        struct TaskCacheStats {
            data: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            data_compressed: usize,
            data_count: usize,
            meta: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            meta_compressed: usize,
            meta_count: usize,
            upper_count: usize,
            collectibles_count: usize,
            aggregated_collectibles_count: usize,
            children_count: usize,
            followers_count: usize,
            collectibles_dependents_count: usize,
            aggregated_dirty_containers_count: usize,
            output_size: usize,
        }
        /// Formats a byte size, optionally including the compressed size when the
        /// `print_cache_item_size_with_compressed` feature is enabled.
        #[cfg(feature = "print_cache_item_size")]
        struct FormatSizes {
            size: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: usize,
        }
        #[cfg(feature = "print_cache_item_size")]
        impl std::fmt::Display for FormatSizes {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                use turbo_tasks::util::FormatBytes;
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    write!(
                        f,
                        "{} ({} compressed)",
                        FormatBytes(self.size),
                        FormatBytes(self.compressed_size)
                    )
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    write!(f, "{}", FormatBytes(self.size))
                }
            }
        }
        #[cfg(feature = "print_cache_item_size")]
        impl TaskCacheStats {
            /// LZ4-compresses `data` once just to measure the compressed size.
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            fn compressed_size(data: &[u8]) -> Result<usize> {
                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
                    data,
                    &mut Vec::new(),
                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
                )?)
            }

            fn add_data(&mut self, data: &[u8]) {
                self.data += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.data_count += 1;
            }

            fn add_meta(&mut self, data: &[u8]) {
                self.meta += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.meta_count += 1;
            }

            /// Accumulates the meta item counts (and serialized output size) of one task.
            fn add_counts(&mut self, storage: &TaskStorage) {
                let counts = storage.meta_counts();
                self.upper_count += counts.upper;
                self.collectibles_count += counts.collectibles;
                self.aggregated_collectibles_count += counts.aggregated_collectibles;
                self.children_count += counts.children;
                self.followers_count += counts.followers;
                self.collectibles_dependents_count += counts.collectibles_dependents;
                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
                if let Some(output) = storage.get_output() {
                    use turbo_bincode::turbo_bincode_encode;

                    self.output_size += turbo_bincode_encode(&output)
                        .map(|data| data.len())
                        .unwrap_or(0);
                }
            }

            /// Returns the task name used as the stats grouping key.
            fn task_name(storage: &TaskStorage) -> String {
                storage
                    .get_persistent_task_type()
                    .map(|t| t.to_string())
                    .unwrap_or_else(|| "<unknown>".to_string())
            }

            /// Returns the primary sort key: compressed total when
            /// `print_cache_item_size_with_compressed` is enabled, raw total otherwise.
            fn sort_key(&self) -> usize {
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed + self.meta_compressed
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    self.data + self.meta
                }
            }

            fn format_total(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data + self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed + self.meta_compressed,
                }
            }

            fn format_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed,
                }
            }

            /// Average data size per encoded item; `checked_div` guards against a zero count.
            fn format_avg_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data.checked_div(self.data_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .data_compressed
                        .checked_div(self.data_count)
                        .unwrap_or(0),
                }
            }

            fn format_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.meta_compressed,
                }
            }

            /// Average meta size per encoded item; `checked_div` guards against a zero count.
            fn format_avg_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta.checked_div(self.meta_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .meta_compressed
                        .checked_div(self.meta_count)
                        .unwrap_or(0),
                }
            }
        }
        // Mutex because `process` below runs in parallel (via `take_snapshot`).
        #[cfg(feature = "print_cache_item_size")]
        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
            Mutex::new(FxHashMap::default());

        // Encode each task's modified categories. We only encode categories with `modified` set,
        // meaning the category was actually dirtied. Categories restored from disk but never
        // modified don't need re-persisting since the on-disk version is still valid.
        // For tasks accessed during snapshot mode, a frozen copy was made and its `modified`
        // flags were copied from the live task at snapshot creation time, reflecting which
        // categories were dirtied before the snapshot was taken.
        let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
            // Encodes one category (meta or data); returns `None` and logs when serialization
            // fails so a single bad task doesn't abort the whole snapshot.
            let encode_category = |task_id: TaskId,
                                   data: &TaskStorage,
                                   category: SpecificTaskDataCategory,
                                   buffer: &mut TurboBincodeBuffer|
             -> Option<TurboBincodeBuffer> {
                match encode_task_data(task_id, data, category, buffer) {
                    Ok(encoded) => {
                        #[cfg(feature = "print_cache_item_size")]
                        {
                            let mut stats = task_cache_stats.lock();
                            let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
                            match category {
                                SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
                                SpecificTaskDataCategory::Data => entry.add_data(&encoded),
                            }
                        }
                        Some(encoded)
                    }
                    Err(err) => {
                        eprintln!(
                            "Serializing task {} failed ({:?}): {:?}",
                            self.debug_get_task_description(task_id),
                            category,
                            err
                        );
                        None
                    }
                }
            };
            if task_id.is_transient() {
                unreachable!("transient task_ids should never be enqueued to be persisted");
            }

            let encode_meta = inner.flags.meta_modified();
            let encode_data = inner.flags.data_modified();

            #[cfg(feature = "print_cache_item_size")]
            if encode_data || encode_meta {
                task_cache_stats
                    .lock()
                    .entry(TaskCacheStats::task_name(inner))
                    .or_default()
                    .add_counts(inner);
            }

            let meta = if encode_meta {
                encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
            } else {
                None
            };

            let data = if encode_data {
                encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
            } else {
                None
            };

            SnapshotItem {
                task_id,
                meta,
                data,
            }
        };

        // take_snapshot already filters empty items and empty shards in parallel
        let task_snapshots = self.storage.take_snapshot(&process);

        // Drop empty shards so the emptiness check below is accurate.
        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());

        drop(snapshot_span);
        let snapshot_duration = start.elapsed();
        let task_count = task_snapshots.len();

        // Nothing changed since the last snapshot: skip persisting entirely.
        if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
            return Some((snapshot_time, false));
        }

        let persist_start = Instant::now();
        let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
        {
            if let Err(err) = self.backing_storage.save_snapshot(
                suspended_operations,
                persisted_task_cache_log,
                task_snapshots,
            ) {
                eprintln!("Persisting failed: {err:?}");
                return None;
            }
            #[cfg(feature = "print_cache_item_size")]
            {
                let mut task_cache_stats = task_cache_stats
                    .into_inner()
                    .into_iter()
                    .collect::<Vec<_>>();
                if !task_cache_stats.is_empty() {
                    use turbo_tasks::util::FormatBytes;

                    use crate::utils::markdown_table::print_markdown_table;

                    // Sort descending by size (ties broken by name, also descending).
                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
                        (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
                    });

                    println!(
                        "Task cache stats: {}",
                        FormatSizes {
                            size: task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data + s.meta)
                                .sum::<usize>(),
                            #[cfg(feature = "print_cache_item_size_with_compressed")]
                            compressed_size: task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data_compressed + s.meta_compressed)
                                .sum::<usize>()
                        },
                    );

                    // The "Count x Avg" headers appear twice on purpose: each logical column is
                    // split across two table cells ("N x" and the formatted average).
                    print_markdown_table(
                        [
                            "Task",
                            " Total Size",
                            " Data Size",
                            " Data Count x Avg",
                            " Data Count x Avg",
                            " Meta Size",
                            " Meta Count x Avg",
                            " Meta Count x Avg",
                            " Uppers",
                            " Coll",
                            " Agg Coll",
                            " Children",
                            " Followers",
                            " Coll Deps",
                            " Agg Dirty",
                            " Output Size",
                        ],
                        task_cache_stats.iter(),
                        |(task_desc, stats)| {
                            [
                                task_desc.to_string(),
                                format!(" {}", stats.format_total()),
                                format!(" {}", stats.format_data()),
                                format!(" {} x", stats.data_count),
                                format!("{}", stats.format_avg_data()),
                                format!(" {}", stats.format_meta()),
                                format!(" {} x", stats.meta_count),
                                format!("{}", stats.format_avg_meta()),
                                format!(" {}", stats.upper_count),
                                format!(" {}", stats.collectibles_count),
                                format!(" {}", stats.aggregated_collectibles_count),
                                format!(" {}", stats.children_count),
                                format!(" {}", stats.followers_count),
                                format!(" {}", stats.collectibles_dependents_count),
                                format!(" {}", stats.aggregated_dirty_containers_count),
                                format!(" {}", FormatBytes(stats.output_size)),
                            ]
                        },
                    );
                }
            }
        }

        let elapsed = start.elapsed();
        let persist_duration = persist_start.elapsed();
        // avoid spamming the event queue with information about fast operations
        if elapsed > Duration::from_secs(10) {
            turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
                "Finished writing to filesystem cache".to_string(),
                elapsed,
            )));
        }

        let wall_start_ms = wall_start
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            // as_millis_f64 is not stable yet
            .as_secs_f64()
            * 1000.0;
        let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
        turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
            "turbopack-persistence",
            wall_start_ms,
            wall_end_ms,
            vec![
                ("reason", serde_json::Value::from(reason)),
                (
                    "snapshot_duration_ms",
                    serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
                ),
                (
                    "persist_duration_ms",
                    serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
                ),
                ("task_count", serde_json::Value::from(task_count)),
            ],
        )));

        Some((snapshot_time, true))
    }
1440
1441    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1442        if self.should_restore() {
1443            // Continue all uncompleted operations
1444            // They can't be interrupted by a snapshot since the snapshotting job has not been
1445            // scheduled yet.
1446            let uncompleted_operations = self
1447                .backing_storage
1448                .uncompleted_operations()
1449                .expect("Failed to get uncompleted operations");
1450            if !uncompleted_operations.is_empty() {
1451                let mut ctx = self.execute_context(turbo_tasks);
1452                for op in uncompleted_operations {
1453                    op.execute(&mut ctx);
1454                }
1455            }
1456        }
1457
1458        // Only when it should write regularly to the storage, we schedule the initial snapshot
1459        // job.
1460        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1461            // Schedule the snapshot job
1462            let _span = trace_span!("persisting background job").entered();
1463            let _span = tracing::info_span!("thread").entered();
1464            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1465        }
1466    }
1467
    /// Backend shutdown hook, phase 1: flags the backend as stopping and wakes all
    /// waiters on `stopping_event` so background work can start winding down.
    fn stopping(&self) {
        // Store before notify: woken waiters must observe the flag as set.
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }
1472
1473    #[allow(unused_variables)]
1474    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1475        #[cfg(feature = "verify_aggregation_graph")]
1476        {
1477            self.is_idle.store(false, Ordering::Release);
1478            self.verify_aggregation_graph(turbo_tasks, false);
1479        }
1480        if self.should_persist() {
1481            self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks);
1482        }
1483        drop_contents(&self.task_cache);
1484        self.storage.drop_contents();
1485        if let Err(err) = self.backing_storage.shutdown() {
1486            println!("Shutting down failed: {err}");
1487        }
1488    }
1489
    /// Called when the turbo-tasks system becomes idle. Notifies `idle_start_event`
    /// waiters; with the `verify_aggregation_graph` feature it additionally spawns a
    /// delayed aggregation-graph verification.
    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                // Debounce: wait 5 seconds, but bail out early if idling ends first.
                select! {
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                        // do nothing
                    }
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                // Re-check the flag in case idle ended between wakeup and here.
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }
1517
    /// Called when the turbo-tasks system leaves the idle state: clears the idle
    /// flag (feature-gated) and wakes `idle_end_event` waiters (e.g. the delayed
    /// verification task spawned in `idle_start`).
    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }
1523
    /// Looks up (or creates) the persistent task for `task_type`, connects it as a
    /// child of `parent_task`, and returns its [`TaskId`].
    ///
    /// Lookup order: in-memory `task_cache` (read lock only) → backing storage
    /// (`task_by_type`) → allocate a fresh id from `persisted_task_id_factory`
    /// (write lock via `raw_entry`).
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;
        let is_session_dependent = task_type.native_fn.is_session_dependent;
        // Create a single ExecuteContext for both lookup and connect_child
        let mut ctx = self.execute_context(turbo_tasks);
        // First check if the task exists in the cache which only uses a read lock
        // .map(|r| *r) copies the TaskId and drops the DashMap Ref (releasing the read lock)
        // before ConnectChildOperation::run, which may re-enter task_cache with a write lock.
        if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
            self.track_cache_hit(&task_type);
            operation::ConnectChildOperation::run(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                ctx,
            );
            return task_id;
        }

        let mut is_new = false;
        let (task_id, task_type) = if let Some(task_id) = ctx.task_by_type(&task_type) {
            // Task exists in backing storage
            // So we only need to insert it into the in-memory cache
            self.track_cache_hit(&task_type);
            let task_type = match raw_entry(&self.task_cache, &task_type) {
                RawEntry::Occupied(_) => ArcOrOwned::Owned(task_type),
                RawEntry::Vacant(e) => {
                    let task_type = Arc::new(task_type);
                    e.insert(task_type.clone(), task_id);
                    ArcOrOwned::Arc(task_type)
                }
            };
            (task_id, task_type)
        } else {
            // Task doesn't exist in memory cache or backing storage
            // So we might need to create a new task
            let (task_id, task_type) = match raw_entry(&self.task_cache, &task_type) {
                RawEntry::Occupied(e) => {
                    // Another thread beat us to creating this task - use their task_id.
                    // They will handle logging to persisted_task_cache_log.
                    let task_id = *e.get();
                    drop(e);
                    self.track_cache_hit(&task_type);
                    (task_id, ArcOrOwned::Owned(task_type))
                }
                RawEntry::Vacant(e) => {
                    // We're creating a new task.
                    let task_type = Arc::new(task_type);
                    let task_id = self.persisted_task_id_factory.get();
                    e.insert(task_type.clone(), task_id);
                    // insert() consumes e, releasing the lock
                    self.track_cache_miss(&task_type);
                    is_new = true;
                    if let Some(log) = &self.persisted_task_cache_log {
                        log.lock(task_id).push((task_type.clone(), task_id));
                    }
                    (task_id, ArcOrOwned::Arc(task_type))
                }
            };
            (task_id, task_type)
        };
        // Newly created tasks get their aggregation number initialized before being
        // connected to the parent.
        if is_new {
            self.set_initial_aggregation_number(task_id, is_root, is_session_dependent, &mut ctx);
        }
        // Reuse the same ExecuteContext for connect_child
        operation::ConnectChildOperation::run(parent_task, task_id, Some(task_type), ctx);

        task_id
    }
1598
    /// Looks up (or creates) the transient task for `task_type`, connects it as a
    /// child of `parent_task`, and returns its [`TaskId`].
    ///
    /// Panics (via `panic_persistent_calling_transient`) when a persistent parent
    /// task tries to create a transient child. Unlike the persistent variant, the
    /// backing storage is never consulted — transient tasks only live in the
    /// in-memory `task_cache`.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;
        let is_session_dependent = task_type.native_fn.is_session_dependent;

        if let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                /* cell_id */ None,
            );
        }
        let mut ctx = self.execute_context(turbo_tasks);
        // First check if the task exists in the cache which only uses a read lock.
        // .map(|r| *r) copies the TaskId and drops the DashMap Ref (releasing the read lock)
        // before ConnectChildOperation::run, which may re-enter task_cache with a write lock.
        if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
            self.track_cache_hit(&task_type);
            operation::ConnectChildOperation::run(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                ctx,
            );
            return task_id;
        }
        // If not, acquire a write lock and double check / insert
        match raw_entry(&self.task_cache, &task_type) {
            RawEntry::Occupied(e) => {
                // Another thread inserted the task between our read and write lock.
                let task_id = *e.get();
                drop(e);
                self.track_cache_hit(&task_type);
                operation::ConnectChildOperation::run(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Owned(task_type)),
                    ctx,
                );
                task_id
            }
            RawEntry::Vacant(e) => {
                let task_type = Arc::new(task_type);
                let task_id = self.transient_task_id_factory.get();
                e.insert(task_type.clone(), task_id);
                self.track_cache_miss(&task_type);

                self.set_initial_aggregation_number(
                    task_id,
                    is_root,
                    is_session_dependent,
                    &mut ctx,
                );

                operation::ConnectChildOperation::run(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Arc(task_type)),
                    ctx,
                );

                task_id
            }
        }
    }
1669
1670    /// Generate an object that implements [`fmt::Display`] explaining why the given
1671    /// [`CachedTaskType`] is transient.
1672    fn debug_trace_transient_task(
1673        &self,
1674        task_type: &CachedTaskType,
1675        cell_id: Option<CellId>,
1676    ) -> DebugTraceTransientTask {
1677        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1678        // from tracing the same task many times, so use a visited_set
1679        fn inner_id(
1680            backend: &TurboTasksBackendInner<impl BackingStorage>,
1681            task_id: TaskId,
1682            cell_type_id: Option<ValueTypeId>,
1683            visited_set: &mut FxHashSet<TaskId>,
1684        ) -> DebugTraceTransientTask {
1685            if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1686                if visited_set.contains(&task_id) {
1687                    let task_name = task_type.get_name();
1688                    DebugTraceTransientTask::Collapsed {
1689                        task_name,
1690                        cell_type_id,
1691                    }
1692                } else {
1693                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1694                }
1695            } else {
1696                DebugTraceTransientTask::Uncached { cell_type_id }
1697            }
1698        }
1699        fn inner_cached(
1700            backend: &TurboTasksBackendInner<impl BackingStorage>,
1701            task_type: &CachedTaskType,
1702            cell_type_id: Option<ValueTypeId>,
1703            visited_set: &mut FxHashSet<TaskId>,
1704        ) -> DebugTraceTransientTask {
1705            let task_name = task_type.get_name();
1706
1707            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1708                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1709                    // `task_id` should never be `None` at this point, as that would imply a
1710                    // non-local task is returning a local `Vc`...
1711                    // Just ignore if it happens, as we're likely already panicking.
1712                    return None;
1713                };
1714                if task_id.is_transient() {
1715                    Some(Box::new(inner_id(
1716                        backend,
1717                        task_id,
1718                        cause_self_raw_vc.try_get_type_id(),
1719                        visited_set,
1720                    )))
1721                } else {
1722                    None
1723                }
1724            });
1725            let cause_args = task_type
1726                .arg
1727                .get_raw_vcs()
1728                .into_iter()
1729                .filter_map(|raw_vc| {
1730                    let Some(task_id) = raw_vc.try_get_task_id() else {
1731                        // `task_id` should never be `None` (see comment above)
1732                        return None;
1733                    };
1734                    if !task_id.is_transient() {
1735                        return None;
1736                    }
1737                    Some((task_id, raw_vc.try_get_type_id()))
1738                })
1739                .collect::<IndexSet<_>>() // dedupe
1740                .into_iter()
1741                .map(|(task_id, cell_type_id)| {
1742                    inner_id(backend, task_id, cell_type_id, visited_set)
1743                })
1744                .collect();
1745
1746            DebugTraceTransientTask::Cached {
1747                task_name,
1748                cell_type_id,
1749                cause_self,
1750                cause_args,
1751            }
1752        }
1753        inner_cached(
1754            self,
1755            task_type,
1756            cell_id.map(|c| c.type_id),
1757            &mut FxHashSet::default(),
1758        )
1759    }
1760
1761    fn invalidate_task(
1762        &self,
1763        task_id: TaskId,
1764        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1765    ) {
1766        if !self.should_track_dependencies() {
1767            panic!("Dependency tracking is disabled so invalidation is not allowed");
1768        }
1769        operation::InvalidateOperation::run(
1770            smallvec![task_id],
1771            #[cfg(feature = "trace_task_dirty")]
1772            TaskDirtyCause::Invalidator,
1773            self.execute_context(turbo_tasks),
1774        );
1775    }
1776
1777    fn invalidate_tasks(
1778        &self,
1779        tasks: &[TaskId],
1780        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1781    ) {
1782        if !self.should_track_dependencies() {
1783            panic!("Dependency tracking is disabled so invalidation is not allowed");
1784        }
1785        operation::InvalidateOperation::run(
1786            tasks.iter().copied().collect(),
1787            #[cfg(feature = "trace_task_dirty")]
1788            TaskDirtyCause::Unknown,
1789            self.execute_context(turbo_tasks),
1790        );
1791    }
1792
1793    fn invalidate_tasks_set(
1794        &self,
1795        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1796        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1797    ) {
1798        if !self.should_track_dependencies() {
1799            panic!("Dependency tracking is disabled so invalidation is not allowed");
1800        }
1801        operation::InvalidateOperation::run(
1802            tasks.iter().copied().collect(),
1803            #[cfg(feature = "trace_task_dirty")]
1804            TaskDirtyCause::Unknown,
1805            self.execute_context(turbo_tasks),
1806        );
1807    }
1808
1809    fn invalidate_serialization(
1810        &self,
1811        task_id: TaskId,
1812        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1813    ) {
1814        if task_id.is_transient() {
1815            return;
1816        }
1817        let mut ctx = self.execute_context(turbo_tasks);
1818        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1819        task.invalidate_serialization();
1820    }
1821
1822    fn debug_get_task_description(&self, task_id: TaskId) -> String {
1823        let task = self.storage.access_mut(task_id);
1824        if let Some(value) = task.get_persistent_task_type() {
1825            format!("{task_id:?} {}", value)
1826        } else if let Some(value) = task.get_transient_task_type() {
1827            format!("{task_id:?} {}", value)
1828        } else {
1829            format!("{task_id:?} unknown")
1830        }
1831    }
1832
1833    fn get_task_name(
1834        &self,
1835        task_id: TaskId,
1836        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1837    ) -> String {
1838        let mut ctx = self.execute_context(turbo_tasks);
1839        let task = ctx.task(task_id, TaskDataCategory::Data);
1840        if let Some(value) = task.get_persistent_task_type() {
1841            value.to_string()
1842        } else if let Some(value) = task.get_transient_task_type() {
1843            value.to_string()
1844        } else {
1845            "unknown".to_string()
1846        }
1847    }
1848
1849    fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1850        let task = self.storage.access_mut(task_id);
1851        task.get_persistent_task_type().cloned()
1852    }
1853
1854    fn task_execution_canceled(
1855        &self,
1856        task_id: TaskId,
1857        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1858    ) {
1859        let mut ctx = self.execute_context(turbo_tasks);
1860        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1861        if let Some(in_progress) = task.take_in_progress() {
1862            match in_progress {
1863                InProgressState::Scheduled {
1864                    done_event,
1865                    reason: _,
1866                } => done_event.notify(usize::MAX),
1867                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1868                    done_event.notify(usize::MAX)
1869                }
1870                InProgressState::Canceled => {}
1871            }
1872        }
1873        let old = task.set_in_progress(InProgressState::Canceled);
1874        debug_assert!(old.is_none(), "InProgress already exists");
1875    }
1876
    /// Transitions a `Scheduled` task into the `InProgress` state and builds the
    /// execution spec (tracing span + future) for it.
    ///
    /// Returns `None` when the task has no in-progress state, is not currently in
    /// the `Scheduled` state, or a `Once` transient task's future was already taken.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            // Prefetching drops the task access, so it has to be re-acquired after.
            if let Some(tasks) = task.prefetch() {
                drop(task);
                ctx.prepare_tasks(tasks);
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            let in_progress = task.take_in_progress()?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                // Not scheduled (e.g. already running or canceled): restore the
                // state we just took and bail out.
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Make all current collectibles outdated (remove left-over outdated collectibles)
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        // Only drop outdated entries that have no current counterpart.
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Make all dependencies outdated
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        // Build the span and the future outside the task access scope.
        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason, priority),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // `Once` futures can only be taken once; `None` here means it ran already.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1975
    /// Finalizes a completed task execution: removes outdated edges, connects new
    /// children, stores the new output and clears the dirty/in-progress state.
    ///
    /// Returns `true` when the task was detected as stale at some step and was
    /// rescheduled (discarding this execution's result), `false` otherwise.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        // Task completion is a 4 step process:
        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
        //    aggregation number of the task and the new children.
        // 2. Connect the new children to the task (and do the relevant aggregation updates).
        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
        // 4. Shrink the task memory to reduce footprint of the task.

        // Due to persistence it is possible that the process is cancelled after any step. This is
        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
        // in-memory representation.

        // The task might be invalidated during this process, so we need to check the stale flag
        // at the start of every step.

        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        // Step 1: clean up old edges and compute what changed.
        let Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        }) = self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        )
        else {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "prepare");
            return true;
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // When restoring from filesystem cache the following might not be executed (since we can
        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
        // would be executed again.

        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        // Step 2: connect new children (may detect staleness and reschedule).
        if has_new_children
            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return true;
        }

        // Step 3: store output, clear dirty flag and in-progress state.
        let (stale, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
        );
        if stale {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return true;
        }

        // Step 4: shrink in-memory representation.
        let removed_data =
            self.task_execution_completed_cleanup(&mut ctx, task_id, cell_counters, is_error);

        // Drop data outside of critical sections
        drop(removed_data);
        drop(in_progress_cells);

        false
    }
2107
2108    fn task_execution_completed_prepare(
2109        &self,
2110        ctx: &mut impl ExecuteContext<'_>,
2111        #[cfg(feature = "trace_task_details")] span: &Span,
2112        task_id: TaskId,
2113        result: Result<RawVc, TurboTasksExecutionError>,
2114        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2115        #[cfg(feature = "verify_determinism")] stateful: bool,
2116        has_invalidator: bool,
2117    ) -> Option<TaskExecutionCompletePrepareResult> {
2118        let mut task = ctx.task(task_id, TaskDataCategory::All);
2119        let Some(in_progress) = task.get_in_progress_mut() else {
2120            panic!("Task execution completed, but task is not in progress: {task:#?}");
2121        };
2122        if matches!(in_progress, InProgressState::Canceled) {
2123            return Some(TaskExecutionCompletePrepareResult {
2124                new_children: Default::default(),
2125                is_now_immutable: false,
2126                #[cfg(feature = "verify_determinism")]
2127                no_output_set: false,
2128                new_output: None,
2129                output_dependent_tasks: Default::default(),
2130            });
2131        }
2132        let &mut InProgressState::InProgress(box InProgressStateInner {
2133            stale,
2134            ref mut new_children,
2135            once_task: is_once_task,
2136            ..
2137        }) = in_progress
2138        else {
2139            panic!("Task execution completed, but task is not in progress: {task:#?}");
2140        };
2141
2142        // If the task is stale, reschedule it
2143        #[cfg(not(feature = "no_fast_stale"))]
2144        if stale && !is_once_task {
2145            let Some(InProgressState::InProgress(box InProgressStateInner {
2146                done_event,
2147                mut new_children,
2148                ..
2149            })) = task.take_in_progress()
2150            else {
2151                unreachable!();
2152            };
2153            let old = task.set_in_progress(InProgressState::Scheduled {
2154                done_event,
2155                reason: TaskExecutionReason::Stale,
2156            });
2157            debug_assert!(old.is_none(), "InProgress already exists");
2158            // Remove old children from new_children to leave only the children that had their
2159            // active count increased
2160            for task in task.iter_children() {
2161                new_children.remove(&task);
2162            }
2163            drop(task);
2164
2165            // We need to undo the active count increase for the children since we throw away the
2166            // new_children list now.
2167            AggregationUpdateQueue::run(
2168                AggregationUpdateJob::DecreaseActiveCounts {
2169                    task_ids: new_children.into_iter().collect(),
2170                },
2171                ctx,
2172            );
2173            return None;
2174        }
2175
2176        // take the children from the task to process them
2177        let mut new_children = take(new_children);
2178
2179        // handle stateful (only tracked when verify_determinism is enabled)
2180        #[cfg(feature = "verify_determinism")]
2181        if stateful {
2182            task.set_stateful(true);
2183        }
2184
2185        // handle has_invalidator
2186        if has_invalidator {
2187            task.set_invalidator(true);
2188        }
2189
2190        // handle cell counters: update max index and remove cells that are no longer used
2191        let old_counters: FxHashMap<_, _> = task
2192            .iter_cell_type_max_index()
2193            .map(|(&k, &v)| (k, v))
2194            .collect();
2195        let mut counters_to_remove = old_counters.clone();
2196
2197        for (&cell_type, &max_index) in cell_counters.iter() {
2198            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2199                if old_max_index != max_index {
2200                    task.insert_cell_type_max_index(cell_type, max_index);
2201                }
2202            } else {
2203                task.insert_cell_type_max_index(cell_type, max_index);
2204            }
2205        }
2206        for (cell_type, _) in counters_to_remove {
2207            task.remove_cell_type_max_index(&cell_type);
2208        }
2209
2210        let mut queue = AggregationUpdateQueue::new();
2211
2212        let mut old_edges = Vec::new();
2213
2214        let has_children = !new_children.is_empty();
2215        let is_immutable = task.immutable();
2216        let task_dependencies_for_immutable =
2217            // Task was previously marked as immutable
2218            if !is_immutable
2219            // Task is not session dependent (session dependent tasks can change between sessions)
2220            && !task.is_session_dependent()
2221            // Task has no invalidator
2222            && !task.invalidator()
2223            // Task has no dependencies on collectibles
2224            && task.is_collectibles_dependencies_empty()
2225        {
2226            Some(
2227                // Collect all dependencies on tasks to check if all dependencies are immutable
2228                task.iter_output_dependencies()
2229                    .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
2230                    .collect::<FxHashSet<_>>(),
2231            )
2232        } else {
2233            None
2234        };
2235
2236        if has_children {
2237            // Prepare all new children
2238            prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2239
2240            // Filter actual new children
2241            old_edges.extend(
2242                task.iter_children()
2243                    .filter(|task| !new_children.remove(task))
2244                    .map(OutdatedEdge::Child),
2245            );
2246        } else {
2247            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2248        }
2249
2250        old_edges.extend(
2251            task.iter_outdated_collectibles()
2252                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2253        );
2254
2255        if self.should_track_dependencies() {
2256            // IMPORTANT: Use iter_outdated_* here, NOT iter_* (active dependencies).
2257            // At execution start, active deps are copied to outdated as a "before" snapshot.
2258            // During execution, new deps are added to active.
2259            // Here at completion, we clean up only the OUTDATED deps (the "before" snapshot).
2260            // Using iter_* (active) instead would incorrectly clean up deps that are still valid,
2261            // breaking dependency tracking.
2262            old_edges.extend(
2263                task.iter_outdated_cell_dependencies()
2264                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
2265            );
2266            old_edges.extend(
2267                task.iter_outdated_output_dependencies()
2268                    .map(OutdatedEdge::OutputDependency),
2269            );
2270        }
2271
2272        // Check if output need to be updated
2273        let current_output = task.get_output();
2274        #[cfg(feature = "verify_determinism")]
2275        let no_output_set = current_output.is_none();
2276        let new_output = match result {
2277            Ok(RawVc::TaskOutput(output_task_id)) => {
2278                if let Some(OutputValue::Output(current_task_id)) = current_output
2279                    && *current_task_id == output_task_id
2280                {
2281                    None
2282                } else {
2283                    Some(OutputValue::Output(output_task_id))
2284                }
2285            }
2286            Ok(RawVc::TaskCell(output_task_id, cell)) => {
2287                if let Some(OutputValue::Cell(CellRef {
2288                    task: current_task_id,
2289                    cell: current_cell,
2290                })) = current_output
2291                    && *current_task_id == output_task_id
2292                    && *current_cell == cell
2293                {
2294                    None
2295                } else {
2296                    Some(OutputValue::Cell(CellRef {
2297                        task: output_task_id,
2298                        cell,
2299                    }))
2300                }
2301            }
2302            Ok(RawVc::LocalOutput(..)) => {
2303                panic!("Non-local tasks must not return a local Vc");
2304            }
2305            Err(err) => {
2306                if let Some(OutputValue::Error(old_error)) = current_output
2307                    && **old_error == err
2308                {
2309                    None
2310                } else {
2311                    Some(OutputValue::Error(Arc::new((&err).into())))
2312                }
2313            }
2314        };
2315        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2316        // When output has changed, grab the dependent tasks
2317        if new_output.is_some() && ctx.should_track_dependencies() {
2318            output_dependent_tasks = task.iter_output_dependent().collect();
2319        }
2320
2321        drop(task);
2322
2323        // Check if the task can be marked as immutable
2324        let mut is_now_immutable = false;
2325        if let Some(dependencies) = task_dependencies_for_immutable
2326            && dependencies
2327                .iter()
2328                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2329        {
2330            is_now_immutable = true;
2331        }
2332        #[cfg(feature = "trace_task_details")]
2333        span.record("immutable", is_immutable || is_now_immutable);
2334
2335        if !queue.is_empty() || !old_edges.is_empty() {
2336            #[cfg(feature = "trace_task_completion")]
2337            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
2338            // Remove outdated edges first, before removing in_progress+dirty flag.
2339            // We need to make sure all outdated edges are removed before the task can potentially
2340            // be scheduled and executed again
2341            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2342        }
2343
2344        Some(TaskExecutionCompletePrepareResult {
2345            new_children,
2346            is_now_immutable,
2347            #[cfg(feature = "verify_determinism")]
2348            no_output_set,
2349            new_output,
2350            output_dependent_tasks,
2351        })
2352    }
2353
    /// Marks all tasks that depend on the output of `task_id` dirty after that output
    /// changed.
    ///
    /// For each dependent task:
    /// - `Once` transient tasks are never invalidated and are skipped.
    /// - If the dependency is only in the *outdated* set, the dependent hasn't re-read the
    ///   output yet; it is made dirty but not stale.
    /// - If the dependency was removed entirely, the dependent is skipped.
    /// - Otherwise it is made dirty and stale via `make_task_dirty_internal`.
    ///
    /// When the number of dependents exceeds
    /// `DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD`, the work is chunked and processed
    /// on multiple threads via `scope_and_block`, each thread using its own child context
    /// and aggregation queue.
    fn task_execution_completed_invalidate_output_dependent(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        output_dependent_tasks: SmallVec<[TaskId; 4]>,
    ) {
        debug_assert!(!output_dependent_tasks.is_empty());

        // Batch-prefetch task data when more than one dependent will be touched.
        if output_dependent_tasks.len() > 1 {
            ctx.prepare_tasks(
                output_dependent_tasks
                    .iter()
                    .map(|&id| (id, TaskDataCategory::All)),
            );
        }

        /// Processes a single dependent task: decides whether it must be invalidated and,
        /// if so, marks it dirty (optionally stale), queuing aggregation updates in `queue`.
        fn process_output_dependents(
            ctx: &mut impl ExecuteContext<'_>,
            task_id: TaskId,
            dependent_task_id: TaskId,
            queue: &mut AggregationUpdateQueue,
        ) {
            #[cfg(feature = "trace_task_output_dependencies")]
            let span = tracing::trace_span!(
                "invalidate output dependency",
                task = %task_id,
                dependent_task = %dependent_task_id,
                result = tracing::field::Empty,
            )
            .entered();
            let mut make_stale = true;
            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
            let transient_task_type = dependent.get_transient_task_type();
            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
                // once tasks are never invalidated
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "once task");
                return;
            }
            if dependent.outdated_output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "outdated dependency");
                // output dependency is outdated, so it hasn't read the output yet
                // and doesn't need to be invalidated
                // But importantly we still need to make the task dirty as it should no longer
                // be considered as "recomputation".
                make_stale = false;
            } else if !dependent.output_dependencies_contains(&task_id) {
                // output dependency has been removed, so the task doesn't depend on the
                // output anymore and doesn't need to be invalidated
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "no backward dependency");
                return;
            }
            make_task_dirty_internal(
                dependent,
                dependent_task_id,
                make_stale,
                #[cfg(feature = "trace_task_dirty")]
                TaskDirtyCause::OutputChange { task_id },
                queue,
                ctx,
            );
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "marked dirty");
        }

        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
            // Parallel path: split the dependents into chunks and process each chunk on its
            // own thread with a dedicated child context and aggregation queue.
            let chunk_size = good_chunk_size(output_dependent_tasks.len());
            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
            let _ = scope_and_block(chunks.len(), |scope| {
                for chunk in chunks {
                    let child_ctx = ctx.child_context();
                    scope.spawn(move || {
                        let mut ctx = child_ctx.create();
                        let mut queue = AggregationUpdateQueue::new();
                        for dependent_task_id in chunk {
                            process_output_dependents(
                                &mut ctx,
                                task_id,
                                dependent_task_id,
                                &mut queue,
                            )
                        }
                        // Flush this thread's aggregation updates before the scope ends.
                        queue.execute(&mut ctx);
                    });
                }
            });
        } else {
            // Sequential path for small dependent sets.
            let mut queue = AggregationUpdateQueue::new();
            for dependent_task_id in output_dependent_tasks {
                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
            }
            queue.execute(ctx);
        }
    }
2450
2451    fn task_execution_completed_unfinished_children_dirty(
2452        &self,
2453        ctx: &mut impl ExecuteContext<'_>,
2454        new_children: &FxHashSet<TaskId>,
2455    ) {
2456        debug_assert!(!new_children.is_empty());
2457
2458        let mut queue = AggregationUpdateQueue::new();
2459        ctx.for_each_task_all(new_children.iter().copied(), |child_task, ctx| {
2460            if !child_task.has_output() {
2461                let child_id = child_task.id();
2462                make_task_dirty_internal(
2463                    child_task,
2464                    child_id,
2465                    false,
2466                    #[cfg(feature = "trace_task_dirty")]
2467                    TaskDirtyCause::InitialDirty,
2468                    &mut queue,
2469                    ctx,
2470                );
2471            }
2472        });
2473
2474        queue.execute(ctx);
2475    }
2476
    /// Connects the `new_children` discovered during execution to `task_id`.
    ///
    /// Returns `true` when the task became stale during execution and has been rescheduled
    /// instead of connecting the children (their active counts are decreased again);
    /// returns `false` after a normal connect, and also when the task was canceled in the
    /// meantime (nothing is connected in that case).
    fn task_execution_completed_connect(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        new_children: FxHashSet<TaskId>,
    ) -> bool {
        debug_assert!(!new_children.is_empty());

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // Task was canceled in the meantime, so we don't connect the children
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            #[cfg(not(feature = "no_fast_stale"))]
            stale,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // If the task is stale, reschedule it
        // (once tasks are excluded: they are never re-executed due to staleness)
        #[cfg(not(feature = "no_fast_stale"))]
        if *stale && !is_once_task {
            // Swap the InProgress state for a Scheduled state, keeping the done_event so
            // waiters are notified when the re-execution finishes.
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);

            // All `new_children` are currently held active with an active count and we need to
            // undo that. (We already filtered out the old children from that list)
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return true;
        }

        // Normal path: connect the children, passing along whether this task currently
        // holds an active count (only relevant when activeness tracking is enabled).
        let has_active_count = ctx.should_track_activeness()
            && task
                .get_activeness()
                .is_some_and(|activeness| activeness.active_counter > 0);
        connect_children(
            ctx,
            task_id,
            task,
            new_children,
            has_active_count,
            ctx.should_track_activeness(),
        );

        false
    }
2544
    /// Final phase of task completion: removes the in-progress state, stores the new
    /// output, updates the task's dirty state, and propagates dirtiness changes upwards.
    ///
    /// Returns `(reschedule, in_progress_cells)`:
    /// - `reschedule` is `true` when the task must be executed again (it became stale
    ///   during execution, or — with the `verify_determinism` feature — a verification
    ///   re-run is required).
    /// - `in_progress_cells` is the removed in-progress cell state, returned so the
    ///   caller can drop it outside of critical sections.
    fn task_execution_completed_finish(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "verify_determinism")] no_output_set: bool,
        new_output: Option<OutputValue>,
        is_now_immutable: bool,
    ) -> (
        bool,
        Option<
            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
        >,
    ) {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.take_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // Task was canceled in the meantime, so we don't finish it
            return (false, None);
        }
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: is_once_task,
            stale,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // By this phase all new children must already have been connected (or discarded).
        debug_assert!(new_children.is_empty());

        // If the task is stale, reschedule it
        if stale && !is_once_task {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            return (true, None);
        }

        // Set the output if it has changed; keep the old content alive until after the
        // task is released so it is dropped outside the critical section (see below).
        let mut old_content = None;
        if let Some(value) = new_output {
            old_content = task.set_output(value);
        }

        // If the task has no invalidator and has no mutable dependencies, it does not have a way
        // to be invalidated and we can mark it as immutable.
        if is_now_immutable {
            task.set_immutable(true);
        }

        // Notify in progress cells and remove all of them
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Grab the old dirty state
        let old_dirtyness = task.get_dirty().cloned();
        let (old_self_dirty, old_current_session_self_clean) = match old_dirtyness {
            None => (false, false),
            Some(Dirtyness::Dirty(_)) => (true, false),
            Some(Dirtyness::SessionDependent) => {
                // Session-dependent tasks can additionally be clean for the current session.
                let clean_in_current_session = task.current_session_clean();
                (true, clean_in_current_session)
            }
        };

        // Compute the new dirty state: after a successful execution the task is clean,
        // unless it is session dependent (then it stays SessionDependent but is clean for
        // the current session).
        let session_dependent = task.is_session_dependent();
        let (new_dirtyness, new_self_dirty, new_current_session_self_clean) = if session_dependent {
            (Some(Dirtyness::SessionDependent), true, true)
        } else {
            (None, false, false)
        };

        // Update the dirty state
        let dirty_changed = old_dirtyness != new_dirtyness;
        if dirty_changed {
            if let Some(value) = new_dirtyness {
                task.set_dirty(value);
            } else if old_dirtyness.is_some() {
                task.take_dirty();
            }
        }
        if old_current_session_self_clean != new_current_session_self_clean {
            if new_current_session_self_clean {
                task.set_current_session_clean(true);
            } else if old_current_session_self_clean {
                task.set_current_session_clean(false);
            }
        }

        // Propagate dirtyness changes
        let data_update = if old_self_dirty != new_self_dirty
            || old_current_session_self_clean != new_current_session_self_clean
        {
            // Container counts are unchanged here; only the task's own dirtiness differs.
            let dirty_container_count = task
                .get_aggregated_dirty_container_count()
                .cloned()
                .unwrap_or_default();
            let current_session_clean_container_count = task
                .get_aggregated_current_session_clean_container_count()
                .copied()
                .unwrap_or_default();
            let result = ComputeDirtyAndCleanUpdate {
                old_dirty_container_count: dirty_container_count,
                new_dirty_container_count: dirty_container_count,
                old_current_session_clean_container_count: current_session_clean_container_count,
                new_current_session_clean_container_count: current_session_clean_container_count,
                old_self_dirty,
                new_self_dirty,
                old_current_session_self_clean,
                new_current_session_self_clean,
            }
            .compute();
            if result.dirty_count_update - result.current_session_clean_update < 0 {
                // The task is clean now
                if let Some(activeness_state) = task.get_activeness_mut() {
                    // Wake anyone waiting for the task (and its aggregation) to be clean.
                    activeness_state.all_clean_event.notify(usize::MAX);
                    activeness_state.unset_active_until_clean();
                    if activeness_state.is_empty() {
                        task.take_activeness();
                    }
                }
            }
            result
                .aggregated_update(task_id)
                .and_then(|aggregated_update| {
                    AggregationUpdateJob::data_update(&mut task, aggregated_update)
                })
        } else {
            None
        };

        // With `verify_determinism`, re-run non-transient, non-once tasks whose dirty
        // state changed or that had no output set, to verify their result is stable.
        #[cfg(feature = "verify_determinism")]
        let reschedule =
            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
        #[cfg(not(feature = "verify_determinism"))]
        let reschedule = false;
        if reschedule {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);
        } else {
            // Release the task before waking waiters, so notified tasks can access it.
            drop(task);

            // Notify dependent tasks that are waiting for this task to finish
            done_event.notify(usize::MAX);
        }

        // Drop the replaced output content outside of the task access above.
        drop(old_content);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, ctx);
        }

        // We return so the data can be dropped outside of critical sections
        (reschedule, in_progress_cells)
    }
2714
2715    fn task_execution_completed_cleanup(
2716        &self,
2717        ctx: &mut impl ExecuteContext<'_>,
2718        task_id: TaskId,
2719        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2720        is_error: bool,
2721    ) -> Vec<SharedReference> {
2722        let mut task = ctx.task(task_id, TaskDataCategory::All);
2723        let mut removed_cell_data = Vec::new();
2724        // An error is potentially caused by a eventual consistency, so we avoid updating cells
2725        // after an error as it is likely transient and we want to keep the dependent tasks
2726        // clean to avoid re-executions.
2727        if !is_error {
2728            // Remove no longer existing cells and
2729            // find all outdated data items (removed cells, outdated edges)
2730            // Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
2731            // anyway and we want to avoid needless re-executions. When the cells become
2732            // used again, they are invalidated from the update cell operation.
2733            // Remove cell data for cells that no longer exist
2734            let to_remove_persistent: Vec<_> = task
2735                .iter_persistent_cell_data()
2736                .filter_map(|(cell, _)| {
2737                    cell_counters
2738                        .get(&cell.type_id)
2739                        .is_none_or(|start_index| cell.index >= *start_index)
2740                        .then_some(*cell)
2741                })
2742                .collect();
2743
2744            // Remove transient cell data for cells that no longer exist
2745            let to_remove_transient: Vec<_> = task
2746                .iter_transient_cell_data()
2747                .filter_map(|(cell, _)| {
2748                    cell_counters
2749                        .get(&cell.type_id)
2750                        .is_none_or(|start_index| cell.index >= *start_index)
2751                        .then_some(*cell)
2752                })
2753                .collect();
2754            removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
2755            for cell in to_remove_persistent {
2756                if let Some(data) = task.remove_persistent_cell_data(&cell) {
2757                    removed_cell_data.push(data.into_untyped());
2758                }
2759            }
2760            for cell in to_remove_transient {
2761                if let Some(data) = task.remove_transient_cell_data(&cell) {
2762                    removed_cell_data.push(data);
2763                }
2764            }
2765        }
2766
2767        // Clean up task storage after execution:
2768        // - Shrink collections marked with shrink_on_completion
2769        // - Drop dependency fields for immutable tasks (they'll never re-execute)
2770        task.cleanup_after_execution();
2771
2772        drop(task);
2773
2774        // Return so we can drop outside of critical sections
2775        removed_cell_data
2776    }
2777
    /// Runs a background job for the backend. Currently the only jobs are the snapshot
    /// jobs, which drive periodic persistence:
    ///
    /// The loop waits until either the snapshot interval elapses (5 min for the initial
    /// snapshot, 2 min afterwards), the system has been idle for `IDLE_TIMEOUT`, or a
    /// stop is requested (which ends the job). It then snapshots and persists, compacts
    /// the backing storage while still idle (bounded number of passes), and — when new
    /// data was persisted — records the snapshot time and schedules a follow-up snapshot
    /// job before returning.
    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    // `last_snapshot` is stored as millis since `start_time`; convert it
                    // back to an `Instant` for deadline arithmetic.
                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
                    let mut idle_start_listener = self.idle_start_event.listen();
                    let mut idle_end_listener = self.idle_end_event.listen();
                    // Tracks whether the current idle period may still trigger an early
                    // snapshot (reset after an idle snapshot produced no new data).
                    let mut fresh_idle = true;
                    loop {
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        let idle_timeout = *IDLE_TIMEOUT;
                        let (time, mut reason) =
                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                            } else {
                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
                            };

                        let until = last_snapshot + time;
                        if until > Instant::now() {
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            // Deadline for an idle-triggered snapshot; far in the future
                            // when not (freshly) idle.
                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
                                Instant::now() + idle_timeout
                            } else {
                                far_future()
                            };
                            loop {
                                tokio::select! {
                                    _ = &mut stop_listener => {
                                        return;
                                    },
                                    _ = &mut idle_start_listener => {
                                        // Became idle: arm the idle snapshot deadline and
                                        // re-arm the listener for the next idle period.
                                        fresh_idle = true;
                                        idle_time = Instant::now() + idle_timeout;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    _ = &mut idle_end_listener => {
                                        // No longer idle: push the idle deadline out.
                                        // NOTE(review): this sets it relative to `until`
                                        // rather than far_future() — presumably intentional,
                                        // but worth confirming.
                                        idle_time = until + idle_timeout;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        // Only snapshot early if we are still idle when
                                        // the idle deadline fires.
                                        if turbo_tasks.is_idle() {
                                            reason = "idle timeout";
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let this = self.clone();
                        // Create a root span shared by both the snapshot/persist
                        // work and the subsequent compaction so they appear
                        // grouped together in trace viewers.
                        let background_span =
                            tracing::info_span!(parent: None, "background snapshot");
                        let snapshot =
                            this.snapshot_and_persist(background_span.id(), reason, turbo_tasks);
                        if let Some((snapshot_start, new_data)) = snapshot {
                            last_snapshot = snapshot_start;

                            // Compact while idle (up to limit), regardless of
                            // whether the snapshot had new data.
                            // `background_span` is not entered here because
                            // `EnteredSpan` is `!Send` and would prevent the
                            // future from being sent across threads when it
                            // suspends at the `select!` await below.
                            const MAX_IDLE_COMPACTION_PASSES: usize = 10;
                            for _ in 0..MAX_IDLE_COMPACTION_PASSES {
                                // `biased` select with a ready fallback: this is a
                                // non-blocking poll of the idle-end listener.
                                let idle_ended = tokio::select! {
                                    biased;
                                    _ = &mut idle_end_listener => {
                                        idle_end_listener = self.idle_end_event.listen();
                                        true
                                    },
                                    _ = std::future::ready(()) => false,
                                };
                                if idle_ended {
                                    break;
                                }
                                // Enter the span only around the synchronous
                                // compact() call so we never hold an
                                // `EnteredSpan` across an await point.
                                let _compact_span = tracing::info_span!(
                                    parent: background_span.id(),
                                    "compact database"
                                )
                                .entered();
                                match self.backing_storage.compact() {
                                    // Ok(true): more compaction work remains; keep going.
                                    Ok(true) => {}
                                    // Ok(false): nothing left to compact.
                                    Ok(false) => break,
                                    Err(err) => {
                                        eprintln!("Compaction failed: {err:?}");
                                        break;
                                    }
                                }
                            }

                            if !new_data {
                                // Nothing was persisted: don't re-trigger an idle snapshot
                                // for this idle period, just keep looping.
                                fresh_idle = false;
                                continue;
                            }
                            // Persist the snapshot time (as millis since start) and hand
                            // off to a fresh follow-up snapshot job.
                            let last_snapshot = last_snapshot.duration_since(self.start_time);
                            self.last_snapshot.store(
                                last_snapshot.as_millis().try_into().unwrap(),
                                Ordering::Relaxed,
                            );

                            turbo_tasks.schedule_backend_background_job(
                                TurboTasksBackendJob::FollowUpSnapshot,
                            );
                            return;
                        }
                    }
                }
            }
        })
    }
2910
2911    fn try_read_own_task_cell(
2912        &self,
2913        task_id: TaskId,
2914        cell: CellId,
2915        options: ReadCellOptions,
2916        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2917    ) -> Result<TypedCellContent> {
2918        let mut ctx = self.execute_context(turbo_tasks);
2919        let task = ctx.task(task_id, TaskDataCategory::Data);
2920        if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2921            debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2922            Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2923        } else {
2924            Ok(CellContent(None).into_typed(cell.type_id))
2925        }
2926    }
2927
    /// Returns the collectibles of trait `collectible_type` visible from
    /// `task_id`, as a map from collectible `RawVc` to a count.
    ///
    /// When `reader_id` is given, dependency edges are recorded in both
    /// directions (on the read task and on the reader) so the reader can be
    /// invalidated when the set of collectibles changes.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            // Ensure it's a root node: keep raising the task's aggregation
            // number (re-acquiring the task each round) until it is a root
            // aggregation node, so its aggregated collectibles are complete.
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                // Release the task before running the aggregation update,
                // which locks tasks itself.
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            // Collectibles aggregated from the subgraph: each entry with a
            // positive count contributes 1 here (NOTE(review): contrast with
            // the `+= count` below for the task's own collectibles — this
            // clamp to 1 per aggregated entry looks intentional, but confirm).
            for (collectible, count) in task.iter_aggregated_collectibles() {
                if *count > 0 && collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += 1;
                }
            }
            // Collectibles emitted by this task itself, added with their
            // actual (possibly negative) counts.
            for (&collectible, &count) in task.iter_collectibles() {
                if collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += count;
                }
            }
            // Forward edge: remember that `reader_id` depends on this task's
            // collectibles of this type.
            if let Some(reader_id) = reader_id {
                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
            }
        }
        // Reverse edge on the reader, so the dependency can be cleaned up
        // when the reader re-executes.
        if let Some(reader_id) = reader_id {
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            if !reader.remove_outdated_collectibles_dependencies(&target) {
                let _ = reader.add_collectibles_dependencies(target);
            }
        }
        collectibles
    }
2992
2993    fn emit_collectible(
2994        &self,
2995        collectible_type: TraitTypeId,
2996        collectible: RawVc,
2997        task_id: TaskId,
2998        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2999    ) {
3000        self.assert_valid_collectible(task_id, collectible);
3001
3002        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3003            panic!("Collectibles need to be resolved");
3004        };
3005        let cell = CellRef {
3006            task: collectible_task,
3007            cell,
3008        };
3009        operation::UpdateCollectibleOperation::run(
3010            task_id,
3011            CollectibleRef {
3012                collectible_type,
3013                cell,
3014            },
3015            1,
3016            self.execute_context(turbo_tasks),
3017        );
3018    }
3019
3020    fn unemit_collectible(
3021        &self,
3022        collectible_type: TraitTypeId,
3023        collectible: RawVc,
3024        count: u32,
3025        task_id: TaskId,
3026        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3027    ) {
3028        self.assert_valid_collectible(task_id, collectible);
3029
3030        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3031            panic!("Collectibles need to be resolved");
3032        };
3033        let cell = CellRef {
3034            task: collectible_task,
3035            cell,
3036        };
3037        operation::UpdateCollectibleOperation::run(
3038            task_id,
3039            CollectibleRef {
3040                collectible_type,
3041                cell,
3042            },
3043            -(i32::try_from(count).unwrap()),
3044            self.execute_context(turbo_tasks),
3045        );
3046    }
3047
    /// Stores new `content` into `cell` of `task_id`.
    ///
    /// Delegates entirely to `operation::UpdateCellOperation::run`;
    /// `updated_key_hashes` and `verification_mode` are forwarded unchanged.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateCellOperation::run(
            task_id,
            cell,
            content,
            is_serializable_cell_content,
            updated_key_hashes,
            verification_mode,
            self.execute_context(turbo_tasks),
        );
    }
3068
    /// Marks the given task as completed while its execution is still in
    /// progress. Only has an effect when the task is currently in the
    /// `InProgress` state; otherwise it is a no-op.
    fn mark_own_task_as_finished(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task, TaskDataCategory::Data);
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            marked_as_completed,
            ..
        })) = task.get_in_progress_mut()
        {
            *marked_as_completed = true;
            // TODO this should remove the dirty state (also check session_dependent)
            // but this would break some assumptions for strongly consistent reads.
            // Client tasks are not connected yet, so we wouldn't wait for them.
            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
        }
    }
3088
3089    fn connect_task(
3090        &self,
3091        task: TaskId,
3092        parent_task: Option<TaskId>,
3093        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3094    ) {
3095        self.assert_not_persistent_calling_transient(parent_task, task, None);
3096        ConnectChildOperation::run(parent_task, task, None, self.execute_context(turbo_tasks));
3097    }
3098
3099    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3100        let task_id = self.transient_task_id_factory.get();
3101        {
3102            let mut task = self.storage.access_mut(task_id);
3103            task.init_transient_task(task_id, task_type, self.should_track_activeness());
3104        }
3105        #[cfg(feature = "verify_aggregation_graph")]
3106        self.root_tasks.lock().insert(task_id);
3107        task_id
3108    }
3109
    /// Disposes a root task.
    ///
    /// If the task or anything it aggregates is still dirty, it stays active
    /// (with its root marker removed) until everything is clean; otherwise
    /// its activeness state is removed immediately.
    fn dispose_root_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().remove(&task_id);

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_dirty = task.is_dirty();
        let has_dirty_containers = task.has_dirty_containers();
        if is_dirty.is_some() || has_dirty_containers {
            if let Some(activeness_state) = task.get_activeness_mut() {
                // We will finish the task, but it would be removed after the task is done
                activeness_state.unset_root_type();
                activeness_state.set_active_until_clean();
            };
        } else if let Some(activeness_state) = task.take_activeness() {
            // Technically nobody should be listening to this event, but just in case
            // we notify it anyway
            activeness_state.all_clean_event.notify(usize::MAX);
        }
    }
3134
    /// Debug-only (feature `verify_aggregation_graph`) consistency check of
    /// the aggregation graph. Walks the subgraph reachable from every root
    /// task and panics when an invariant is violated:
    /// - every reachable non-root task must report (via uppers) to some
    ///   aggregated node already seen from the root,
    /// - every aggregated collectible must be emitted by some visited task,
    /// - a dirty task must be listed as a dirty container in its uppers.
    ///
    /// When `idle` is true, the check aborts as soon as the system stops
    /// being idle.
    #[cfg(feature = "verify_aggregation_graph")]
    fn verify_aggregation_graph(
        &self,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
        idle: bool,
    ) {
        // Escape hatch: TURBO_ENGINE_VERIFY_GRAPH=0 disables the check.
        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
            return;
        }
        use std::{collections::VecDeque, env, io::stdout};

        use crate::backend::operation::{get_uppers, is_aggregating_node};

        let mut ctx = self.execute_context(turbo_tasks);
        let root_tasks = self.root_tasks.lock().clone();

        // BFS over the children graph, once per root task.
        for task_id in root_tasks.into_iter() {
            let mut queue = VecDeque::new();
            let mut visited = FxHashSet::default();
            let mut aggregated_nodes = FxHashSet::default();
            // collectible -> (seen emitted by a visited task?, aggregated
            // tasks that list it)
            let mut collectibles = FxHashMap::default();
            let root_task_id = task_id;
            visited.insert(task_id);
            aggregated_nodes.insert(task_id);
            queue.push_back(task_id);
            let mut counter = 0;
            while let Some(task_id) = queue.pop_front() {
                counter += 1;
                // Progress output for very large graphs.
                if counter % 100000 == 0 {
                    println!(
                        "queue={}, visited={}, aggregated_nodes={}",
                        queue.len(),
                        visited.len(),
                        aggregated_nodes.len()
                    );
                }
                let task = ctx.task(task_id, TaskDataCategory::All);
                // When verifying only while idle, bail out as soon as work
                // resumes.
                if idle && !self.is_idle.load(Ordering::Relaxed) {
                    return;
                }

                // Invariant 1: non-root tasks must report to an aggregated
                // node reachable from this root.
                let uppers = get_uppers(&task);
                if task_id != root_task_id
                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
                {
                    panic!(
                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
                         {:?})",
                        task_id,
                        task.get_task_description(),
                        uppers
                    );
                }

                // Record which aggregated tasks claim each collectible.
                for (collectible, _) in task.iter_aggregated_collectibles() {
                    collectibles
                        .entry(*collectible)
                        .or_insert_with(|| (false, Vec::new()))
                        .1
                        .push(task_id);
                }

                // Mark collectibles actually emitted by this task; an emitted
                // collectible missing from every upper is an error.
                for (&collectible, &value) in task.iter_collectibles() {
                    if value > 0 {
                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                            *flag = true
                        } else {
                            panic!(
                                "Task {} has a collectible {:?} that is not in any upper task",
                                task_id, collectible
                            );
                        }
                    }
                }

                let is_dirty = task.has_dirty();
                let has_dirty_container = task.has_dirty_containers();
                let should_be_in_upper = is_dirty || has_dirty_container;

                let aggregation_number = get_aggregation_number(&task);
                if is_aggregating_node(aggregation_number) {
                    aggregated_nodes.insert(task_id);
                }
                // println!(
                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
                //     ctx.get_task_description(task_id),
                //     uppers
                // );

                for child_id in task.iter_children() {
                    // println!("{task_id}: child -> {child_id}");
                    if visited.insert(child_id) {
                        queue.push_back(child_id);
                    }
                }
                drop(task);

                // Invariant 2: a dirty task must appear as a dirty container
                // (with positive count) in each of its uppers.
                if should_be_in_upper {
                    for upper_id in uppers {
                        let upper = ctx.task(upper_id, TaskDataCategory::All);
                        let in_upper = upper
                            .get_aggregated_dirty_containers(&task_id)
                            .is_some_and(|&dirty| dirty > 0);
                        if !in_upper {
                            let containers: Vec<_> = upper
                                .iter_aggregated_dirty_containers()
                                .map(|(&k, &v)| (k, v))
                                .collect();
                            let upper_task_desc = upper.get_task_description();
                            drop(upper);
                            panic!(
                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
                                 ({})\nThese dirty containers are present:\n{:#?}",
                                task_id,
                                ctx.task(task_id, TaskDataCategory::Data)
                                    .get_task_description(),
                                upper_id,
                                upper_task_desc,
                                containers,
                            );
                        }
                    }
                }
            }

            // Invariant 3: every aggregated collectible must have been
            // emitted somewhere in the visited subgraph. On violation, dump
            // the upper chain of the emitting task before panicking.
            for (collectible, (flag, task_ids)) in collectibles {
                if !flag {
                    use std::io::Write;
                    let mut stdout = stdout().lock();
                    writeln!(
                        stdout,
                        "{:?} that is not emitted in any child task but in these aggregated \
                         tasks: {:#?}",
                        collectible,
                        task_ids
                            .iter()
                            .map(|t| format!(
                                "{t} {}",
                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
                            ))
                            .collect::<Vec<_>>()
                    )
                    .unwrap();

                    // Walk the upper graph from the collectible's task,
                    // printing each edge for debugging.
                    let task_id = collectible.cell.task;
                    let mut queue = {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        get_uppers(&task)
                    };
                    let mut visited = FxHashSet::default();
                    for &upper_id in queue.iter() {
                        visited.insert(upper_id);
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                    }
                    while let Some(task_id) = queue.pop() {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        let desc = task.get_task_description();
                        let aggregated_collectible = task
                            .get_aggregated_collectibles(&collectible)
                            .copied()
                            .unwrap_or_default();
                        let uppers = get_uppers(&task);
                        drop(task);
                        writeln!(
                            stdout,
                            "upper {task_id} {desc} collectible={aggregated_collectible}"
                        )
                        .unwrap();
                        if task_ids.contains(&task_id) {
                            writeln!(
                                stdout,
                                "Task has an upper connection to an aggregated task that doesn't \
                                 reference it. Upper connection is invalid!"
                            )
                            .unwrap();
                        }
                        for upper_id in uppers {
                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                            if !visited.contains(&upper_id) {
                                queue.push(upper_id);
                            }
                        }
                    }
                    panic!("See stdout for more details");
                }
            }
        }
    }
3323
3324    fn assert_not_persistent_calling_transient(
3325        &self,
3326        parent_id: Option<TaskId>,
3327        child_id: TaskId,
3328        cell_id: Option<CellId>,
3329    ) {
3330        if let Some(parent_id) = parent_id
3331            && !parent_id.is_transient()
3332            && child_id.is_transient()
3333        {
3334            self.panic_persistent_calling_transient(
3335                self.debug_get_task_description(parent_id),
3336                self.debug_get_cached_task_type(child_id).as_deref(),
3337                cell_id,
3338            );
3339        }
3340    }
3341
3342    fn panic_persistent_calling_transient(
3343        &self,
3344        parent: String,
3345        child: Option<&CachedTaskType>,
3346        cell_id: Option<CellId>,
3347    ) {
3348        let transient_reason = if let Some(child) = child {
3349            Cow::Owned(format!(
3350                " The callee is transient because it depends on:\n{}",
3351                self.debug_trace_transient_task(child, cell_id),
3352            ))
3353        } else {
3354            Cow::Borrowed("")
3355        };
3356        panic!(
3357            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3358            parent,
3359            child.map_or("unknown", |t| t.get_name()),
3360            transient_reason,
3361        );
3362    }
3363
    /// Validates that `collectible` may be emitted from `task_id`: it must be
    /// a resolved `RawVc::TaskCell`, and a transient collectible must not be
    /// used from a persistent task. Panics with diagnostics otherwise.
    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
        // these checks occur in a potentially hot codepath, but they're cheap
        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
            // This should never happen: The collectible APIs use ResolvedVc
            let task_info = if let Some(col_task_ty) = collectible
                .try_get_task_id()
                .map(|t| self.debug_get_task_description(t))
            {
                Cow::Owned(format!(" (return type of {col_task_ty})"))
            } else {
                Cow::Borrowed("")
            };
            panic!("Collectible{task_info} must be a ResolvedVc")
        };
        if col_task_id.is_transient() && !task_id.is_transient() {
            // Include a trace of why the collectible is transient when its
            // cached task type is available.
            let transient_reason =
                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
                    Cow::Owned(format!(
                        ". The collectible is transient because it depends on:\n{}",
                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
                    ))
                } else {
                    Cow::Borrowed("")
                };
            // this should never happen: How would a persistent function get a transient Vc?
            panic!(
                "Collectible is transient, transient collectibles cannot be emitted from \
                 persistent tasks{transient_reason}",
            )
        }
    }
3395}
3396
// `Backend` trait implementation. Every method is a thin wrapper that
// forwards to the inner implementation stored in `self.0`; no logic lives
// here. See the inherent methods on the inner type for actual behavior.
impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }

    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }

    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }

    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }

    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }

    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
    }

    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
    }

    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }

    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }

    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }

    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0
            .try_start_task_execution(task_id, priority, turbo_tasks)
    }

    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
            turbo_tasks,
        )
    }

    type BackendJob = TurboTasksBackendJob;

    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(job, turbo_tasks)
    }

    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, reader, options, turbo_tasks)
    }

    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        // Note: the inner method takes `reader` before `cell`, unlike this
        // trait signature — keep the forwarding order as-is.
        self.0
            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }

    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0
            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
    }

    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }

    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }

    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }

    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(
            task_id,
            cell,
            is_serializable_cell_content,
            content,
            updated_key_hashes,
            verification_mode,
            turbo_tasks,
        );
    }

    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }

    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }

    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }

    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }

    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }

    fn is_tracking_dependencies(&self) -> bool {
        self.0.options.dependency_tracking
    }

    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
        self.0.get_task_name(task, turbo_tasks)
    }
}
3636
/// A node in the debug trace explaining why a task is transient; rendered as
/// an indented tree by `fmt_indented` / `Display`.
enum DebugTraceTransientTask {
    Cached {
        // Name of the cached task function.
        task_name: &'static str,
        // When set, printed as " (read cell of type …)" after the name.
        cell_type_id: Option<ValueTypeId>,
        // Printed under a "self:" heading — presumably the reason the
        // receiver is transient.
        cause_self: Option<Box<DebugTraceTransientTask>>,
        // Printed under an "args:" heading — reasons arguments are transient.
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// This representation is used when this task is a duplicate of one previously shown
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    /// Used when no cached task type information is available; printed as
    /// "unknown transient task".
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}
3653
3654impl DebugTraceTransientTask {
3655    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3656        let indent = "    ".repeat(level);
3657        f.write_str(&indent)?;
3658
3659        fn fmt_cell_type_id(
3660            f: &mut fmt::Formatter<'_>,
3661            cell_type_id: Option<ValueTypeId>,
3662        ) -> fmt::Result {
3663            if let Some(ty) = cell_type_id {
3664                write!(
3665                    f,
3666                    " (read cell of type {})",
3667                    get_value_type(ty).ty.global_name
3668                )
3669            } else {
3670                Ok(())
3671            }
3672        }
3673
3674        // write the name and type
3675        match self {
3676            Self::Cached {
3677                task_name,
3678                cell_type_id,
3679                ..
3680            }
3681            | Self::Collapsed {
3682                task_name,
3683                cell_type_id,
3684                ..
3685            } => {
3686                f.write_str(task_name)?;
3687                fmt_cell_type_id(f, *cell_type_id)?;
3688                if matches!(self, Self::Collapsed { .. }) {
3689                    f.write_str(" (collapsed)")?;
3690                }
3691            }
3692            Self::Uncached { cell_type_id } => {
3693                f.write_str("unknown transient task")?;
3694                fmt_cell_type_id(f, *cell_type_id)?;
3695            }
3696        }
3697        f.write_char('\n')?;
3698
3699        // write any extra "cause" information we might have
3700        if let Self::Cached {
3701            cause_self,
3702            cause_args,
3703            ..
3704        } = self
3705        {
3706            if let Some(c) = cause_self {
3707                writeln!(f, "{indent}  self:")?;
3708                c.fmt_indented(f, level + 1)?;
3709            }
3710            if !cause_args.is_empty() {
3711                writeln!(f, "{indent}  args:")?;
3712                for c in cause_args {
3713                    c.fmt_indented(f, level + 1)?;
3714                }
3715            }
3716        }
3717        Ok(())
3718    }
3719}
3720
impl fmt::Display for DebugTraceTransientTask {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Render the whole trace tree starting at indentation level 0.
        self.fmt_indented(f, 0)
    }
}
3726
// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
fn far_future() -> Instant {
    // There is no `Instant::MAX` and no way to build an `Instant` from a
    // calendar date, so approximate "never" as roughly 30 years from now.
    // (1000 years overflows on macOS, 100 years overflows on FreeBSD.)
    const THIRTY_YEARS: Duration = Duration::from_secs(30 * 365 * 24 * 60 * 60);
    Instant::now() + THIRTY_YEARS
}
3735
3736/// Encodes task data, using the provided buffer as a scratch space.  Returns a new exactly sized
3737/// buffer.
3738/// This allows reusing the buffer across multiple encode calls to optimize allocations and
3739/// resulting buffer sizes.
3740fn encode_task_data(
3741    task: TaskId,
3742    data: &TaskStorage,
3743    category: SpecificTaskDataCategory,
3744    scratch_buffer: &mut TurboBincodeBuffer,
3745) -> Result<TurboBincodeBuffer> {
3746    scratch_buffer.clear();
3747    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3748    data.encode(category, &mut encoder)?;
3749
3750    if cfg!(feature = "verify_serialization") {
3751        TaskStorage::new()
3752            .decode(
3753                category,
3754                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3755            )
3756            .with_context(|| {
3757                format!(
3758                    "expected to be able to decode serialized data for '{category:?}' information \
3759                     for {task}"
3760                )
3761            })?;
3762    }
3763    Ok(SmallVec::from_slice(scratch_buffer))
3764}