Skip to main content

turbo_tasks_backend/backend/
mod.rs

1mod cell_data;
2mod counter_map;
3mod operation;
4mod snapshot_coordinator;
5mod storage;
6pub mod storage_schema;
7
8use std::{
9    borrow::Cow,
10    fmt::{self, Write},
11    future::Future,
12    hash::BuildHasherDefault,
13    mem::take,
14    pin::Pin,
15    sync::{
16        Arc, LazyLock,
17        atomic::{AtomicBool, AtomicU64, Ordering},
18    },
19    time::SystemTime,
20};
21
22use anyhow::{Context, Result, bail};
23use auto_hash_map::{AutoMap, AutoSet};
24use indexmap::IndexSet;
25use parking_lot::Mutex;
26use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
27use smallvec::{SmallVec, smallvec};
28use tokio::time::{Duration, Instant};
29use tracing::{Span, trace_span};
30use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
31use turbo_tasks::{
32    CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
33    ReadOutputOptions, ReadTracking, SharedReference, StackDynTaskInputs, TRANSIENT_TASK_BIT,
34    TaskExecutionReason, TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasksBackendApi,
35    TurboTasksPanic, ValueTypeId,
36    backend::{
37        Backend, CachedTaskType, CellContent, CellHash, TaskExecutionSpec, TransientTaskType,
38        TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
39        TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
40        VerificationMode,
41    },
42    event::{Event, EventDescription, EventListener},
43    macro_helpers::NativeFunction,
44    message_queue::{TimingEvent, TraceEvent},
45    registry::get_value_type,
46    scope::scope_and_block,
47    task_statistics::TaskStatisticsApi,
48    trace::TraceRawVcs,
49    util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
50};
51
52pub use self::{
53    operation::AnyOperation,
54    storage::{SpecificTaskDataCategory, TaskDataCategory},
55};
56#[cfg(feature = "trace_task_dirty")]
57use crate::backend::operation::TaskDirtyCause;
58use crate::{
59    backend::{
60        operation::{
61            AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
62            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
63            LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType,
64            connect_children, get_aggregation_number, get_uppers, is_root_node,
65            make_task_dirty_internal, prepare_new_children,
66        },
67        snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
68        storage::Storage,
69        storage_schema::{TaskStorage, TaskStorageAccessors},
70    },
71    backing_storage::{BackingStorage, SnapshotItem, compute_task_type_hash},
72    data::{
73        ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
74        InProgressState, InProgressStateInner, OutputValue, TransientTask,
75    },
76    error::TaskError,
77    utils::{
78        dash_map_drop_contents::drop_contents,
79        dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
80        shard_amount::compute_shard_amount,
81    },
82};
83
/// Number of dependent tasks above which making them dirty is parallelized.
///
/// Up to and including this many dependent tasks the work is done without parallelization; once
/// the count exceeds the threshold the operation is parallelized.
const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10_000;
88
/// Idle timeout used for snapshot persistence.
///
/// Evaluated lazily on first access from the `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS`
/// environment variable, which is interpreted as a whole number of milliseconds. Falls back to
/// 2 seconds when the variable is not set or its value does not parse as a `u64`.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    const FALLBACK: Duration = Duration::from_secs(2);
    match std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS") {
        Ok(raw_millis) => raw_millis
            .parse::<u64>()
            .map_or(FALLBACK, Duration::from_millis),
        // Unset (or non-unicode) variable: use the default.
        Err(_) => FALLBACK,
    }
});
98
/// How the backend uses its backing storage.
///
/// Every mode queries the storage for cache entries that don't exist locally; the modes differ
/// in whether and when local changes are pushed back to the storage.
pub enum StorageMode {
    /// Queries the storage for cache entries that don't exist locally.
    ///
    /// This is the only mode for which `should_persist` returns `false`.
    ReadOnly,
    /// Queries the storage for cache entries that don't exist locally.
    /// Regularly pushes changes to the backing storage.
    ReadWrite,
    /// Queries the storage for cache entries that don't exist locally.
    /// On shutdown, pushes all changes to the backing storage.
    ReadWriteOnShutdown,
}
109
/// Configuration passed to [`TurboTasksBackend::new`].
pub struct BackendOptions {
    /// Enables dependency tracking.
    ///
    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
    /// forever.
    pub dependency_tracking: bool,

    /// Enables active tracking.
    ///
    /// Automatically disabled when `dependency_tracking` is disabled.
    ///
    /// When disabled: All tasks are considered as active.
    pub active_tracking: bool,

    /// Enables the backing storage. `None` disables it entirely; see [`StorageMode`] for the
    /// available modes.
    pub storage_mode: Option<StorageMode>,

    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
    /// data structures. If `None`, it will use the available parallelism.
    pub num_workers: Option<usize>,

    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
    pub small_preallocation: bool,
}
134
135impl Default for BackendOptions {
136    fn default() -> Self {
137        Self {
138            dependency_tracking: true,
139            active_tracking: true,
140            storage_mode: Some(StorageMode::ReadWrite),
141            num_workers: None,
142            small_preallocation: false,
143        }
144    }
145}
146
/// Kinds of jobs the backend can be asked to run.
///
/// NOTE(review): the code that schedules and runs these jobs is outside this chunk; judging by
/// the names, `InitialSnapshot` is the first snapshot job and `FollowUpSnapshot` every later
/// one — confirm against the job runner.
pub enum TurboTasksBackendJob {
    InitialSnapshot,
    FollowUpSnapshot,
}
151
/// Public handle to the backend: a newtype around a shared (`Arc`) [`TurboTasksBackendInner`],
/// which holds the actual state.
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
153
/// State shared behind a [`TurboTasksBackend`] handle.
struct TurboTasksBackendInner<B: BackingStorage> {
    /// Configuration supplied at construction. `new` clears `active_tracking` when
    /// `dependency_tracking` is disabled.
    options: BackendOptions,

    /// Instant captured when this struct was constructed; reference point for
    /// [`Self::last_snapshot`].
    start_time: Instant,

    /// Id factory for persisted tasks. Created in `new` with the backing storage's next free
    /// task id as start and `TRANSIENT_TASK_BIT - 1` as upper bound.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id factory for transient tasks. Created in `new` with `TRANSIENT_TASK_BIT` as start and
    /// `TaskId::MAX` as upper bound.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    /// Concurrent map from a cached task type to the id of its task.
    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

    /// Task storage; created in `new` with the shard amount computed from the options.
    storage: Storage,

    /// Coordinates the operation/snapshot interleaving protocol. See
    /// [`SnapshotCoordinator`] for details.
    snapshot_coord: SnapshotCoordinator,
    /// Serializes calls to `snapshot_and_persist`. The coordinator's
    /// `begin_snapshot` asserts that snapshots don't overlap; this mutex
    /// enforces that contract for our two callers (background loop and
    /// `stop_and_wait`).
    snapshot_in_progress: Mutex<()>,
    /// The timestamp of the last started snapshot since [`Self::start_time`].
    last_snapshot: AtomicU64,

    /// Starts out `false`. NOTE(review): presumably set once shutdown begins; the writer is
    /// outside this chunk.
    stopping: AtomicBool,
    /// Event named `TurboTasksBackend::stopping_event`. NOTE(review): presumably notified
    /// together with `stopping` — confirm.
    stopping_event: Event,
    /// Event named `TurboTasksBackend::idle_start_event`.
    idle_start_event: Event,
    /// Event named `TurboTasksBackend::idle_end_event`.
    idle_end_event: Event,
    /// Only present with the `verify_aggregation_graph` feature; starts out `false`.
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    /// Statistics sink used by the cache hit/miss tracking helpers.
    task_statistics: TaskStatisticsApi,

    /// The backing storage passed to `new`; exposed via `TurboTasksBackend::backing_storage`.
    backing_storage: B,

    /// Only present with the `verify_aggregation_graph` feature; starts out empty.
    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
191
192impl<B: BackingStorage> TurboTasksBackend<B> {
193    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
194        Self(Arc::new(TurboTasksBackendInner::new(
195            options,
196            backing_storage,
197        )))
198    }
199
200    pub fn backing_storage(&self) -> &B {
201        &self.0.backing_storage
202    }
203}
204
205impl<B: BackingStorage> TurboTasksBackendInner<B> {
206    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
207        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
208        if !options.dependency_tracking {
209            options.active_tracking = false;
210        }
211        let small_preallocation = options.small_preallocation;
212        let next_task_id = backing_storage
213            .next_free_task_id()
214            .expect("Failed to get task id");
215        Self {
216            options,
217            start_time: Instant::now(),
218            persisted_task_id_factory: IdFactoryWithReuse::new(
219                next_task_id,
220                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
221            ),
222            transient_task_id_factory: IdFactoryWithReuse::new(
223                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
224                TaskId::MAX,
225            ),
226            task_cache: FxDashMap::default(),
227            storage: Storage::new(shard_amount, small_preallocation),
228            snapshot_coord: SnapshotCoordinator::new(),
229            snapshot_in_progress: Mutex::new(()),
230            last_snapshot: AtomicU64::new(0),
231            stopping: AtomicBool::new(false),
232            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
233            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
234            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
235            #[cfg(feature = "verify_aggregation_graph")]
236            is_idle: AtomicBool::new(false),
237            task_statistics: TaskStatisticsApi::default(),
238            backing_storage,
239            #[cfg(feature = "verify_aggregation_graph")]
240            root_tasks: Default::default(),
241        }
242    }
243
    /// Creates an [`ExecuteContext`] bound to this backend and the given `turbo_tasks` API
    /// handle. The returned context borrows both for `'a`.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }
250
251    fn suspending_requested(&self) -> bool {
252        self.should_persist() && self.snapshot_coord.snapshot_pending()
253    }
254
255    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
256        if self.should_persist() {
257            self.snapshot_coord.suspend_point(suspend);
258        }
259    }
260
261    pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
262        if !self.should_persist() {
263            return OperationGuard::noop();
264        }
265        self.snapshot_coord.begin_operation()
266    }
267
268    fn should_persist(&self) -> bool {
269        matches!(
270            self.options.storage_mode,
271            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
272        )
273    }
274
    /// Returns `true` when any storage mode is configured (every [`StorageMode`] variant is
    /// documented as querying the storage for entries missing locally).
    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }

    /// Returns the `dependency_tracking` option.
    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }

    /// Returns the `active_tracking` option. `new` has already forced it to `false` when
    /// dependency tracking is disabled.
    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }
286
    /// Records a cache hit for `native_fn` in the task statistics.
    ///
    /// NOTE(review): `TaskStatisticsApi::map` presumably only runs the closure when statistics
    /// collection is enabled — confirm. Its return value is intentionally discarded.
    fn track_cache_hit_by_fn(&self, native_fn: &'static NativeFunction) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(native_fn));
    }

    /// Records a cache miss for `native_fn` in the task statistics.
    ///
    /// NOTE(review): `TaskStatisticsApi::map` presumably only runs the closure when statistics
    /// collection is enabled — confirm. Its return value is intentionally discarded.
    fn track_cache_miss_by_fn(&self, native_fn: &'static NativeFunction) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(native_fn));
    }
296
297    /// Reconstructs a full [`TurboTasksExecutionError`] from the compact [`TaskError`] storage
298    /// representation. For [`TaskError::TaskChain`], this looks up the source error from the last
299    /// task's output and rebuilds the nested `TaskContext` wrappers with `TurboTasksCallApi`
300    /// references for lazy name resolution.
301    fn task_error_to_turbo_tasks_execution_error(
302        &self,
303        error: &TaskError,
304        ctx: &mut impl ExecuteContext<'_>,
305    ) -> TurboTasksExecutionError {
306        match error {
307            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
308            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
309                message: item.message.clone(),
310                source: item
311                    .source
312                    .as_ref()
313                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
314            })),
315            TaskError::LocalTaskContext(local_task_context) => {
316                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
317                    name: local_task_context.name.clone(),
318                    source: local_task_context
319                        .source
320                        .as_ref()
321                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
322                }))
323            }
324            TaskError::TaskChain(chain) => {
325                let task_id = chain.last().unwrap();
326                let error = {
327                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
328                    if let Some(OutputValue::Error(error)) = task.get_output() {
329                        Some(error.clone())
330                    } else {
331                        None
332                    }
333                };
334                let error = error.map_or_else(
335                    || {
336                        // Eventual consistency will cause errors to no longer be available
337                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
338                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
339                                "Error no longer available",
340                            )),
341                            location: None,
342                        }))
343                    },
344                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
345                );
346                let mut current_error = error;
347                for &task_id in chain.iter().rev() {
348                    current_error =
349                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
350                            task_id,
351                            source: Some(current_error),
352                            turbo_tasks: ctx.turbo_tasks(),
353                        }));
354                }
355                current_error
356            }
357        }
358    }
359}
360
/// Intermediate result of step 1 of task execution completion.
///
/// NOTE(review): the code producing and consuming this struct is outside this chunk. The field
/// notes below are inferred from names and types only and should be confirmed there.
struct TaskExecutionCompletePrepareResult {
    /// Presumably the children that are new after this execution — TODO confirm.
    pub new_children: FxHashSet<TaskId>,
    /// Presumably whether the task became immutable with this execution — TODO confirm.
    pub is_now_immutable: bool,
    /// Only present with the `verify_determinism` feature.
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    /// Presumably the output to store, if one needs to be written — TODO confirm.
    pub new_output: Option<OutputValue>,
    /// Presumably the tasks depending on this task's output — TODO confirm.
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
    /// Presumably whether this execution was a recomputation — TODO confirm.
    pub is_recomputation: bool,
}
371
372// Operations
373impl<B: BackingStorage> TurboTasksBackendInner<B> {
374    fn connect_child(
375        &self,
376        parent_task: Option<TaskId>,
377        child_task: TaskId,
378        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
379    ) {
380        operation::ConnectChildOperation::run(
381            parent_task,
382            child_task,
383            self.execute_context(turbo_tasks),
384        );
385    }
386
387    fn try_read_task_output(
388        self: &Arc<Self>,
389        task_id: TaskId,
390        reader: Option<TaskId>,
391        options: ReadOutputOptions,
392        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
393    ) -> Result<Result<RawVc, EventListener>> {
394        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
395
396        let mut ctx = self.execute_context(turbo_tasks);
397        let need_reader_task = if self.should_track_dependencies()
398            && !matches!(options.tracking, ReadTracking::Untracked)
399            && reader.is_some_and(|reader_id| reader_id != task_id)
400            && let Some(reader_id) = reader
401            && reader_id != task_id
402        {
403            Some(reader_id)
404        } else {
405            None
406        };
407        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
408            // Having a task_pair here is not optimal, but otherwise this would lead to a race
409            // condition. See below.
410            // TODO(sokra): solve that in a more performant way.
411            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
412            (task, Some(reader))
413        } else {
414            (ctx.task(task_id, TaskDataCategory::All), None)
415        };
416
417        fn listen_to_done_event(
418            reader_description: Option<EventDescription>,
419            tracking: ReadTracking,
420            done_event: &Event,
421        ) -> EventListener {
422            done_event.listen_with_note(move || {
423                move || {
424                    if let Some(reader_description) = reader_description.as_ref() {
425                        format!(
426                            "try_read_task_output from {} ({})",
427                            reader_description, tracking
428                        )
429                    } else {
430                        format!("try_read_task_output ({})", tracking)
431                    }
432                }
433            })
434        }
435
436        fn check_in_progress(
437            task: &impl TaskGuard,
438            reader_description: Option<EventDescription>,
439            tracking: ReadTracking,
440        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
441        {
442            match task.get_in_progress() {
443                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
444                    listen_to_done_event(reader_description, tracking, done_event),
445                ))),
446                Some(InProgressState::InProgress(box InProgressStateInner {
447                    done_event, ..
448                })) => Some(Ok(Err(listen_to_done_event(
449                    reader_description,
450                    tracking,
451                    done_event,
452                )))),
453                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
454                    "{} was canceled",
455                    task.get_task_description()
456                ))),
457                None => None,
458            }
459        }
460
461        if matches!(options.consistency, ReadConsistency::Strong) {
462            // Ensure it's an root node
463            loop {
464                let aggregation_number = get_aggregation_number(&task);
465                if is_root_node(aggregation_number) {
466                    break;
467                }
468                drop(task);
469                drop(reader_task);
470                {
471                    let _span = tracing::trace_span!(
472                        "make root node for strongly consistent read",
473                        task = self.debug_get_task_description(task_id)
474                    )
475                    .entered();
476                    AggregationUpdateQueue::run(
477                        AggregationUpdateJob::UpdateAggregationNumber {
478                            task_id,
479                            base_aggregation_number: u32::MAX,
480                            distance: None,
481                        },
482                        &mut ctx,
483                    );
484                }
485                (task, reader_task) = if let Some(reader_id) = need_reader_task {
486                    // TODO(sokra): see comment above
487                    let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
488                    (task, Some(reader))
489                } else {
490                    (ctx.task(task_id, TaskDataCategory::All), None)
491                }
492            }
493
494            let is_dirty = task.is_dirty();
495
496            // Check the dirty count of the root node
497            let has_dirty_containers = task.has_dirty_containers();
498            if has_dirty_containers || is_dirty.is_some() {
499                let activeness = task.get_activeness_mut();
500                let mut task_ids_to_schedule: Vec<_> = Vec::new();
501                // When there are dirty task, subscribe to the all_clean_event
502                let activeness = if let Some(activeness) = activeness {
503                    // This makes sure all tasks stay active and this task won't stale.
504                    // active_until_clean is automatically removed when this
505                    // task is clean.
506                    activeness.set_active_until_clean();
507                    activeness
508                } else {
509                    // If we don't have a root state, add one. This also makes sure all tasks stay
510                    // active and this task won't stale. active_until_clean
511                    // is automatically removed when this task is clean.
512                    if ctx.should_track_activeness() {
513                        // A newly added Activeness need to make sure to schedule the tasks
514                        task_ids_to_schedule = task.dirty_containers().collect();
515                        task_ids_to_schedule.push(task_id);
516                    }
517                    let activeness =
518                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
519                    activeness.set_active_until_clean();
520                    activeness
521                };
522                let listener = activeness.all_clean_event.listen_with_note(move || {
523                    let this = self.clone();
524                    let tt = turbo_tasks.pin();
525                    move || {
526                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
527                        let mut ctx = this.execute_context(tt);
528                        let mut visited = FxHashSet::default();
529                        fn indent(s: &str) -> String {
530                            s.split_inclusive('\n')
531                                .flat_map(|line: &str| ["  ", line].into_iter())
532                                .collect::<String>()
533                        }
534                        fn get_info(
535                            ctx: &mut impl ExecuteContext<'_>,
536                            task_id: TaskId,
537                            parent_and_count: Option<(TaskId, i32)>,
538                            visited: &mut FxHashSet<TaskId>,
539                        ) -> String {
540                            let task = ctx.task(task_id, TaskDataCategory::All);
541                            let is_dirty = task.is_dirty();
542                            let in_progress =
543                                task.get_in_progress()
544                                    .map_or("not in progress", |p| match p {
545                                        InProgressState::InProgress(_) => "in progress",
546                                        InProgressState::Scheduled { .. } => "scheduled",
547                                        InProgressState::Canceled => "canceled",
548                                    });
549                            let activeness = task.get_activeness().map_or_else(
550                                || "not active".to_string(),
551                                |activeness| format!("{activeness:?}"),
552                            );
553                            let aggregation_number = get_aggregation_number(&task);
554                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
555                            {
556                                let uppers = get_uppers(&task);
557                                !uppers.contains(&parent_task_id)
558                            } else {
559                                false
560                            };
561
562                            // Check the dirty count of the root node
563                            let has_dirty_containers = task.has_dirty_containers();
564
565                            let task_description = task.get_task_description();
566                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
567                                format!(", dirty({parent_priority})")
568                            } else {
569                                String::new()
570                            };
571                            let has_dirty_containers_label = if has_dirty_containers {
572                                ", dirty containers"
573                            } else {
574                                ""
575                            };
576                            let count = if let Some((_, count)) = parent_and_count {
577                                format!(" {count}")
578                            } else {
579                                String::new()
580                            };
581                            let mut info = format!(
582                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
583                                 {in_progress}, \
584                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
585                            );
586                            let children: Vec<_> = task.dirty_containers_with_count().collect();
587                            drop(task);
588
589                            if missing_upper {
590                                info.push_str("\n  ERROR: missing upper connection");
591                            }
592
593                            if has_dirty_containers || !children.is_empty() {
594                                writeln!(info, "\n  dirty tasks:").unwrap();
595
596                                for (child_task_id, count) in children {
597                                    let task_description = ctx
598                                        .task(child_task_id, TaskDataCategory::Data)
599                                        .get_task_description();
600                                    if visited.insert(child_task_id) {
601                                        let child_info = get_info(
602                                            ctx,
603                                            child_task_id,
604                                            Some((task_id, count)),
605                                            visited,
606                                        );
607                                        info.push_str(&indent(&child_info));
608                                        if !info.ends_with('\n') {
609                                            info.push('\n');
610                                        }
611                                    } else {
612                                        writeln!(
613                                            info,
614                                            "  {child_task_id} {task_description} {count} \
615                                             (already visited)"
616                                        )
617                                        .unwrap();
618                                    }
619                                }
620                            }
621                            info
622                        }
623                        let info = get_info(&mut ctx, task_id, None, &mut visited);
624                        format!(
625                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
626                        )
627                    }
628                });
629                drop(reader_task);
630                drop(task);
631                if !task_ids_to_schedule.is_empty() {
632                    let mut queue = AggregationUpdateQueue::new();
633                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
634                    queue.execute(&mut ctx);
635                }
636
637                return Ok(Err(listener));
638            }
639        }
640
641        let reader_description = reader_task
642            .as_ref()
643            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
644        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
645        {
646            return value;
647        }
648
649        if let Some(output) = task.get_output() {
650            let result = match output {
651                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
652                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
653                OutputValue::Error(error) => Err(error.clone()),
654            };
655            if let Some(mut reader_task) = reader_task.take()
656                && options.tracking.should_track(result.is_err())
657                && (!task.immutable() || cfg!(feature = "verify_immutable"))
658            {
659                #[cfg(feature = "trace_task_output_dependencies")]
660                let _span = tracing::trace_span!(
661                    "add output dependency",
662                    task = %task_id,
663                    dependent_task = ?reader
664                )
665                .entered();
666                let mut queue = LeafDistanceUpdateQueue::new();
667                let reader = reader.unwrap();
668                if task.add_output_dependent(reader) {
669                    // Ensure that dependent leaf distance is strictly monotonic increasing
670                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
671                    let reader_leaf_distance =
672                        reader_task.get_leaf_distance().copied().unwrap_or_default();
673                    if reader_leaf_distance.distance <= leaf_distance.distance {
674                        queue.push(
675                            reader,
676                            leaf_distance.distance,
677                            leaf_distance.max_distance_in_buffer,
678                        );
679                    }
680                }
681
682                drop(task);
683
684                // Note: We use `task_pair` earlier to lock the task and its reader at the same
685                // time. If we didn't and just locked the reader here, an invalidation could occur
686                // between grabbing the locks. If that happened, and if the task is "outdated" or
687                // doesn't have the dependency edge yet, the invalidation would be lost.
688
689                if !reader_task.remove_outdated_output_dependencies(&task_id) {
690                    let _ = reader_task.add_output_dependencies(task_id);
691                }
692                drop(reader_task);
693
694                queue.execute(&mut ctx);
695            } else {
696                drop(task);
697            }
698
699            return result.map_err(|error| {
700                self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
701                    .with_task_context(task_id, turbo_tasks.pin())
702                    .into()
703            });
704        }
705        drop(reader_task);
706
707        let note = EventDescription::new(|| {
708            move || {
709                if let Some(reader) = reader_description.as_ref() {
710                    format!("try_read_task_output (recompute) from {reader}",)
711                } else {
712                    "try_read_task_output (recompute, untracked)".to_string()
713                }
714            }
715        });
716
717        // Output doesn't exist. We need to schedule the task to compute it.
718        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
719            TaskExecutionReason::OutputNotAvailable,
720            EventDescription::new(|| task.get_task_desc_fn()),
721            note,
722        );
723
724        // It's not possible that the task is InProgress at this point. If it is InProgress {
725        // done: true } it must have Output and would early return.
726        let old = task.set_in_progress(in_progress_state);
727        debug_assert!(old.is_none(), "InProgress already exists");
728        ctx.schedule_task(task, TaskPriority::Initial);
729
730        Ok(Err(listener))
731    }
732
733    fn try_read_task_cell(
734        &self,
735        task_id: TaskId,
736        reader: Option<TaskId>,
737        cell: CellId,
738        options: ReadCellOptions,
739        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
740    ) -> Result<Result<TypedCellContent, EventListener>> {
741        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
742
743        fn add_cell_dependency(
744            task_id: TaskId,
745            mut task: impl TaskGuard,
746            reader: Option<TaskId>,
747            reader_task: Option<impl TaskGuard>,
748            cell: CellId,
749            key: Option<u64>,
750        ) {
751            if let Some(mut reader_task) = reader_task
752                && (!task.immutable() || cfg!(feature = "verify_immutable"))
753            {
754                let reader = reader.unwrap();
755                let _ = task.add_cell_dependents((cell, key, reader));
756                drop(task);
757
758                // Note: We use `task_pair` earlier to lock the task and its reader at the same
759                // time. If we didn't and just locked the reader here, an invalidation could occur
760                // between grabbing the locks. If that happened, and if the task is "outdated" or
761                // doesn't have the dependency edge yet, the invalidation would be lost.
762
763                let target = CellRef {
764                    task: task_id,
765                    cell,
766                };
767                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
768                    let _ = reader_task.add_cell_dependencies((target, key));
769                }
770                drop(reader_task);
771            }
772        }
773
774        let ReadCellOptions {
775            tracking,
776            final_read_hint,
777        } = options;
778
779        let mut ctx = self.execute_context(turbo_tasks);
780        let (mut task, reader_task) = if self.should_track_dependencies()
781            && !matches!(tracking, ReadCellTracking::Untracked)
782            && let Some(reader_id) = reader
783            && reader_id != task_id
784        {
785            // Having a task_pair here is not optimal, but otherwise this would lead to a race
786            // condition. See below.
787            // TODO(sokra): solve that in a more performant way.
788            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
789            (task, Some(reader))
790        } else {
791            (ctx.task(task_id, TaskDataCategory::All), None)
792        };
793
794        let content = if final_read_hint {
795            task.remove_cell_data(&cell)
796        } else {
797            task.get_cell_data(&cell).cloned()
798        };
799        if let Some(content) = content {
800            if tracking.should_track(false) {
801                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
802            }
803            return Ok(Ok(TypedCellContent(
804                cell.type_id,
805                CellContent(Some(content)),
806            )));
807        }
808
809        let in_progress = task.get_in_progress();
810        if matches!(
811            in_progress,
812            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
813        ) {
814            return Ok(Err(self
815                .listen_to_cell(&mut task, task_id, &reader_task, cell)
816                .0));
817        }
818        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
819
820        // Check cell index range (cell might not exist at all)
821        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
822        let Some(max_id) = max_id else {
823            let task_desc = task.get_task_description();
824            if tracking.should_track(true) {
825                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
826            }
827            bail!(
828                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
829            );
830        };
831        if cell.index >= max_id {
832            let task_desc = task.get_task_description();
833            if tracking.should_track(true) {
834                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
835            }
836            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
837        }
838
839        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
840        // task to get the cell content.
841
842        // Bail early if the task was cancelled — no point in registering a listener
843        // on a task that won't execute again.
844        if is_cancelled {
845            bail!("{} was canceled", task.get_task_description());
846        }
847
848        // Listen to the cell and potentially schedule the task
849        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
850        drop(reader_task);
851        if !new_listener {
852            return Ok(Err(listener));
853        }
854
855        let _span = tracing::trace_span!(
856            "recomputation",
857            cell_type = get_value_type(cell.type_id).ty.global_name,
858            cell_index = cell.index
859        )
860        .entered();
861
862        let _ = task.add_scheduled(
863            TaskExecutionReason::CellNotAvailable,
864            EventDescription::new(|| task.get_task_desc_fn()),
865        );
866        ctx.schedule_task(task, TaskPriority::Initial);
867
868        Ok(Err(listener))
869    }
870
871    fn listen_to_cell(
872        &self,
873        task: &mut impl TaskGuard,
874        task_id: TaskId,
875        reader_task: &Option<impl TaskGuard>,
876        cell: CellId,
877    ) -> (EventListener, bool) {
878        let note = || {
879            let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
880            move || {
881                if let Some(reader_desc) = reader_desc.as_ref() {
882                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
883                } else {
884                    "try_read_task_cell (in progress, untracked)".to_string()
885                }
886            }
887        };
888        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
889            // Someone else is already computing the cell
890            let listener = in_progress.event.listen_with_note(note);
891            return (listener, false);
892        }
893        let in_progress = InProgressCellState::new(task_id, cell);
894        let listener = in_progress.event.listen_with_note(note);
895        let old = task.insert_in_progress_cells(cell, in_progress);
896        debug_assert!(old.is_none(), "InProgressCell already exists");
897        (listener, true)
898    }
899
    /// Takes a snapshot of the modified task storage and writes it to the backing storage.
    ///
    /// `parent_span` is used as the parent of the "snapshot" and "persist" tracing spans and
    /// `reason` is attached to both spans and to the emitted `TraceEvent`.
    ///
    /// Returns a pair of an `Instant` and a flag:
    /// - `(start, false)` when `start_snapshot` reports no modifications; nothing is encoded.
    /// - `(snapshot_time, false)` when the snapshot produced no items.
    /// - `(snapshot_time, true)` after `save_snapshot` succeeded.
    ///
    /// Here `start` is taken right after acquiring the `snapshot_in_progress` lock and
    /// `snapshot_time` after the suspended operations were taken from the snapshot phase.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `backing_storage.save_snapshot`.
    ///
    /// # Panics
    ///
    /// Panics when encoding a task category fails, when a transient task id is handed to the
    /// encoder, or when a task flagged as `new_task` has no persistent task type.
    fn snapshot_and_persist(
        &self,
        parent_span: Option<tracing::Id>,
        reason: &str,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<(Instant, bool), anyhow::Error> {
        let snapshot_span =
            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
                .entered();
        // Serialize snapshots. The internal protocol (snapshot_mode, snapshot
        // request bit, suspended_operations) assumes only one snapshot runs at
        // a time. Held for the entire snapshot lifecycle.
        let _snapshot_in_progress = self.snapshot_in_progress.lock();
        let start = Instant::now();
        // SystemTime for wall-clock timestamps in trace events (milliseconds
        // since epoch). Instant is monotonic but has no defined epoch, so it
        // can't be used for cross-process trace correlation.
        let wall_start = SystemTime::now();
        debug_assert!(self.should_persist());

        let mut snapshot_phase = {
            let _span = tracing::info_span!("blocking").entered();
            self.snapshot_coord.begin_snapshot()
        };
        // Enter snapshot mode, which atomically reads and resets the modified count.
        // Checking after start_snapshot ensures no concurrent increments can race.
        let (snapshot_guard, has_modifications) = self.storage.start_snapshot();

        let suspended_operations = snapshot_phase.take_suspended_operations();

        let snapshot_time = Instant::now();
        drop(snapshot_phase);

        if !has_modifications {
            // No tasks modified since the last snapshot — drop the guard (which
            // calls end_snapshot) and skip the expensive O(N) scan.
            drop(snapshot_guard);
            return Ok((start, false));
        }

        // Per-task-type size statistics, only collected with the `print_cache_item_size`
        // feature. Sizes are sums of encoded byte lengths; counts are numbers of encoded
        // categories or of entries reported by `TaskStorage::meta_counts`.
        #[cfg(feature = "print_cache_item_size")]
        #[derive(Default)]
        struct TaskCacheStats {
            data: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            data_compressed: usize,
            data_count: usize,
            meta: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            meta_compressed: usize,
            meta_count: usize,
            upper_count: usize,
            collectibles_count: usize,
            aggregated_collectibles_count: usize,
            children_count: usize,
            followers_count: usize,
            collectibles_dependents_count: usize,
            aggregated_dirty_containers_count: usize,
            output_size: usize,
        }
        /// Formats a byte size, optionally including the compressed size when the
        /// `print_cache_item_size_with_compressed` feature is enabled.
        #[cfg(feature = "print_cache_item_size")]
        struct FormatSizes {
            size: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: usize,
        }
        #[cfg(feature = "print_cache_item_size")]
        impl std::fmt::Display for FormatSizes {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                use turbo_tasks::util::FormatBytes;
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    write!(
                        f,
                        "{} ({} compressed)",
                        FormatBytes(self.size),
                        FormatBytes(self.compressed_size)
                    )
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    write!(f, "{}", FormatBytes(self.size))
                }
            }
        }
        #[cfg(feature = "print_cache_item_size")]
        impl TaskCacheStats {
            /// Returns the LZ4-compressed length of `data`.
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            fn compressed_size(data: &[u8]) -> Result<usize> {
                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
                    data,
                    &mut Vec::new(),
                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
                )?)
            }

            /// Accounts for one encoded `Data` category. A failed compression counts as 0.
            fn add_data(&mut self, data: &[u8]) {
                self.data += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.data_count += 1;
            }

            /// Accounts for one encoded `Meta` category. A failed compression counts as 0.
            fn add_meta(&mut self, data: &[u8]) {
                self.meta += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.meta_count += 1;
            }

            /// Adds the entry counts of `storage` and the encoded size of its output, if any.
            /// An output that fails to encode contributes 0 bytes.
            fn add_counts(&mut self, storage: &TaskStorage) {
                let counts = storage.meta_counts();
                self.upper_count += counts.upper;
                self.collectibles_count += counts.collectibles;
                self.aggregated_collectibles_count += counts.aggregated_collectibles;
                self.children_count += counts.children;
                self.followers_count += counts.followers;
                self.collectibles_dependents_count += counts.collectibles_dependents;
                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
                if let Some(output) = storage.get_output() {
                    use turbo_bincode::turbo_bincode_encode;

                    self.output_size += turbo_bincode_encode(&output)
                        .map(|data| data.len())
                        .unwrap_or(0);
                }
            }

            /// Returns the task name used as the stats grouping key.
            fn task_name(storage: &TaskStorage) -> String {
                storage
                    .get_persistent_task_type()
                    .map(|t| t.to_string())
                    .unwrap_or_else(|| "<unknown>".to_string())
            }

            /// Returns the primary sort key: compressed total when
            /// `print_cache_item_size_with_compressed` is enabled, raw total otherwise.
            fn sort_key(&self) -> usize {
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed + self.meta_compressed
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    self.data + self.meta
                }
            }

            /// Combined data + meta size.
            fn format_total(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data + self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed + self.meta_compressed,
                }
            }

            /// Total size of all encoded `Data` categories.
            fn format_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed,
                }
            }

            /// Average size per encoded `Data` category; 0 when none was encoded.
            fn format_avg_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data.checked_div(self.data_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .data_compressed
                        .checked_div(self.data_count)
                        .unwrap_or(0),
                }
            }

            /// Total size of all encoded `Meta` categories.
            fn format_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.meta_compressed,
                }
            }

            /// Average size per encoded `Meta` category; 0 when none was encoded.
            fn format_avg_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta.checked_div(self.meta_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .meta_compressed
                        .checked_div(self.meta_count)
                        .unwrap_or(0),
                }
            }
        }
        // Stats keyed by task name; behind a mutex because the `process` closure below only
        // captures it by shared reference.
        #[cfg(feature = "print_cache_item_size")]
        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
            Mutex::new(FxHashMap::default());

        // Encode each task's modified categories. We only encode categories with `modified` set,
        // meaning the category was actually dirtied. Categories restored from disk but never
        // modified don't need re-persisting since the on-disk version is still valid.
        // For tasks accessed during snapshot mode, a frozen copy was made and its `modified`
        // flags were copied from the live task at snapshot creation time, reflecting which
        // categories were dirtied before the snapshot was taken.
        let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
            // Encodes a single category of the task and records its size in the stats (when
            // enabled). Panics if encoding fails, so it always yields `Some` on return.
            let encode_category = |task_id: TaskId,
                                   data: &TaskStorage,
                                   category: SpecificTaskDataCategory,
                                   buffer: &mut TurboBincodeBuffer|
             -> Option<TurboBincodeBuffer> {
                match encode_task_data(task_id, data, category, buffer) {
                    Ok(encoded) => {
                        #[cfg(feature = "print_cache_item_size")]
                        {
                            let mut stats = task_cache_stats.lock();
                            let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
                            match category {
                                SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
                                SpecificTaskDataCategory::Data => entry.add_data(&encoded),
                            }
                        }
                        Some(encoded)
                    }
                    Err(err) => {
                        panic!(
                            "Serializing task {} failed ({:?}): {:?}",
                            self.debug_get_task_description(task_id),
                            category,
                            err
                        );
                    }
                }
            };
            if task_id.is_transient() {
                unreachable!("transient task_ids should never be enqueued to be persisted");
            }

            let encode_meta = inner.flags.meta_modified();
            let encode_data = inner.flags.data_modified();

            #[cfg(feature = "print_cache_item_size")]
            if encode_data || encode_meta {
                task_cache_stats
                    .lock()
                    .entry(TaskCacheStats::task_name(inner))
                    .or_default()
                    .add_counts(inner);
            }

            let meta = if encode_meta {
                encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
            } else {
                None
            };

            let data = if encode_data {
                encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
            } else {
                None
            };
            // Only tasks flagged as `new_task` carry a task type hash in their snapshot item.
            let task_type_hash = if inner.flags.new_task() {
                let task_type = inner.get_persistent_task_type().expect(
                    "It is not possible for a new_task to not have a persistent_task_type.  Task \
                     creation for persistent tasks uses a single ExecutionContextImpl for \
                     creating the task (which sets new_task) and connect_child (which sets \
                     persistent_task_type) and take_snapshot waits for all operations to complete \
                     or suspend before we start snapshotting.  So task creation will always set \
                     the task_type.",
                );
                Some(compute_task_type_hash(task_type))
            } else {
                None
            };

            SnapshotItem {
                task_id,
                meta,
                data,
                task_type_hash,
            }
        };

        let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);

        drop(snapshot_span);
        let snapshot_duration = start.elapsed();
        let task_count = task_snapshots.len();

        if task_snapshots.is_empty() {
            // This is not expected: `start_snapshot` reported modifications, yet
            // `take_snapshot` produced no items. Encoding failures cannot explain it, since
            // `encode_category` panics instead of skipping the task.
            std::hint::cold_path();
            return Ok((snapshot_time, false));
        }

        let persist_start = Instant::now();
        let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
        {
            // Tasks were already consumed by take_snapshot, so a future snapshot
            // would not re-persist them — returning an error signals to the caller
            // that further persist attempts would corrupt the task graph in storage.
            self.backing_storage
                .save_snapshot(suspended_operations, task_snapshots)?;
            #[cfg(feature = "print_cache_item_size")]
            {
                let mut task_cache_stats = task_cache_stats
                    .into_inner()
                    .into_iter()
                    .collect::<Vec<_>>();
                if !task_cache_stats.is_empty() {
                    use turbo_tasks::util::FormatBytes;

                    use crate::utils::markdown_table::print_markdown_table;

                    // Largest sort key first; ties are ordered by descending task name.
                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
                        (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
                    });

                    println!(
                        "Task cache stats: {}",
                        FormatSizes {
                            size: task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data + s.meta)
                                .sum::<usize>(),
                            #[cfg(feature = "print_cache_item_size_with_compressed")]
                            compressed_size: task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data_compressed + s.meta_compressed)
                                .sum::<usize>()
                        },
                    );

                    // The "Count x Avg" headers appear twice on purpose: the row closure below
                    // emits the count ("N x") and the average size as two separate columns.
                    print_markdown_table(
                        [
                            "Task",
                            " Total Size",
                            " Data Size",
                            " Data Count x Avg",
                            " Data Count x Avg",
                            " Meta Size",
                            " Meta Count x Avg",
                            " Meta Count x Avg",
                            " Uppers",
                            " Coll",
                            " Agg Coll",
                            " Children",
                            " Followers",
                            " Coll Deps",
                            " Agg Dirty",
                            " Output Size",
                        ],
                        task_cache_stats.iter(),
                        |(task_desc, stats)| {
                            [
                                task_desc.to_string(),
                                format!(" {}", stats.format_total()),
                                format!(" {}", stats.format_data()),
                                format!(" {} x", stats.data_count),
                                format!("{}", stats.format_avg_data()),
                                format!(" {}", stats.format_meta()),
                                format!(" {} x", stats.meta_count),
                                format!("{}", stats.format_avg_meta()),
                                format!(" {}", stats.upper_count),
                                format!(" {}", stats.collectibles_count),
                                format!(" {}", stats.aggregated_collectibles_count),
                                format!(" {}", stats.children_count),
                                format!(" {}", stats.followers_count),
                                format!(" {}", stats.collectibles_dependents_count),
                                format!(" {}", stats.aggregated_dirty_containers_count),
                                format!(" {}", FormatBytes(stats.output_size)),
                            ]
                        },
                    );
                }
            }
        }

        // `elapsed` spans the whole call since `start`; `persist_duration` only the
        // `save_snapshot` part (plus stats printing when enabled).
        let elapsed = start.elapsed();
        let persist_duration = persist_start.elapsed();
        // avoid spamming the event queue with information about fast operations
        if elapsed > Duration::from_secs(10) {
            turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
                "Finished writing to filesystem cache".to_string(),
                elapsed,
            )));
        }

        // A wall clock before the Unix epoch falls back to a zero duration.
        let wall_start_ms = wall_start
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            // as_millis_f64 is not stable yet
            .as_secs_f64()
            * 1000.0;
        // The end timestamp is derived from the monotonic `elapsed` rather than a second
        // wall-clock reading.
        let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
        turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
            "turbopack-persistence",
            wall_start_ms,
            wall_end_ms,
            vec![
                ("reason", serde_json::Value::from(reason)),
                (
                    "snapshot_duration_ms",
                    serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
                ),
                (
                    "persist_duration_ms",
                    serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
                ),
                ("task_count", serde_json::Value::from(task_count)),
            ],
        )));

        Ok((snapshot_time, true))
    }
1322
1323    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1324        if self.should_restore() {
1325            // Continue all uncompleted operations
1326            // They can't be interrupted by a snapshot since the snapshotting job has not been
1327            // scheduled yet.
1328            let uncompleted_operations = self
1329                .backing_storage
1330                .uncompleted_operations()
1331                .expect("Failed to get uncompleted operations");
1332            if !uncompleted_operations.is_empty() {
1333                let mut ctx = self.execute_context(turbo_tasks);
1334                for op in uncompleted_operations {
1335                    op.execute(&mut ctx);
1336                }
1337            }
1338        }
1339
1340        // Only when it should write regularly to the storage, we schedule the initial snapshot
1341        // job.
1342        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1343            // Schedule the snapshot job
1344            let _span = trace_span!("persisting background job").entered();
1345            let _span = tracing::info_span!("thread").entered();
1346            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1347        }
1348    }
1349
1350    fn stopping(&self) {
1351        self.stopping.store(true, Ordering::Release);
1352        self.stopping_event.notify(usize::MAX);
1353    }
1354
1355    #[allow(unused_variables)]
1356    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1357        #[cfg(feature = "verify_aggregation_graph")]
1358        {
1359            self.is_idle.store(false, Ordering::Release);
1360            self.verify_aggregation_graph(turbo_tasks, false);
1361        }
1362        if self.should_persist()
1363            && let Err(err) = self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks)
1364        {
1365            eprintln!("Persisting failed during shutdown: {err:?}");
1366        }
1367        drop_contents(&self.task_cache);
1368        self.storage.drop_contents();
1369        if let Err(err) = self.backing_storage.shutdown() {
1370            println!("Shutting down failed: {err}");
1371        }
1372    }
1373
1374    #[allow(unused_variables)]
1375    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1376        self.idle_start_event.notify(usize::MAX);
1377
1378        #[cfg(feature = "verify_aggregation_graph")]
1379        {
1380            use tokio::select;
1381
1382            self.is_idle.store(true, Ordering::Release);
1383            let this = self.clone();
1384            let turbo_tasks = turbo_tasks.pin();
1385            tokio::task::spawn(async move {
1386                select! {
1387                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1388                        // do nothing
1389                    }
1390                    _ = this.idle_end_event.listen() => {
1391                        return;
1392                    }
1393                }
1394                if !this.is_idle.load(Ordering::Relaxed) {
1395                    return;
1396                }
1397                this.verify_aggregation_graph(&*turbo_tasks, true);
1398            });
1399        }
1400    }
1401
1402    fn idle_end(&self) {
1403        #[cfg(feature = "verify_aggregation_graph")]
1404        self.is_idle.store(false, Ordering::Release);
1405        self.idle_end_event.notify(usize::MAX);
1406    }
1407
1408    fn get_or_create_task(
1409        &self,
1410        native_fn: &'static NativeFunction,
1411        this: Option<RawVc>,
1412        arg: &mut dyn StackDynTaskInputs,
1413        parent_task: Option<TaskId>,
1414        persistence: TaskPersistence,
1415        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1416    ) -> TaskId {
1417        let transient = matches!(persistence, TaskPersistence::Transient);
1418
1419        if transient
1420            && let Some(parent_task) = parent_task
1421            && !parent_task.is_transient()
1422        {
1423            let task_type = CachedTaskType {
1424                native_fn,
1425                this,
1426                arg: arg.take_box(),
1427            };
1428            self.panic_persistent_calling_transient(
1429                self.debug_get_task_description(parent_task),
1430                Some(&task_type),
1431                /* cell_id */ None,
1432            );
1433        }
1434
1435        let is_root = native_fn.is_root;
1436
1437        // Compute hash and shard index once from borrowed components (no heap allocation).
1438        let arg_ref = arg.as_ref();
1439        let hash = CachedTaskType::hash_from_components(
1440            self.task_cache.hasher(),
1441            native_fn,
1442            this,
1443            arg_ref,
1444        );
1445        // Locate the shard once so that the read-only lookup and any
1446        // write-lock retry below share the same reference (saves a modulo +
1447        // memory lookup on the miss path).
1448        let shard = get_shard(&self.task_cache, hash);
1449
1450        // Step 1: Fast read-only cache lookup (read lock, no allocation).
1451        // Use a read lock rather than a write lock to avoid contention. connect_child
1452        // may re-enter task_cache with a write lock, so we must not hold a write lock here.
1453        if let Some(task_id) =
1454            raw_get_in_shard(shard, hash, |k| k.eq_components(native_fn, this, arg_ref))
1455        {
1456            self.track_cache_hit_by_fn(native_fn);
1457            self.connect_child(parent_task, task_id, turbo_tasks);
1458            return task_id;
1459        }
1460
1461        // Step 2: Check backing storage using borrowed components (no box needed yet).
1462        let mut ctx = self.execute_context(turbo_tasks);
1463
1464        let mut is_new = false;
1465
1466        // Task exists in backing storage.
1467        // We only need to insert it into the in-memory cache.
1468        let task_id = if !transient
1469            && let Some((task_id, stored_type)) = ctx.task_by_type(native_fn, this, arg_ref)
1470        {
1471            self.track_cache_hit_by_fn(native_fn);
1472            // Step 3a: Insert into in-memory cache using the pre-located shard.
1473            // Use the existing Arc from storage to avoid a duplicate allocation.
1474            match raw_entry_in_shard(shard, self.task_cache.hasher(), hash, |k| {
1475                k.eq_components(native_fn, this, arg_ref)
1476            }) {
1477                RawEntry::Occupied(_) => {}
1478                RawEntry::Vacant(e) => {
1479                    e.insert(stored_type, task_id);
1480                }
1481            };
1482            task_id
1483        } else {
1484            match raw_entry_in_shard(shard, self.task_cache.hasher(), hash, |k| {
1485                k.eq_components(native_fn, this, arg_ref)
1486            }) {
1487                RawEntry::Occupied(e) => {
1488                    // Another thread beat us to creating this task — use their task_id.
1489                    // They will handle logging the new task as modified.
1490                    let task_id = *e.get();
1491                    drop(e);
1492                    self.track_cache_hit_by_fn(native_fn);
1493                    task_id
1494                }
1495                RawEntry::Vacant(e) => {
1496                    // Only now do we force the allocation.
1497                    // NOTE: if our caller had to perform resolution, then this will have already
1498                    // been boxed and take_box just takes it.
1499                    let task_type = Arc::new(CachedTaskType {
1500                        native_fn,
1501                        this,
1502                        arg: arg.take_box(),
1503                    });
1504                    let task_id = if transient {
1505                        self.transient_task_id_factory.get()
1506                    } else {
1507                        self.persisted_task_id_factory.get()
1508                    };
1509                    // Initialize storage BEFORE making task_id visible in the cache.
1510                    // This ensures any thread that reads task_id from the cache sees
1511                    // the storage entry already initialized (restored flags set).
1512                    self.storage
1513                        .initialize_new_task(task_id, Some(task_type.clone()));
1514                    // insert() consumes e, releasing the shard write lock.
1515                    e.insert(task_type, task_id);
1516                    self.track_cache_miss_by_fn(native_fn);
1517                    is_new = true;
1518                    task_id
1519                }
1520            }
1521        };
1522
1523        if is_new && is_root {
1524            AggregationUpdateQueue::run(
1525                AggregationUpdateJob::UpdateAggregationNumber {
1526                    task_id,
1527                    base_aggregation_number: u32::MAX,
1528                    distance: None,
1529                },
1530                &mut ctx,
1531            );
1532        }
1533
1534        operation::ConnectChildOperation::run(parent_task, task_id, ctx);
1535
1536        task_id
1537    }
1538
1539    /// Generate an object that implements [`fmt::Display`] explaining why the given
1540    /// [`CachedTaskType`] is transient.
1541    fn debug_trace_transient_task(
1542        &self,
1543        task_type: &CachedTaskType,
1544        cell_id: Option<CellId>,
1545    ) -> DebugTraceTransientTask {
1546        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1547        // from tracing the same task many times, so use a visited_set
1548        fn inner_id(
1549            backend: &TurboTasksBackendInner<impl BackingStorage>,
1550            task_id: TaskId,
1551            cell_type_id: Option<ValueTypeId>,
1552            visited_set: &mut FxHashSet<TaskId>,
1553        ) -> DebugTraceTransientTask {
1554            if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1555                if visited_set.contains(&task_id) {
1556                    let task_name = task_type.get_name();
1557                    DebugTraceTransientTask::Collapsed {
1558                        task_name,
1559                        cell_type_id,
1560                    }
1561                } else {
1562                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1563                }
1564            } else {
1565                DebugTraceTransientTask::Uncached { cell_type_id }
1566            }
1567        }
1568        fn inner_cached(
1569            backend: &TurboTasksBackendInner<impl BackingStorage>,
1570            task_type: &CachedTaskType,
1571            cell_type_id: Option<ValueTypeId>,
1572            visited_set: &mut FxHashSet<TaskId>,
1573        ) -> DebugTraceTransientTask {
1574            let task_name = task_type.get_name();
1575
1576            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1577                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1578                    // `task_id` should never be `None` at this point, as that would imply a
1579                    // non-local task is returning a local `Vc`...
1580                    // Just ignore if it happens, as we're likely already panicking.
1581                    return None;
1582                };
1583                if task_id.is_transient() {
1584                    Some(Box::new(inner_id(
1585                        backend,
1586                        task_id,
1587                        cause_self_raw_vc.try_get_type_id(),
1588                        visited_set,
1589                    )))
1590                } else {
1591                    None
1592                }
1593            });
1594            let cause_args = task_type
1595                .arg
1596                .get_raw_vcs()
1597                .into_iter()
1598                .filter_map(|raw_vc| {
1599                    let Some(task_id) = raw_vc.try_get_task_id() else {
1600                        // `task_id` should never be `None` (see comment above)
1601                        return None;
1602                    };
1603                    if !task_id.is_transient() {
1604                        return None;
1605                    }
1606                    Some((task_id, raw_vc.try_get_type_id()))
1607                })
1608                .collect::<IndexSet<_>>() // dedupe
1609                .into_iter()
1610                .map(|(task_id, cell_type_id)| {
1611                    inner_id(backend, task_id, cell_type_id, visited_set)
1612                })
1613                .collect();
1614
1615            DebugTraceTransientTask::Cached {
1616                task_name,
1617                cell_type_id,
1618                cause_self,
1619                cause_args,
1620            }
1621        }
1622        inner_cached(
1623            self,
1624            task_type,
1625            cell_id.map(|c| c.type_id),
1626            &mut FxHashSet::default(),
1627        )
1628    }
1629
1630    fn invalidate_task(
1631        &self,
1632        task_id: TaskId,
1633        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1634    ) {
1635        if !self.should_track_dependencies() {
1636            panic!("Dependency tracking is disabled so invalidation is not allowed");
1637        }
1638        operation::InvalidateOperation::run(
1639            smallvec![task_id],
1640            #[cfg(feature = "trace_task_dirty")]
1641            TaskDirtyCause::Invalidator,
1642            self.execute_context(turbo_tasks),
1643        );
1644    }
1645
1646    fn invalidate_tasks(
1647        &self,
1648        tasks: &[TaskId],
1649        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1650    ) {
1651        if !self.should_track_dependencies() {
1652            panic!("Dependency tracking is disabled so invalidation is not allowed");
1653        }
1654        operation::InvalidateOperation::run(
1655            tasks.iter().copied().collect(),
1656            #[cfg(feature = "trace_task_dirty")]
1657            TaskDirtyCause::Unknown,
1658            self.execute_context(turbo_tasks),
1659        );
1660    }
1661
1662    fn invalidate_tasks_set(
1663        &self,
1664        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1665        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1666    ) {
1667        if !self.should_track_dependencies() {
1668            panic!("Dependency tracking is disabled so invalidation is not allowed");
1669        }
1670        operation::InvalidateOperation::run(
1671            tasks.iter().copied().collect(),
1672            #[cfg(feature = "trace_task_dirty")]
1673            TaskDirtyCause::Unknown,
1674            self.execute_context(turbo_tasks),
1675        );
1676    }
1677
1678    fn invalidate_serialization(
1679        &self,
1680        task_id: TaskId,
1681        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1682    ) {
1683        if task_id.is_transient() {
1684            return;
1685        }
1686        let mut ctx = self.execute_context(turbo_tasks);
1687        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1688        task.invalidate_serialization();
1689    }
1690
1691    fn debug_get_task_description(&self, task_id: TaskId) -> String {
1692        let task = self.storage.access_mut(task_id);
1693        if let Some(value) = task.get_persistent_task_type() {
1694            format!("{task_id:?} {}", value)
1695        } else if let Some(value) = task.get_transient_task_type() {
1696            format!("{task_id:?} {}", value)
1697        } else {
1698            format!("{task_id:?} unknown")
1699        }
1700    }
1701
1702    fn get_task_name(
1703        &self,
1704        task_id: TaskId,
1705        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1706    ) -> String {
1707        let mut ctx = self.execute_context(turbo_tasks);
1708        let task = ctx.task(task_id, TaskDataCategory::Data);
1709        if let Some(value) = task.get_persistent_task_type() {
1710            value.to_string()
1711        } else if let Some(value) = task.get_transient_task_type() {
1712            value.to_string()
1713        } else {
1714            "unknown".to_string()
1715        }
1716    }
1717
1718    fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1719        let task = self.storage.access_mut(task_id);
1720        task.get_persistent_task_type().cloned()
1721    }
1722
1723    fn task_execution_canceled(
1724        &self,
1725        task_id: TaskId,
1726        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1727    ) {
1728        let mut ctx = self.execute_context(turbo_tasks);
1729        let mut task = ctx.task(task_id, TaskDataCategory::All);
1730        if let Some(in_progress) = task.take_in_progress() {
1731            match in_progress {
1732                InProgressState::Scheduled {
1733                    done_event,
1734                    reason: _,
1735                } => done_event.notify(usize::MAX),
1736                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1737                    done_event.notify(usize::MAX)
1738                }
1739                InProgressState::Canceled => {}
1740            }
1741        }
1742        // Notify any readers waiting on in-progress cells so their listeners
1743        // resolve and foreground jobs can finish (prevents stop_and_wait hang).
1744        let in_progress_cells = task.take_in_progress_cells();
1745        if let Some(ref cells) = in_progress_cells {
1746            for state in cells.values() {
1747                state.event.notify(usize::MAX);
1748            }
1749        }
1750
1751        // Mark the cancelled task as session-dependent dirty so it will be re-executed
1752        // in the next session. Without this, any reader that encounters the cancelled task
1753        // records an error in its output. That error is persisted and would poison
1754        // subsequent builds. By marking the task session-dependent dirty, the next build
1755        // re-executes it, which invalidates dependents and corrects the stale errors.
1756        let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
1757            task.update_dirty_state(Some(Dirtyness::SessionDependent))
1758        } else {
1759            None
1760        };
1761
1762        let old = task.set_in_progress(InProgressState::Canceled);
1763        debug_assert!(old.is_none(), "InProgress already exists");
1764        drop(task);
1765
1766        if let Some(data_update) = data_update {
1767            AggregationUpdateQueue::run(data_update, &mut ctx);
1768        }
1769
1770        drop(in_progress_cells);
1771    }
1772
1773    fn try_start_task_execution(
1774        &self,
1775        task_id: TaskId,
1776        priority: TaskPriority,
1777        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1778    ) -> Option<TaskExecutionSpec<'_>> {
1779        let execution_reason;
1780        let task_type;
1781        {
1782            let mut ctx = self.execute_context(turbo_tasks);
1783            let mut task = ctx.task(task_id, TaskDataCategory::All);
1784            task_type = task.get_task_type().to_owned();
1785            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
1786            if let Some(tasks) = task.prefetch() {
1787                drop(task);
1788                ctx.prepare_tasks(tasks, "prefetch");
1789                task = ctx.task(task_id, TaskDataCategory::All);
1790            }
1791            let in_progress = task.take_in_progress()?;
1792            let InProgressState::Scheduled { done_event, reason } = in_progress else {
1793                let old = task.set_in_progress(in_progress);
1794                debug_assert!(old.is_none(), "InProgress already exists");
1795                return None;
1796            };
1797            execution_reason = reason;
1798            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
1799                InProgressStateInner {
1800                    stale: false,
1801                    once_task,
1802                    done_event,
1803                    session_dependent: false,
1804                    marked_as_completed: false,
1805                    new_children: Default::default(),
1806                },
1807            )));
1808            debug_assert!(old.is_none(), "InProgress already exists");
1809
1810            // Make all current collectibles outdated (remove left-over outdated collectibles)
1811            enum Collectible {
1812                Current(CollectibleRef, i32),
1813                Outdated(CollectibleRef),
1814            }
1815            let collectibles = task
1816                .iter_collectibles()
1817                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
1818                .chain(
1819                    task.iter_outdated_collectibles()
1820                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
1821                )
1822                .collect::<Vec<_>>();
1823            for collectible in collectibles {
1824                match collectible {
1825                    Collectible::Current(collectible, value) => {
1826                        let _ = task.insert_outdated_collectible(collectible, value);
1827                    }
1828                    Collectible::Outdated(collectible) => {
1829                        if task
1830                            .collectibles()
1831                            .is_none_or(|m| m.get(&collectible).is_none())
1832                        {
1833                            task.remove_outdated_collectibles(&collectible);
1834                        }
1835                    }
1836                }
1837            }
1838
1839            if self.should_track_dependencies() {
1840                // Make all dependencies outdated
1841                let cell_dependencies = task.iter_cell_dependencies().collect();
1842                task.set_outdated_cell_dependencies(cell_dependencies);
1843
1844                let outdated_output_dependencies = task.iter_output_dependencies().collect();
1845                task.set_outdated_output_dependencies(outdated_output_dependencies);
1846            }
1847        }
1848
1849        let (span, future) = match task_type {
1850            TaskType::Cached(task_type) => {
1851                let CachedTaskType {
1852                    native_fn,
1853                    this,
1854                    arg,
1855                } = &*task_type;
1856                (
1857                    native_fn.span(task_id.persistence(), execution_reason, priority),
1858                    native_fn.execute(*this, &**arg),
1859                )
1860            }
1861            TaskType::Transient(task_type) => {
1862                let span = tracing::trace_span!("turbo_tasks::root_task");
1863                let future = match &*task_type {
1864                    TransientTask::Root(f) => f(),
1865                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
1866                };
1867                (span, future)
1868            }
1869        };
1870        Some(TaskExecutionSpec { future, span })
1871    }
1872
1873    fn task_execution_completed(
1874        &self,
1875        task_id: TaskId,
1876        result: Result<RawVc, TurboTasksExecutionError>,
1877        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1878        #[cfg(feature = "verify_determinism")] stateful: bool,
1879        has_invalidator: bool,
1880        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1881    ) -> bool {
1882        // Task completion is a 4 step process:
1883        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
1884        //    aggregation number of the task and the new children.
1885        // 2. Connect the new children to the task (and do the relevant aggregation updates).
1886        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
1887        // 4. Shrink the task memory to reduce footprint of the task.
1888
1889        // Due to persistence it is possible that the process is cancelled after any step. This is
1890        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
1891        // in-memory representation.
1892
1893        // The task might be invalidated during this process, so we need to check the stale flag
1894        // at the start of every step.
1895
1896        #[cfg(not(feature = "trace_task_details"))]
1897        let span = tracing::trace_span!(
1898            "task execution completed",
1899            new_children = tracing::field::Empty
1900        )
1901        .entered();
1902        #[cfg(feature = "trace_task_details")]
1903        let span = tracing::trace_span!(
1904            "task execution completed",
1905            task_id = display(task_id),
1906            result = match result.as_ref() {
1907                Ok(value) => display(either::Either::Left(value)),
1908                Err(err) => display(either::Either::Right(err)),
1909            },
1910            new_children = tracing::field::Empty,
1911            immutable = tracing::field::Empty,
1912            new_output = tracing::field::Empty,
1913            output_dependents = tracing::field::Empty,
1914            stale = tracing::field::Empty,
1915        )
1916        .entered();
1917
1918        let is_error = result.is_err();
1919
1920        let mut ctx = self.execute_context(turbo_tasks);
1921
1922        let Some(TaskExecutionCompletePrepareResult {
1923            new_children,
1924            is_now_immutable,
1925            #[cfg(feature = "verify_determinism")]
1926            no_output_set,
1927            new_output,
1928            output_dependent_tasks,
1929            is_recomputation,
1930        }) = self.task_execution_completed_prepare(
1931            &mut ctx,
1932            #[cfg(feature = "trace_task_details")]
1933            &span,
1934            task_id,
1935            result,
1936            cell_counters,
1937            #[cfg(feature = "verify_determinism")]
1938            stateful,
1939            has_invalidator,
1940        )
1941        else {
1942            // Task was stale and has been rescheduled
1943            #[cfg(feature = "trace_task_details")]
1944            span.record("stale", "prepare");
1945            return true;
1946        };
1947
1948        #[cfg(feature = "trace_task_details")]
1949        span.record("new_output", new_output.is_some());
1950        #[cfg(feature = "trace_task_details")]
1951        span.record("output_dependents", output_dependent_tasks.len());
1952
1953        // When restoring from filesystem cache the following might not be executed (since we can
1954        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
1955        // would be executed again.
1956
1957        if !output_dependent_tasks.is_empty() {
1958            self.task_execution_completed_invalidate_output_dependent(
1959                &mut ctx,
1960                task_id,
1961                output_dependent_tasks,
1962            );
1963        }
1964
1965        let has_new_children = !new_children.is_empty();
1966        span.record("new_children", new_children.len());
1967
1968        if has_new_children {
1969            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
1970        }
1971
1972        if has_new_children
1973            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
1974        {
1975            // Task was stale and has been rescheduled
1976            #[cfg(feature = "trace_task_details")]
1977            span.record("stale", "connect");
1978            return true;
1979        }
1980
1981        let (stale, in_progress_cells) = self.task_execution_completed_finish(
1982            &mut ctx,
1983            task_id,
1984            #[cfg(feature = "verify_determinism")]
1985            no_output_set,
1986            new_output,
1987            is_now_immutable,
1988        );
1989        if stale {
1990            // Task was stale and has been rescheduled
1991            #[cfg(feature = "trace_task_details")]
1992            span.record("stale", "finish");
1993            return true;
1994        }
1995
1996        let removed_data = self.task_execution_completed_cleanup(
1997            &mut ctx,
1998            task_id,
1999            cell_counters,
2000            is_error,
2001            is_recomputation,
2002        );
2003
2004        // Drop data outside of critical sections
2005        drop(removed_data);
2006        drop(in_progress_cells);
2007
2008        false
2009    }
2010
2011    fn task_execution_completed_prepare(
2012        &self,
2013        ctx: &mut impl ExecuteContext<'_>,
2014        #[cfg(feature = "trace_task_details")] span: &Span,
2015        task_id: TaskId,
2016        result: Result<RawVc, TurboTasksExecutionError>,
2017        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2018        #[cfg(feature = "verify_determinism")] stateful: bool,
2019        has_invalidator: bool,
2020    ) -> Option<TaskExecutionCompletePrepareResult> {
2021        let mut task = ctx.task(task_id, TaskDataCategory::All);
2022        let is_recomputation = task.is_dirty().is_none();
2023        let Some(in_progress) = task.get_in_progress_mut() else {
2024            panic!("Task execution completed, but task is not in progress: {task:#?}");
2025        };
2026        if matches!(in_progress, InProgressState::Canceled) {
2027            return Some(TaskExecutionCompletePrepareResult {
2028                new_children: Default::default(),
2029                is_now_immutable: false,
2030                #[cfg(feature = "verify_determinism")]
2031                no_output_set: false,
2032                new_output: None,
2033                output_dependent_tasks: Default::default(),
2034                is_recomputation,
2035            });
2036        }
2037        let &mut InProgressState::InProgress(box InProgressStateInner {
2038            stale,
2039            ref mut new_children,
2040            session_dependent,
2041            once_task: is_once_task,
2042            ..
2043        }) = in_progress
2044        else {
2045            panic!("Task execution completed, but task is not in progress: {task:#?}");
2046        };
2047
2048        // If the task is stale, reschedule it
2049        #[cfg(not(feature = "no_fast_stale"))]
2050        if stale && !is_once_task {
2051            let Some(InProgressState::InProgress(box InProgressStateInner {
2052                done_event,
2053                mut new_children,
2054                ..
2055            })) = task.take_in_progress()
2056            else {
2057                unreachable!();
2058            };
2059            let old = task.set_in_progress(InProgressState::Scheduled {
2060                done_event,
2061                reason: TaskExecutionReason::Stale,
2062            });
2063            debug_assert!(old.is_none(), "InProgress already exists");
2064            // Remove old children from new_children to leave only the children that had their
2065            // active count increased
2066            for task in task.iter_children() {
2067                new_children.remove(&task);
2068            }
2069            drop(task);
2070
2071            // We need to undo the active count increase for the children since we throw away the
2072            // new_children list now.
2073            AggregationUpdateQueue::run(
2074                AggregationUpdateJob::DecreaseActiveCounts {
2075                    task_ids: new_children.into_iter().collect(),
2076                },
2077                ctx,
2078            );
2079            return None;
2080        }
2081
2082        // take the children from the task to process them
2083        let mut new_children = take(new_children);
2084
2085        // handle stateful (only tracked when verify_determinism is enabled)
2086        #[cfg(feature = "verify_determinism")]
2087        if stateful {
2088            task.set_stateful(true);
2089        }
2090
2091        // handle has_invalidator
2092        if has_invalidator {
2093            task.set_invalidator(true);
2094        }
2095
2096        // handle cell counters: update max index and remove cells that are no longer used
2097        // On error, skip this update: the task may have failed before creating all cells it
2098        // normally creates, so cell_counters is incomplete. Clearing cell_type_max_index entries
2099        // based on partial counters would cause "cell no longer exists" errors for tasks that
2100        // still hold dependencies on those cells. The old cell data is preserved on error
2101        // (see task_execution_completed_cleanup), so keeping cell_type_max_index consistent with
2102        // that data is correct.
2103        // NOTE: This must stay in sync with task_execution_completed_cleanup, which similarly
2104        // skips cell data removal on error.
2105        if result.is_ok() || is_recomputation {
2106            let old_counters: FxHashMap<_, _> = task
2107                .iter_cell_type_max_index()
2108                .map(|(&k, &v)| (k, v))
2109                .collect();
2110            let mut counters_to_remove = old_counters.clone();
2111
2112            for (&cell_type, &max_index) in cell_counters.iter() {
2113                if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2114                    if old_max_index != max_index {
2115                        task.insert_cell_type_max_index(cell_type, max_index);
2116                    }
2117                } else {
2118                    task.insert_cell_type_max_index(cell_type, max_index);
2119                }
2120            }
2121            for (cell_type, _) in counters_to_remove {
2122                task.remove_cell_type_max_index(&cell_type);
2123            }
2124        }
2125
2126        let mut queue = AggregationUpdateQueue::new();
2127
2128        let mut old_edges = Vec::new();
2129
2130        let has_children = !new_children.is_empty();
2131        let is_immutable = task.immutable();
2132        let task_dependencies_for_immutable =
2133            // Task was previously marked as immutable
2134            if !is_immutable
2135            // Task is not session dependent (session dependent tasks can change between sessions)
2136            && !session_dependent
2137            // Task has no invalidator
2138            && !task.invalidator()
2139            // Task has no dependencies on collectibles
2140            && task.is_collectibles_dependencies_empty()
2141        {
2142            Some(
2143                // Collect all dependencies on tasks to check if all dependencies are immutable
2144                task.iter_output_dependencies()
2145                    .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
2146                    .collect::<FxHashSet<_>>(),
2147            )
2148        } else {
2149            None
2150        };
2151
2152        if has_children {
2153            // Prepare all new children
2154            prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2155
2156            // Filter actual new children
2157            old_edges.extend(
2158                task.iter_children()
2159                    .filter(|task| !new_children.remove(task))
2160                    .map(OutdatedEdge::Child),
2161            );
2162        } else {
2163            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2164        }
2165
2166        old_edges.extend(
2167            task.iter_outdated_collectibles()
2168                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2169        );
2170
2171        if self.should_track_dependencies() {
2172            // IMPORTANT: Use iter_outdated_* here, NOT iter_* (active dependencies).
2173            // At execution start, active deps are copied to outdated as a "before" snapshot.
2174            // During execution, new deps are added to active.
2175            // Here at completion, we clean up only the OUTDATED deps (the "before" snapshot).
2176            // Using iter_* (active) instead would incorrectly clean up deps that are still valid,
2177            // breaking dependency tracking.
2178            old_edges.extend(
2179                task.iter_outdated_cell_dependencies()
2180                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
2181            );
2182            old_edges.extend(
2183                task.iter_outdated_output_dependencies()
2184                    .map(OutdatedEdge::OutputDependency),
2185            );
2186        }
2187
        // Check if the output needs to be updated
2189        let current_output = task.get_output();
2190        #[cfg(feature = "verify_determinism")]
2191        let no_output_set = current_output.is_none();
2192        let new_output = match result {
2193            Ok(RawVc::TaskOutput(output_task_id)) => {
2194                if let Some(OutputValue::Output(current_task_id)) = current_output
2195                    && *current_task_id == output_task_id
2196                {
2197                    None
2198                } else {
2199                    Some(OutputValue::Output(output_task_id))
2200                }
2201            }
2202            Ok(RawVc::TaskCell(output_task_id, cell)) => {
2203                if let Some(OutputValue::Cell(CellRef {
2204                    task: current_task_id,
2205                    cell: current_cell,
2206                })) = current_output
2207                    && *current_task_id == output_task_id
2208                    && *current_cell == cell
2209                {
2210                    None
2211                } else {
2212                    Some(OutputValue::Cell(CellRef {
2213                        task: output_task_id,
2214                        cell,
2215                    }))
2216                }
2217            }
2218            Ok(RawVc::LocalOutput(..)) => {
2219                panic!("Non-local tasks must not return a local Vc");
2220            }
2221            Err(err) => {
2222                if let Some(OutputValue::Error(old_error)) = current_output
2223                    && **old_error == err
2224                {
2225                    None
2226                } else {
2227                    Some(OutputValue::Error(Arc::new((&err).into())))
2228                }
2229            }
2230        };
2231        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2232        // When output has changed, grab the dependent tasks
2233        if new_output.is_some() && ctx.should_track_dependencies() {
2234            output_dependent_tasks = task.iter_output_dependent().collect();
2235        }
2236
2237        drop(task);
2238
2239        // Check if the task can be marked as immutable
2240        let mut is_now_immutable = false;
2241        if let Some(dependencies) = task_dependencies_for_immutable
2242            && dependencies
2243                .iter()
2244                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2245        {
2246            is_now_immutable = true;
2247        }
2248        #[cfg(feature = "trace_task_details")]
2249        span.record("immutable", is_immutable || is_now_immutable);
2250
2251        if !queue.is_empty() || !old_edges.is_empty() {
2252            #[cfg(any(
2253                feature = "trace_task_completion",
2254                feature = "trace_aggregation_update_stats"
2255            ))]
2256            let _span =
2257                tracing::trace_span!("remove old edges and prepare new children", stats = Empty)
2258                    .entered();
2259            // Remove outdated edges first, before removing in_progress+dirty flag.
2260            // We need to make sure all outdated edges are removed before the task can potentially
2261            // be scheduled and executed again
2262            #[cfg(feature = "trace_aggregation_update_stats")]
2263            {
2264                let stats = CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2265                _span.record("stats", tracing::field::debug(stats));
2266            }
2267            #[cfg(not(feature = "trace_aggregation_update_stats"))]
2268            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2269        }
2270
2271        Some(TaskExecutionCompletePrepareResult {
2272            new_children,
2273            is_now_immutable,
2274            #[cfg(feature = "verify_determinism")]
2275            no_output_set,
2276            new_output,
2277            output_dependent_tasks,
2278            is_recomputation,
2279        })
2280    }
2281
2282    fn task_execution_completed_invalidate_output_dependent(
2283        &self,
2284        ctx: &mut impl ExecuteContext<'_>,
2285        task_id: TaskId,
2286        output_dependent_tasks: SmallVec<[TaskId; 4]>,
2287    ) {
2288        debug_assert!(!output_dependent_tasks.is_empty());
2289
2290        if output_dependent_tasks.len() > 1 {
2291            ctx.prepare_tasks(
2292                output_dependent_tasks
2293                    .iter()
2294                    .map(|&id| (id, TaskDataCategory::All)),
2295                "invalidate output dependents",
2296            );
2297        }
2298
2299        fn process_output_dependents(
2300            ctx: &mut impl ExecuteContext<'_>,
2301            task_id: TaskId,
2302            dependent_task_id: TaskId,
2303            queue: &mut AggregationUpdateQueue,
2304        ) {
2305            #[cfg(feature = "trace_task_output_dependencies")]
2306            let span = tracing::trace_span!(
2307                "invalidate output dependency",
2308                task = %task_id,
2309                dependent_task = %dependent_task_id,
2310                result = tracing::field::Empty,
2311            )
2312            .entered();
2313            let mut make_stale = true;
2314            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2315            let transient_task_type = dependent.get_transient_task_type();
2316            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
2317                // once tasks are never invalidated
2318                #[cfg(feature = "trace_task_output_dependencies")]
2319                span.record("result", "once task");
2320                return;
2321            }
2322            if dependent.outdated_output_dependencies_contains(&task_id) {
2323                #[cfg(feature = "trace_task_output_dependencies")]
2324                span.record("result", "outdated dependency");
2325                // output dependency is outdated, so it hasn't read the output yet
2326                // and doesn't need to be invalidated
2327                // But importantly we still need to make the task dirty as it should no longer
2328                // be considered as "recomputation".
2329                make_stale = false;
2330            } else if !dependent.output_dependencies_contains(&task_id) {
2331                // output dependency has been removed, so the task doesn't depend on the
2332                // output anymore and doesn't need to be invalidated
2333                #[cfg(feature = "trace_task_output_dependencies")]
2334                span.record("result", "no backward dependency");
2335                return;
2336            }
2337            make_task_dirty_internal(
2338                dependent,
2339                dependent_task_id,
2340                make_stale,
2341                #[cfg(feature = "trace_task_dirty")]
2342                TaskDirtyCause::OutputChange { task_id },
2343                queue,
2344                ctx,
2345            );
2346            #[cfg(feature = "trace_task_output_dependencies")]
2347            span.record("result", "marked dirty");
2348        }
2349
2350        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
2351            let chunk_size = good_chunk_size(output_dependent_tasks.len());
2352            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2353            let _ = scope_and_block(chunks.len(), |scope| {
2354                for chunk in chunks {
2355                    let child_ctx = ctx.child_context();
2356                    scope.spawn(move || {
2357                        let mut ctx = child_ctx.create();
2358                        let mut queue = AggregationUpdateQueue::new();
2359                        for dependent_task_id in chunk {
2360                            process_output_dependents(
2361                                &mut ctx,
2362                                task_id,
2363                                dependent_task_id,
2364                                &mut queue,
2365                            )
2366                        }
2367                        queue.execute(&mut ctx);
2368                    });
2369                }
2370            });
2371        } else {
2372            let mut queue = AggregationUpdateQueue::new();
2373            for dependent_task_id in output_dependent_tasks {
2374                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
2375            }
2376            queue.execute(ctx);
2377        }
2378    }
2379
2380    fn task_execution_completed_unfinished_children_dirty(
2381        &self,
2382        ctx: &mut impl ExecuteContext<'_>,
2383        new_children: &FxHashSet<TaskId>,
2384    ) {
2385        debug_assert!(!new_children.is_empty());
2386
2387        let mut queue = AggregationUpdateQueue::new();
2388        ctx.for_each_task_all(
2389            new_children.iter().copied(),
2390            "unfinished children dirty",
2391            |child_task, ctx| {
2392                if !child_task.has_output() {
2393                    let child_id = child_task.id();
2394                    make_task_dirty_internal(
2395                        child_task,
2396                        child_id,
2397                        false,
2398                        #[cfg(feature = "trace_task_dirty")]
2399                        TaskDirtyCause::InitialDirty,
2400                        &mut queue,
2401                        ctx,
2402                    );
2403                }
2404            },
2405        );
2406
2407        queue.execute(ctx);
2408    }
2409
2410    fn task_execution_completed_connect(
2411        &self,
2412        ctx: &mut impl ExecuteContext<'_>,
2413        task_id: TaskId,
2414        new_children: FxHashSet<TaskId>,
2415    ) -> bool {
2416        debug_assert!(!new_children.is_empty());
2417
2418        let mut task = ctx.task(task_id, TaskDataCategory::All);
2419        let Some(in_progress) = task.get_in_progress() else {
2420            panic!("Task execution completed, but task is not in progress: {task:#?}");
2421        };
2422        if matches!(in_progress, InProgressState::Canceled) {
2423            // Task was canceled in the meantime, so we don't connect the children
2424            return false;
2425        }
2426        let InProgressState::InProgress(box InProgressStateInner {
2427            #[cfg(not(feature = "no_fast_stale"))]
2428            stale,
2429            once_task: is_once_task,
2430            ..
2431        }) = in_progress
2432        else {
2433            panic!("Task execution completed, but task is not in progress: {task:#?}");
2434        };
2435
2436        // If the task is stale, reschedule it
2437        #[cfg(not(feature = "no_fast_stale"))]
2438        if *stale && !is_once_task {
2439            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2440                task.take_in_progress()
2441            else {
2442                unreachable!();
2443            };
2444            let old = task.set_in_progress(InProgressState::Scheduled {
2445                done_event,
2446                reason: TaskExecutionReason::Stale,
2447            });
2448            debug_assert!(old.is_none(), "InProgress already exists");
2449            drop(task);
2450
2451            // All `new_children` are currently hold active with an active count and we need to undo
2452            // that. (We already filtered out the old children from that list)
2453            AggregationUpdateQueue::run(
2454                AggregationUpdateJob::DecreaseActiveCounts {
2455                    task_ids: new_children.into_iter().collect(),
2456                },
2457                ctx,
2458            );
2459            return true;
2460        }
2461
2462        let has_active_count = ctx.should_track_activeness()
2463            && task
2464                .get_activeness()
2465                .is_some_and(|activeness| activeness.active_counter > 0);
2466        connect_children(
2467            ctx,
2468            task_id,
2469            task,
2470            new_children,
2471            has_active_count,
2472            ctx.should_track_activeness(),
2473        );
2474
2475        false
2476    }
2477
2478    fn task_execution_completed_finish(
2479        &self,
2480        ctx: &mut impl ExecuteContext<'_>,
2481        task_id: TaskId,
2482        #[cfg(feature = "verify_determinism")] no_output_set: bool,
2483        new_output: Option<OutputValue>,
2484        is_now_immutable: bool,
2485    ) -> (
2486        bool,
2487        Option<
2488            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2489        >,
2490    ) {
2491        let mut task = ctx.task(task_id, TaskDataCategory::All);
2492        let Some(in_progress) = task.take_in_progress() else {
2493            panic!("Task execution completed, but task is not in progress: {task:#?}");
2494        };
2495        if matches!(in_progress, InProgressState::Canceled) {
2496            // Task was canceled in the meantime, so we don't finish it
2497            return (false, None);
2498        }
2499        let InProgressState::InProgress(box InProgressStateInner {
2500            done_event,
2501            once_task: is_once_task,
2502            stale,
2503            session_dependent,
2504            marked_as_completed: _,
2505            new_children,
2506        }) = in_progress
2507        else {
2508            panic!("Task execution completed, but task is not in progress: {task:#?}");
2509        };
2510        debug_assert!(new_children.is_empty());
2511
2512        // If the task is stale, reschedule it
2513        if stale && !is_once_task {
2514            let old = task.set_in_progress(InProgressState::Scheduled {
2515                done_event,
2516                reason: TaskExecutionReason::Stale,
2517            });
2518            debug_assert!(old.is_none(), "InProgress already exists");
2519            return (true, None);
2520        }
2521
2522        // Set the output if it has changed
2523        let mut old_content = None;
2524        if let Some(value) = new_output {
2525            old_content = task.set_output(value);
2526        }
2527
2528        // If the task has no invalidator and has no mutable dependencies, it does not have a way
2529        // to be invalidated and we can mark it as immutable.
2530        if is_now_immutable {
2531            task.set_immutable(true);
2532        }
2533
2534        // Notify in progress cells and remove all of them
2535        let in_progress_cells = task.take_in_progress_cells();
2536        if let Some(ref cells) = in_progress_cells {
2537            for state in cells.values() {
2538                state.event.notify(usize::MAX);
2539            }
2540        }
2541
2542        // Compute the new dirty state
2543        let new_dirtyness = if session_dependent {
2544            Some(Dirtyness::SessionDependent)
2545        } else {
2546            None
2547        };
2548        #[cfg(feature = "verify_determinism")]
2549        let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
2550        let data_update = task.update_dirty_state(new_dirtyness);
2551
2552        #[cfg(feature = "verify_determinism")]
2553        let reschedule =
2554            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
2555        #[cfg(not(feature = "verify_determinism"))]
2556        let reschedule = false;
2557        if reschedule {
2558            let old = task.set_in_progress(InProgressState::Scheduled {
2559                done_event,
2560                reason: TaskExecutionReason::Stale,
2561            });
2562            debug_assert!(old.is_none(), "InProgress already exists");
2563            drop(task);
2564        } else {
2565            drop(task);
2566
2567            // Notify dependent tasks that are waiting for this task to finish
2568            done_event.notify(usize::MAX);
2569        }
2570
2571        drop(old_content);
2572
2573        if let Some(data_update) = data_update {
2574            AggregationUpdateQueue::run(data_update, ctx);
2575        }
2576
2577        // We return so the data can be dropped outside of critical sections
2578        (reschedule, in_progress_cells)
2579    }
2580
2581    fn task_execution_completed_cleanup(
2582        &self,
2583        ctx: &mut impl ExecuteContext<'_>,
2584        task_id: TaskId,
2585        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2586        is_error: bool,
2587        is_recomputation: bool,
2588    ) -> Vec<SharedReference> {
2589        let mut task = ctx.task(task_id, TaskDataCategory::All);
2590        let mut removed_cell_data = Vec::new();
2591        // An error is potentially caused by a eventual consistency, so we avoid updating cells
2592        // after an error as it is likely transient and we want to keep the dependent tasks
2593        // clean to avoid re-executions.
2594        // NOTE: This must stay in sync with task_execution_completed_prepare, which similarly
2595        // skips cell_type_max_index updates on error.
2596        if !is_error || is_recomputation {
2597            // Remove no longer existing cells and
2598            // find all outdated data items (removed cells, outdated edges)
2599            // Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
2600            // anyway and we want to avoid needless re-executions. When the cells become
2601            // used again, they are invalidated from the update cell operation.
2602            let to_remove: Vec<_> = task
2603                .iter_cell_data()
2604                .filter_map(|(cell, _)| {
2605                    cell_counters
2606                        .get(&cell.type_id)
2607                        .is_none_or(|start_index| cell.index >= *start_index)
2608                        .then_some(*cell)
2609                })
2610                .collect();
2611            removed_cell_data.reserve_exact(to_remove.len());
2612            for cell in to_remove {
2613                if let Some(data) = task.remove_cell_data(&cell) {
2614                    removed_cell_data.push(data);
2615                }
2616            }
2617            // Remove cell_data_hash entries for cells that no longer exist
2618            let to_remove_hash: Vec<_> = task
2619                .iter_cell_data_hash()
2620                .filter_map(|(cell, _)| {
2621                    cell_counters
2622                        .get(&cell.type_id)
2623                        .is_none_or(|start_index| cell.index >= *start_index)
2624                        .then_some(*cell)
2625                })
2626                .collect();
2627            for cell in to_remove_hash {
2628                task.remove_cell_data_hash(&cell);
2629            }
2630        }
2631
2632        // Clean up task storage after execution:
2633        // - Shrink collections marked with shrink_on_completion
2634        // - Drop dependency fields for immutable tasks (they'll never re-execute)
2635        task.cleanup_after_execution();
2636
2637        drop(task);
2638
2639        // Return so we can drop outside of critical sections
2640        removed_cell_data
2641    }
2642
2643    /// Prints the standard message emitted when the background persisting process stops due to an
2644    /// unrecoverable write error. The caller is responsible for returning from the background job.
2645    fn log_unrecoverable_persist_error() {
2646        eprintln!(
2647            "Persisting is disabled for this session due to an unrecoverable error. Stopping the \
2648             background persisting process."
2649        );
2650    }
2651
2652    fn run_backend_job<'a>(
2653        self: &'a Arc<Self>,
2654        job: TurboTasksBackendJob,
2655        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2656    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2657        Box::pin(async move {
2658            match job {
2659                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
2660                    debug_assert!(self.should_persist());
2661
2662                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
2663                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
2664                    let mut idle_start_listener = self.idle_start_event.listen();
2665                    let mut idle_end_listener = self.idle_end_event.listen();
2666                    let mut fresh_idle = true;
2667                    loop {
2668                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2669                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2670                        let idle_timeout = *IDLE_TIMEOUT;
2671                        let (time, mut reason) =
2672                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
2673                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2674                            } else {
2675                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
2676                            };
2677
2678                        let until = last_snapshot + time;
2679                        if until > Instant::now() {
2680                            let mut stop_listener = self.stopping_event.listen();
2681                            if self.stopping.load(Ordering::Acquire) {
2682                                return;
2683                            }
2684                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2685                                Instant::now() + idle_timeout
2686                            } else {
2687                                far_future()
2688                            };
2689                            loop {
2690                                tokio::select! {
2691                                    _ = &mut stop_listener => {
2692                                        return;
2693                                    },
2694                                    _ = &mut idle_start_listener => {
2695                                        idle_time = Instant::now() + idle_timeout;
2696                                        idle_start_listener = self.idle_start_event.listen()
2697                                    },
2698                                    _ = &mut idle_end_listener => {
2699                                        idle_time = until + idle_timeout;
2700                                        idle_end_listener = self.idle_end_event.listen()
2701                                    },
2702                                    _ = tokio::time::sleep_until(until) => {
2703                                        break;
2704                                    },
2705                                    _ = tokio::time::sleep_until(idle_time) => {
2706                                        if turbo_tasks.is_idle() {
2707                                            reason = "idle timeout";
2708                                            break;
2709                                        }
2710                                    },
2711                                }
2712                            }
2713                        }
2714
2715                        let this = self.clone();
2716                        // Create a root span shared by both the snapshot/persist
2717                        // work and the subsequent compaction so they appear
2718                        // grouped together in trace viewers.
2719                        let background_span =
2720                            tracing::info_span!(parent: None, "background snapshot");
2721                        match this.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
2722                            Err(err) => {
2723                                // save_snapshot consumed persisted_task_cache_log entries;
2724                                // further snapshots would corrupt the task graph.
2725                                eprintln!("Persisting failed: {err:?}");
2726                                Self::log_unrecoverable_persist_error();
2727                                return;
2728                            }
2729                            Ok((snapshot_start, new_data)) => {
2730                                last_snapshot = snapshot_start;
2731
2732                                // Compact while idle (up to limit), regardless of
2733                                // whether the snapshot had new data.
2734                                // `background_span` is not entered here because
2735                                // `EnteredSpan` is `!Send` and would prevent the
2736                                // future from being sent across threads when it
2737                                // suspends at the `select!` await below.
2738                                const MAX_IDLE_COMPACTION_PASSES: usize = 10;
2739                                for _ in 0..MAX_IDLE_COMPACTION_PASSES {
2740                                    let idle_ended = tokio::select! {
2741                                        biased;
2742                                        _ = &mut idle_end_listener => {
2743                                            idle_end_listener = self.idle_end_event.listen();
2744                                            true
2745                                        },
2746                                        _ = std::future::ready(()) => false,
2747                                    };
2748                                    if idle_ended {
2749                                        break;
2750                                    }
2751                                    // Enter the span only around the synchronous
2752                                    // compact() call so we never hold an
2753                                    // `EnteredSpan` across an await point.
2754                                    let _compact_span = tracing::info_span!(
2755                                        parent: background_span.id(),
2756                                        "compact database"
2757                                    )
2758                                    .entered();
2759                                    match self.backing_storage.compact() {
2760                                        Ok(true) => {}
2761                                        Ok(false) => break,
2762                                        Err(err) => {
2763                                            eprintln!("Compaction failed: {err:?}");
2764                                            if self.backing_storage.has_unrecoverable_write_error()
2765                                            {
2766                                                Self::log_unrecoverable_persist_error();
2767                                                return;
2768                                            }
2769                                            break;
2770                                        }
2771                                    }
2772                                }
2773
2774                                if !new_data {
2775                                    fresh_idle = false;
2776                                    continue;
2777                                }
2778                                let last_snapshot = last_snapshot.duration_since(self.start_time);
2779                                self.last_snapshot.store(
2780                                    last_snapshot.as_millis().try_into().unwrap(),
2781                                    Ordering::Relaxed,
2782                                );
2783
2784                                turbo_tasks.schedule_backend_background_job(
2785                                    TurboTasksBackendJob::FollowUpSnapshot,
2786                                );
2787                                return;
2788                            }
2789                        }
2790                    }
2791                }
2792            }
2793        })
2794    }
2795
2796    fn try_read_own_task_cell(
2797        &self,
2798        task_id: TaskId,
2799        cell: CellId,
2800        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2801    ) -> Result<TypedCellContent> {
2802        let mut ctx = self.execute_context(turbo_tasks);
2803        let task = ctx.task(task_id, TaskDataCategory::Data);
2804        if let Some(content) = task.get_cell_data(&cell).cloned() {
2805            Ok(CellContent(Some(content)).into_typed(cell.type_id))
2806        } else {
2807            Ok(CellContent(None).into_typed(cell.type_id))
2808        }
2809    }
2810
2811    fn read_task_collectibles(
2812        &self,
2813        task_id: TaskId,
2814        collectible_type: TraitTypeId,
2815        reader_id: Option<TaskId>,
2816        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2817    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2818        let mut ctx = self.execute_context(turbo_tasks);
2819        let mut collectibles = AutoMap::default();
2820        {
2821            let mut task = ctx.task(task_id, TaskDataCategory::All);
2822            // Ensure it's an root node
2823            loop {
2824                let aggregation_number = get_aggregation_number(&task);
2825                if is_root_node(aggregation_number) {
2826                    break;
2827                }
2828                drop(task);
2829                AggregationUpdateQueue::run(
2830                    AggregationUpdateJob::UpdateAggregationNumber {
2831                        task_id,
2832                        base_aggregation_number: u32::MAX,
2833                        distance: None,
2834                    },
2835                    &mut ctx,
2836                );
2837                task = ctx.task(task_id, TaskDataCategory::All);
2838            }
2839            for (collectible, count) in task.iter_aggregated_collectibles() {
2840                if *count > 0 && collectible.collectible_type == collectible_type {
2841                    *collectibles
2842                        .entry(RawVc::TaskCell(
2843                            collectible.cell.task,
2844                            collectible.cell.cell,
2845                        ))
2846                        .or_insert(0) += 1;
2847                }
2848            }
2849            for (&collectible, &count) in task.iter_collectibles() {
2850                if collectible.collectible_type == collectible_type {
2851                    *collectibles
2852                        .entry(RawVc::TaskCell(
2853                            collectible.cell.task,
2854                            collectible.cell.cell,
2855                        ))
2856                        .or_insert(0) += count;
2857                }
2858            }
2859            if let Some(reader_id) = reader_id {
2860                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
2861            }
2862        }
2863        if let Some(reader_id) = reader_id {
2864            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
2865            let target = CollectiblesRef {
2866                task: task_id,
2867                collectible_type,
2868            };
2869            if !reader.remove_outdated_collectibles_dependencies(&target) {
2870                let _ = reader.add_collectibles_dependencies(target);
2871            }
2872        }
2873        collectibles
2874    }
2875
2876    fn emit_collectible(
2877        &self,
2878        collectible_type: TraitTypeId,
2879        collectible: RawVc,
2880        task_id: TaskId,
2881        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2882    ) {
2883        self.assert_valid_collectible(task_id, collectible);
2884
2885        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2886            panic!("Collectibles need to be resolved");
2887        };
2888        let cell = CellRef {
2889            task: collectible_task,
2890            cell,
2891        };
2892        operation::UpdateCollectibleOperation::run(
2893            task_id,
2894            CollectibleRef {
2895                collectible_type,
2896                cell,
2897            },
2898            1,
2899            self.execute_context(turbo_tasks),
2900        );
2901    }
2902
2903    fn unemit_collectible(
2904        &self,
2905        collectible_type: TraitTypeId,
2906        collectible: RawVc,
2907        count: u32,
2908        task_id: TaskId,
2909        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2910    ) {
2911        self.assert_valid_collectible(task_id, collectible);
2912
2913        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2914            panic!("Collectibles need to be resolved");
2915        };
2916        let cell = CellRef {
2917            task: collectible_task,
2918            cell,
2919        };
2920        operation::UpdateCollectibleOperation::run(
2921            task_id,
2922            CollectibleRef {
2923                collectible_type,
2924                cell,
2925            },
2926            -(i32::try_from(count).unwrap()),
2927            self.execute_context(turbo_tasks),
2928        );
2929    }
2930
2931    fn update_task_cell(
2932        &self,
2933        task_id: TaskId,
2934        cell: CellId,
2935        content: CellContent,
2936        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
2937        content_hash: Option<CellHash>,
2938        verification_mode: VerificationMode,
2939        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2940    ) {
2941        operation::UpdateCellOperation::run(
2942            task_id,
2943            cell,
2944            content,
2945            updated_key_hashes,
2946            content_hash,
2947            verification_mode,
2948            self.execute_context(turbo_tasks),
2949        );
2950    }
2951
2952    fn mark_own_task_as_session_dependent(
2953        &self,
2954        task_id: TaskId,
2955        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2956    ) {
2957        if !self.should_track_dependencies() {
2958            // Without dependency tracking we don't need session dependent tasks
2959            return;
2960        }
2961        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
2962        let mut ctx = self.execute_context(turbo_tasks);
2963        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
2964        let aggregation_number = get_aggregation_number(&task);
2965        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
2966            drop(task);
2967            // We want to use a high aggregation number to avoid large aggregation chains for
2968            // session dependent tasks (which change on every run)
2969            AggregationUpdateQueue::run(
2970                AggregationUpdateJob::UpdateAggregationNumber {
2971                    task_id,
2972                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
2973                    distance: None,
2974                },
2975                &mut ctx,
2976            );
2977            task = ctx.task(task_id, TaskDataCategory::Meta);
2978        }
2979        if let Some(InProgressState::InProgress(box InProgressStateInner {
2980            session_dependent,
2981            ..
2982        })) = task.get_in_progress_mut()
2983        {
2984            *session_dependent = true;
2985        }
2986    }
2987
2988    fn mark_own_task_as_finished(
2989        &self,
2990        task: TaskId,
2991        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2992    ) {
2993        let mut ctx = self.execute_context(turbo_tasks);
2994        let mut task = ctx.task(task, TaskDataCategory::Data);
2995        if let Some(InProgressState::InProgress(box InProgressStateInner {
2996            marked_as_completed,
2997            ..
2998        })) = task.get_in_progress_mut()
2999        {
3000            *marked_as_completed = true;
3001            // TODO this should remove the dirty state (also check session_dependent)
3002            // but this would break some assumptions for strongly consistent reads.
3003            // Client tasks are not connected yet, so we wouldn't wait for them.
3004            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
3005        }
3006    }
3007
3008    fn connect_task(
3009        &self,
3010        task: TaskId,
3011        parent_task: Option<TaskId>,
3012        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3013    ) {
3014        self.assert_not_persistent_calling_transient(parent_task, task, None);
3015        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
3016    }
3017
3018    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3019        let task_id = self.transient_task_id_factory.get();
3020        {
3021            let mut task = self.storage.access_mut(task_id);
3022            task.init_transient_task(task_id, task_type, self.should_track_activeness());
3023        }
3024        #[cfg(feature = "verify_aggregation_graph")]
3025        self.root_tasks.lock().insert(task_id);
3026        task_id
3027    }
3028
3029    fn dispose_root_task(
3030        &self,
3031        task_id: TaskId,
3032        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3033    ) {
3034        #[cfg(feature = "verify_aggregation_graph")]
3035        self.root_tasks.lock().remove(&task_id);
3036
3037        let mut ctx = self.execute_context(turbo_tasks);
3038        let mut task = ctx.task(task_id, TaskDataCategory::All);
3039        let is_dirty = task.is_dirty();
3040        let has_dirty_containers = task.has_dirty_containers();
3041        if is_dirty.is_some() || has_dirty_containers {
3042            if let Some(activeness_state) = task.get_activeness_mut() {
3043                // We will finish the task, but it would be removed after the task is done
3044                activeness_state.unset_root_type();
3045                activeness_state.set_active_until_clean();
3046            };
3047        } else if let Some(activeness_state) = task.take_activeness() {
3048            // Technically nobody should be listening to this event, but just in case
3049            // we notify it anyway
3050            activeness_state.all_clean_event.notify(usize::MAX);
3051        }
3052    }
3053
3054    #[cfg(feature = "verify_aggregation_graph")]
3055    fn verify_aggregation_graph(
3056        &self,
3057        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3058        idle: bool,
3059    ) {
3060        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3061            return;
3062        }
3063        use std::{collections::VecDeque, env, io::stdout};
3064
3065        use crate::backend::operation::{get_uppers, is_aggregating_node};
3066
3067        let mut ctx = self.execute_context(turbo_tasks);
3068        let root_tasks = self.root_tasks.lock().clone();
3069
3070        for task_id in root_tasks.into_iter() {
3071            let mut queue = VecDeque::new();
3072            let mut visited = FxHashSet::default();
3073            let mut aggregated_nodes = FxHashSet::default();
3074            let mut collectibles = FxHashMap::default();
3075            let root_task_id = task_id;
3076            visited.insert(task_id);
3077            aggregated_nodes.insert(task_id);
3078            queue.push_back(task_id);
3079            let mut counter = 0;
3080            while let Some(task_id) = queue.pop_front() {
3081                counter += 1;
3082                if counter % 100000 == 0 {
3083                    println!(
3084                        "queue={}, visited={}, aggregated_nodes={}",
3085                        queue.len(),
3086                        visited.len(),
3087                        aggregated_nodes.len()
3088                    );
3089                }
3090                let task = ctx.task(task_id, TaskDataCategory::All);
3091                if idle && !self.is_idle.load(Ordering::Relaxed) {
3092                    return;
3093                }
3094
3095                let uppers = get_uppers(&task);
3096                if task_id != root_task_id
3097                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3098                {
3099                    panic!(
3100                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3101                         {:?})",
3102                        task_id,
3103                        task.get_task_description(),
3104                        uppers
3105                    );
3106                }
3107
3108                for (collectible, _) in task.iter_aggregated_collectibles() {
3109                    collectibles
3110                        .entry(*collectible)
3111                        .or_insert_with(|| (false, Vec::new()))
3112                        .1
3113                        .push(task_id);
3114                }
3115
3116                for (&collectible, &value) in task.iter_collectibles() {
3117                    if value > 0 {
3118                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3119                            *flag = true
3120                        } else {
3121                            panic!(
3122                                "Task {} has a collectible {:?} that is not in any upper task",
3123                                task_id, collectible
3124                            );
3125                        }
3126                    }
3127                }
3128
3129                let is_dirty = task.has_dirty();
3130                let has_dirty_container = task.has_dirty_containers();
3131                let should_be_in_upper = is_dirty || has_dirty_container;
3132
3133                let aggregation_number = get_aggregation_number(&task);
3134                if is_aggregating_node(aggregation_number) {
3135                    aggregated_nodes.insert(task_id);
3136                }
3137                // println!(
3138                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
3139                //     ctx.get_task_description(task_id),
3140                //     uppers
3141                // );
3142
3143                for child_id in task.iter_children() {
3144                    // println!("{task_id}: child -> {child_id}");
3145                    if visited.insert(child_id) {
3146                        queue.push_back(child_id);
3147                    }
3148                }
3149                drop(task);
3150
3151                if should_be_in_upper {
3152                    for upper_id in uppers {
3153                        let upper = ctx.task(upper_id, TaskDataCategory::All);
3154                        let in_upper = upper
3155                            .get_aggregated_dirty_containers(&task_id)
3156                            .is_some_and(|&dirty| dirty > 0);
3157                        if !in_upper {
3158                            let containers: Vec<_> = upper
3159                                .iter_aggregated_dirty_containers()
3160                                .map(|(&k, &v)| (k, v))
3161                                .collect();
3162                            let upper_task_desc = upper.get_task_description();
3163                            drop(upper);
3164                            panic!(
3165                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
3166                                 ({})\nThese dirty containers are present:\n{:#?}",
3167                                task_id,
3168                                ctx.task(task_id, TaskDataCategory::Data)
3169                                    .get_task_description(),
3170                                upper_id,
3171                                upper_task_desc,
3172                                containers,
3173                            );
3174                        }
3175                    }
3176                }
3177            }
3178
3179            for (collectible, (flag, task_ids)) in collectibles {
3180                if !flag {
3181                    use std::io::Write;
3182                    let mut stdout = stdout().lock();
3183                    writeln!(
3184                        stdout,
3185                        "{:?} that is not emitted in any child task but in these aggregated \
3186                         tasks: {:#?}",
3187                        collectible,
3188                        task_ids
3189                            .iter()
3190                            .map(|t| format!(
3191                                "{t} {}",
3192                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
3193                            ))
3194                            .collect::<Vec<_>>()
3195                    )
3196                    .unwrap();
3197
3198                    let task_id = collectible.cell.task;
3199                    let mut queue = {
3200                        let task = ctx.task(task_id, TaskDataCategory::All);
3201                        get_uppers(&task)
3202                    };
3203                    let mut visited = FxHashSet::default();
3204                    for &upper_id in queue.iter() {
3205                        visited.insert(upper_id);
3206                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3207                    }
3208                    while let Some(task_id) = queue.pop() {
3209                        let task = ctx.task(task_id, TaskDataCategory::All);
3210                        let desc = task.get_task_description();
3211                        let aggregated_collectible = task
3212                            .get_aggregated_collectibles(&collectible)
3213                            .copied()
3214                            .unwrap_or_default();
3215                        let uppers = get_uppers(&task);
3216                        drop(task);
3217                        writeln!(
3218                            stdout,
3219                            "upper {task_id} {desc} collectible={aggregated_collectible}"
3220                        )
3221                        .unwrap();
3222                        if task_ids.contains(&task_id) {
3223                            writeln!(
3224                                stdout,
3225                                "Task has an upper connection to an aggregated task that doesn't \
3226                                 reference it. Upper connection is invalid!"
3227                            )
3228                            .unwrap();
3229                        }
3230                        for upper_id in uppers {
3231                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3232                            if !visited.contains(&upper_id) {
3233                                queue.push(upper_id);
3234                            }
3235                        }
3236                    }
3237                    panic!("See stdout for more details");
3238                }
3239            }
3240        }
3241    }
3242
3243    fn assert_not_persistent_calling_transient(
3244        &self,
3245        parent_id: Option<TaskId>,
3246        child_id: TaskId,
3247        cell_id: Option<CellId>,
3248    ) {
3249        if let Some(parent_id) = parent_id
3250            && !parent_id.is_transient()
3251            && child_id.is_transient()
3252        {
3253            self.panic_persistent_calling_transient(
3254                self.debug_get_task_description(parent_id),
3255                self.debug_get_cached_task_type(child_id).as_deref(),
3256                cell_id,
3257            );
3258        }
3259    }
3260
3261    fn panic_persistent_calling_transient(
3262        &self,
3263        parent: String,
3264        child: Option<&CachedTaskType>,
3265        cell_id: Option<CellId>,
3266    ) -> ! {
3267        let transient_reason = if let Some(child) = child {
3268            Cow::Owned(format!(
3269                " The callee is transient because it depends on:\n{}",
3270                self.debug_trace_transient_task(child, cell_id),
3271            ))
3272        } else {
3273            Cow::Borrowed("")
3274        };
3275        panic!(
3276            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3277            parent,
3278            child.map_or("unknown", |t| t.get_name()),
3279            transient_reason,
3280        );
3281    }
3282
3283    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3284        // these checks occur in a potentially hot codepath, but they're cheap
3285        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3286            // This should never happen: The collectible APIs use ResolvedVc
3287            let task_info = if let Some(col_task_ty) = collectible
3288                .try_get_task_id()
3289                .map(|t| self.debug_get_task_description(t))
3290            {
3291                Cow::Owned(format!(" (return type of {col_task_ty})"))
3292            } else {
3293                Cow::Borrowed("")
3294            };
3295            panic!("Collectible{task_info} must be a ResolvedVc")
3296        };
3297        if col_task_id.is_transient() && !task_id.is_transient() {
3298            let transient_reason =
3299                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3300                    Cow::Owned(format!(
3301                        ". The collectible is transient because it depends on:\n{}",
3302                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3303                    ))
3304                } else {
3305                    Cow::Borrowed("")
3306                };
3307            // this should never happen: How would a persistent function get a transient Vc?
3308            panic!(
3309                "Collectible is transient, transient collectibles cannot be emitted from \
3310                 persistent tasks{transient_reason}",
3311            )
3312        }
3313    }
3314}
3315
// `Backend` implementation for the public `TurboTasksBackend` wrapper.
//
// Every method forwards to the inner value stored in field `0`; the only exceptions are
// `task_statistics` and `is_tracking_dependencies`, which read fields of that inner value
// directly.
impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }

    // The inner `stopping` does not take the API handle.
    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }

    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }

    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }

    // The inner `idle_end` does not take the API handle.
    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }

    fn get_or_create_task(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        parent_task: Option<TaskId>,
        persistence: TaskPersistence,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_task(native_fn, this, arg, parent_task, persistence, turbo_tasks)
    }

    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }

    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }

    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }

    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0
            .try_start_task_execution(task_id, priority, turbo_tasks)
    }

    // The `stateful` parameter only exists when the `verify_determinism` feature is enabled,
    // both in this signature and in the forwarded call.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
            turbo_tasks,
        )
    }

    // Job type passed to `run_backend_job`.
    type BackendJob = TurboTasksBackendJob;

    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(job, turbo_tasks)
    }

    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, reader, options, turbo_tasks)
    }

    // NOTE: the inner method takes `reader` before `cell`, i.e. in a different order than this
    // signature.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }

    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0.try_read_own_task_cell(task_id, cell, turbo_tasks)
    }

    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }

    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }

    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }

    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(
            task_id,
            cell,
            content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            turbo_tasks,
        );
    }

    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }

    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }

    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }

    // The inner `create_transient_task` does not take the API handle.
    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }

    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }

    // Returns a reference to the inner `task_statistics` field.
    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }

    // Reports the inner `options.dependency_tracking` flag.
    fn is_tracking_dependencies(&self) -> bool {
        self.0.options.dependency_tracking
    }

    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
        self.0.get_task_name(task, turbo_tasks)
    }
}
3554
/// A tree describing why a task is transient, rendered as an indented listing through its
/// `Display` implementation.
enum DebugTraceTransientTask {
    /// A task with a known name, optionally with the causes that are rendered below it.
    Cached {
        /// Name printed for the task.
        task_name: &'static str,
        /// When set, the rendering appends " (read cell of type …)" with this type's name.
        cell_type_id: Option<ValueTypeId>,
        /// Cause rendered under a `self:` heading.
        cause_self: Option<Box<DebugTraceTransientTask>>,
        /// Causes rendered under an `args:` heading.
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// This representation is used when this task is a duplicate of one previously shown
    Collapsed {
        /// Name printed for the task; the rendering appends " (collapsed)".
        task_name: &'static str,
        /// When set, the rendering appends " (read cell of type …)" with this type's name.
        cell_type_id: Option<ValueTypeId>,
    },
    /// A task without a known name; rendered as "unknown transient task".
    Uncached {
        /// When set, the rendering appends " (read cell of type …)" with this type's name.
        cell_type_id: Option<ValueTypeId>,
    },
}
3571
3572impl DebugTraceTransientTask {
3573    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3574        let indent = "    ".repeat(level);
3575        f.write_str(&indent)?;
3576
3577        fn fmt_cell_type_id(
3578            f: &mut fmt::Formatter<'_>,
3579            cell_type_id: Option<ValueTypeId>,
3580        ) -> fmt::Result {
3581            if let Some(ty) = cell_type_id {
3582                write!(
3583                    f,
3584                    " (read cell of type {})",
3585                    get_value_type(ty).ty.global_name
3586                )
3587            } else {
3588                Ok(())
3589            }
3590        }
3591
3592        // write the name and type
3593        match self {
3594            Self::Cached {
3595                task_name,
3596                cell_type_id,
3597                ..
3598            }
3599            | Self::Collapsed {
3600                task_name,
3601                cell_type_id,
3602                ..
3603            } => {
3604                f.write_str(task_name)?;
3605                fmt_cell_type_id(f, *cell_type_id)?;
3606                if matches!(self, Self::Collapsed { .. }) {
3607                    f.write_str(" (collapsed)")?;
3608                }
3609            }
3610            Self::Uncached { cell_type_id } => {
3611                f.write_str("unknown transient task")?;
3612                fmt_cell_type_id(f, *cell_type_id)?;
3613            }
3614        }
3615        f.write_char('\n')?;
3616
3617        // write any extra "cause" information we might have
3618        if let Self::Cached {
3619            cause_self,
3620            cause_args,
3621            ..
3622        } = self
3623        {
3624            if let Some(c) = cause_self {
3625                writeln!(f, "{indent}  self:")?;
3626                c.fmt_indented(f, level + 1)?;
3627            }
3628            if !cause_args.is_empty() {
3629                writeln!(f, "{indent}  args:")?;
3630                for c in cause_args {
3631                    c.fmt_indented(f, level + 1)?;
3632                }
3633            }
3634        }
3635        Ok(())
3636    }
3637}
3638
3639impl fmt::Display for DebugTraceTransientTask {
3640    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3641        self.fmt_indented(f, 0)
3642    }
3643}
3644
// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
/// Returns an [`Instant`] roughly 30 years after the current time, used as a stand-in for
/// "never".
fn far_future() -> Instant {
    // API does not provide a way to obtain max `Instant`
    // or convert specific date in the future to instant.
    // 1000 years overflows on macOS, 100 years overflows on FreeBSD.
    const THIRTY_YEARS_IN_SECS: u64 = 86400 * 365 * 30;
    let horizon = Duration::from_secs(THIRTY_YEARS_IN_SECS);
    Instant::now() + horizon
}
3653
3654/// Encodes task data, using the provided buffer as a scratch space.  Returns a new exactly sized
3655/// buffer.
3656/// This allows reusing the buffer across multiple encode calls to optimize allocations and
3657/// resulting buffer sizes.
3658///
3659/// TODO: The `Result` return type is an artifact of the bincode `Encode` trait requiring
3660/// fallible encoding. In practice, encoding to a `SmallVec` is infallible (no I/O), and the only
3661/// real failure mode — a `TypedSharedReference` whose value type has no bincode impl — is a
3662/// programmer error caught by the panic in the caller. Consider making the bincode encoding trait
3663/// infallible (i.e. returning `()` instead of `Result<(), EncodeError>`) to eliminate the
3664/// spurious `Result` threading throughout the encode path.
3665fn encode_task_data(
3666    task: TaskId,
3667    data: &TaskStorage,
3668    category: SpecificTaskDataCategory,
3669    scratch_buffer: &mut TurboBincodeBuffer,
3670) -> Result<TurboBincodeBuffer> {
3671    scratch_buffer.clear();
3672    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3673    data.encode(category, &mut encoder)?;
3674
3675    if cfg!(feature = "verify_serialization") {
3676        TaskStorage::new()
3677            .decode(
3678                category,
3679                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3680            )
3681            .with_context(|| {
3682                format!(
3683                    "expected to be able to decode serialized data for '{category:?}' information \
3684                     for {task}"
3685                )
3686            })?;
3687    }
3688    Ok(SmallVec::from_slice(scratch_buffer))
3689}