Skip to main content

turbo_tasks_backend/backend/
mod.rs

1mod cell_data;
2mod counter_map;
3mod operation;
4mod snapshot_coordinator;
5mod storage;
6pub mod storage_schema;
7
8use std::{
9    borrow::Cow,
10    fmt::{self, Write},
11    future::Future,
12    hash::BuildHasherDefault,
13    mem::take,
14    pin::Pin,
15    sync::{
16        Arc, LazyLock,
17        atomic::{AtomicBool, Ordering},
18    },
19    time::SystemTime,
20};
21
22use anyhow::{Context, Result, bail};
23use auto_hash_map::{AutoMap, AutoSet};
24use indexmap::IndexSet;
25use parking_lot::Mutex;
26use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
27use smallvec::{SmallVec, smallvec};
28use tokio::time::{Duration, Instant};
29use tracing::{Span, trace_span};
30use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
31use turbo_tasks::{
32    CellId, DynTaskInputsStorage, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
33    ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
34    TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasks, TurboTasksCallApi,
35    TurboTasksPanic, ValueTypeId,
36    backend::{
37        Backend, CachedTaskType, CachedTaskTypeArc, CellContent, CellHash, TaskExecutionSpec,
38        TransientTaskType, TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
39        TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
40        VerificationMode,
41    },
42    event::{Event, EventDescription, EventListener},
43    macro_helpers::NativeFunction,
44    message_queue::{TimingEvent, TraceEvent},
45    registry::get_value_type,
46    scope::scope_and_block,
47    task_statistics::TaskStatisticsApi,
48    trace::TraceRawVcs,
49    util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
50};
51#[cfg(feature = "task_dirty_cause")]
52use turbo_tasks::{FunctionId, TaskDirtyCause};
53
54pub use self::{
55    operation::AnyOperation,
56    storage::{EvictionCounts, SpecificTaskDataCategory, TaskDataCategory},
57};
58use crate::{
59    backend::{
60        operation::{
61            AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
62            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
63            LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType, TaskTypeRef,
64            connect_children, get_aggregation_number, get_uppers, make_task_dirty_internal,
65            prepare_new_children,
66        },
67        snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
68        storage::Storage,
69        storage_schema::{TaskStorage, TaskStorageAccessors},
70    },
71    backing_storage::{SnapshotItem, SnapshotMeta, compute_task_type_hash},
72    data::{
73        ActivenessState, CellDependency, CellRef, CollectibleRef, CollectiblesRef, Dirtyness,
74        InProgressCellState, InProgressState, InProgressStateInner, OutputValue, TransientTask,
75    },
76    error::TaskError,
77    kv_backing_storage::TurboBackingStorage,
78    utils::{
79        dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
80        shard_amount::compute_shard_amount,
81    },
82};
83
/// Cut-over point between serial and parallel dirtying of dependent tasks.
///
/// Once more dependent tasks than this have to be made dirty, the operation is parallelized.
const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10_000;
88
/// Idle timeout for snapshot persistence, read once on first use.
///
/// The value is taken from the `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS` environment variable
/// (milliseconds). If the variable is unset, not valid unicode, or does not parse as a `u64`, the
/// timeout falls back to 2 seconds.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let fallback = Duration::from_secs(2);
    match std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS") {
        Ok(raw) => raw.parse::<u64>().map_or(fallback, Duration::from_millis),
        Err(_) => fallback,
    }
});
98
99/// Priority used to re-schedule a task that became stale during execution.
100///
101/// Stale tasks must run again, but at a priority that reflects why they're being re-run rather
102/// than the (likely higher) priority of the original schedule. We use invalidation priority
103/// based on the task's leaf distance, parented under either the task's current dirty priority
104/// or `leaf()` if it is no longer dirty.
105fn compute_stale_priority(task: &impl TaskGuard) -> TaskPriority {
106    TaskPriority::invalidation(
107        task.get_leaf_distance()
108            .copied()
109            .unwrap_or_default()
110            .distance,
111    )
112    .in_parent(task.is_dirty().unwrap_or(TaskPriority::leaf()))
113}
114
/// Selects how the backend interacts with the backing storage.
///
/// Every mode queries the storage for cache entries that are missing locally; the modes differ
/// in whether and when local changes are pushed back.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageMode {
    /// Queries the storage for cache entries that don't exist locally.
    ReadOnly,
    /// Queries the storage for cache entries that don't exist locally.
    /// Regularly pushes changes to the backing storage.
    ReadWrite,
    /// Queries the storage for cache entries that don't exist locally.
    /// On shutdown, pushes all changes to the backing storage.
    ReadWriteOnShutdown,
}

/// Configuration of a `TurboTasksBackend`.
///
/// Use [`BackendOptions::default`] and override individual fields with struct update syntax.
#[derive(Debug, Clone)]
pub struct BackendOptions {
    /// Enables dependency tracking.
    ///
    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
    /// forever.
    pub dependency_tracking: bool,

    /// Enables active tracking.
    ///
    /// Automatically disabled when `dependency_tracking` is disabled.
    ///
    /// When disabled: All tasks are considered as active.
    pub active_tracking: bool,

    /// Enables the backing storage. `None` disables it entirely.
    pub storage_mode: Option<StorageMode>,

    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
    /// data structures. If `None`, it will use the available parallelism.
    pub num_workers: Option<usize>,

    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
    pub small_preallocation: bool,

    /// When enabled, evict all evictable tasks from in-memory storage after every snapshot.
    /// This reclaims memory by clearing persisted data that can be re-loaded from disk on demand.
    /// This is an EXPERIMENTAL FEATURE under development.
    pub evict_after_snapshot: bool,
}

impl Default for BackendOptions {
    /// Dependency and active tracking enabled, read-write storage, worker count derived from the
    /// available parallelism, regular preallocation, and no eviction after snapshots.
    fn default() -> Self {
        Self {
            dependency_tracking: true,
            active_tracking: true,
            storage_mode: Some(StorageMode::ReadWrite),
            num_workers: None,
            small_preallocation: false,
            evict_after_snapshot: false,
        }
    }
}
168
/// Job kinds of this backend.
///
/// NOTE(review): presumably these are the background jobs the backend hands to the scheduler —
/// confirm against the `Backend` implementation.
#[derive(Debug)]
pub enum TurboTasksBackendJob {
    /// The only job kind at the moment.
    Snapshot,
}
172
/// State of the turbo-tasks backend: options, task id allocation, in-memory task storage,
/// snapshot coordination and the handle to the backing storage.
173pub struct TurboTasksBackend {
    // Effective options. `new` forces `active_tracking` off when `dependency_tracking` is off.
174    options: BackendOptions,
175
    // Set to `Instant::now()` when the backend is constructed.
176    start_time: Instant,
177
    // Id factories with disjoint ranges: persisted ids run from the backing storage's next free
    // id up to `TRANSIENT_TASK_BIT - 1`, transient ids from `TRANSIENT_TASK_BIT` to `TaskId::MAX`.
178    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
179    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
180
    // Task storage, created from the computed shard amount and the `small_preallocation` option.
181    storage: Storage,
182
183    /// Coordinates the operation/snapshot interleaving protocol. See
184    /// [`SnapshotCoordinator`] for details.
185    snapshot_coord: SnapshotCoordinator,
186    /// Serializes calls to `snapshot_and_persist`. The coordinator's
187    /// `begin_snapshot` asserts that snapshots don't overlap; this mutex
188    /// enforces that contract for our two callers (background loop and
189    /// `stop_and_wait`).
190    snapshot_in_progress: Mutex<()>,
191
    // NOTE(review): presumably the shutdown flag plus the events signalled on shutdown and on
    // idle start/end — confirm against the code that sets and listens to them.
192    stopping: AtomicBool,
193    stopping_event: Event,
194    idle_start_event: Event,
195    idle_end_event: Event,
    // Only present with the `verify_aggregation_graph` feature; initialized to `false`.
196    #[cfg(feature = "verify_aggregation_graph")]
197    is_idle: AtomicBool,
198
    // Receives the per-function cache hit/miss counts (see `track_cache_hit_by_fn` and
    // `track_cache_miss_by_fn`).
199    task_statistics: TaskStatisticsApi,
200
    // Backing storage handle; used for `invalidate_storage` and to obtain the first free
    // persisted task id.
201    backing_storage: TurboBackingStorage,
202
    // Only present with the `verify_aggregation_graph` feature; starts out empty.
203    #[cfg(feature = "verify_aggregation_graph")]
204    root_tasks: Mutex<FxHashSet<TaskId>>,
205}
206
207impl TurboTasksBackend {
208    /// Invalidates the persistent storage so that it will be deleted the next time a turbopack
209    /// instance is created with the filesystem cache enabled.
210    ///
211    /// `reason_code` should be one of the codes in
212    /// [`crate::db_invalidation::invalidation_reasons`].
213    pub fn invalidate_storage(&self, reason_code: &str) -> Result<()> {
214        self.backing_storage.invalidate(reason_code)
215    }
216
217    pub fn new(mut options: BackendOptions, backing_storage: TurboBackingStorage) -> Self {
218        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
219        if !options.dependency_tracking {
220            options.active_tracking = false;
221        }
222        let small_preallocation = options.small_preallocation;
223        let next_task_id = backing_storage
224            .next_free_task_id()
225            .expect("Failed to get task id");
226        Self {
227            options,
228            start_time: Instant::now(),
229            persisted_task_id_factory: IdFactoryWithReuse::new(
230                next_task_id,
231                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
232            ),
233            transient_task_id_factory: IdFactoryWithReuse::new(
234                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
235                TaskId::MAX,
236            ),
237            storage: Storage::new(shard_amount, small_preallocation),
238            snapshot_coord: SnapshotCoordinator::new(),
239            snapshot_in_progress: Mutex::new(()),
240            stopping: AtomicBool::new(false),
241            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
242            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
243            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
244            #[cfg(feature = "verify_aggregation_graph")]
245            is_idle: AtomicBool::new(false),
246            task_statistics: TaskStatisticsApi::default(),
247            backing_storage,
248            #[cfg(feature = "verify_aggregation_graph")]
249            root_tasks: Default::default(),
250        }
251    }
252
253    fn execute_context<'a>(
254        &'a self,
255        turbo_tasks: &'a TurboTasks<TurboTasksBackend>,
256    ) -> impl ExecuteContext<'a> {
257        ExecuteContextImpl::new(self, turbo_tasks)
258    }
259
260    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
261        if self.should_persist() {
262            self.snapshot_coord.suspend_point(suspend);
263        }
264    }
265
266    pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
267        if !self.should_persist() {
268            return OperationGuard::noop();
269        }
270        self.snapshot_coord.begin_operation()
271    }
272
273    fn should_persist(&self) -> bool {
274        matches!(
275            self.options.storage_mode,
276            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
277        )
278    }
279
280    fn should_evict(&self) -> bool {
281        self.options.evict_after_snapshot && self.should_persist()
282    }
283
284    /// Perform a snapshot and then evict all evictable tasks from memory.
285    ///
286    /// This is exposed for integration tests that need to verify the
287    /// snapshot → evict → restore cycle works correctly.
288    ///
289    /// Returns `(snapshot_had_new_data, eviction_counts)`.
290    #[doc(hidden)]
291    pub fn snapshot_and_evict_for_testing(
292        &self,
293        turbo_tasks: &TurboTasks<TurboTasksBackend>,
294    ) -> (bool, EvictionCounts) {
295        assert!(
296            self.should_persist(),
297            "snapshot_and_evict requires persistence"
298        );
299        let snapshot_result = self.snapshot_and_persist(None, "test", turbo_tasks);
300        let had_new_data = match snapshot_result {
301            Ok((_, new_data)) => new_data,
302            Err(_) => {
303                // Snapshot/persist failed — skip eviction since the data may not
304                // be on disk yet. Evicting now could lose in-memory state that
305                // can't be restored.
306                return (false, EvictionCounts::default());
307            }
308        };
309        let counts = self.storage.evict_after_snapshot(None);
310        (had_new_data, counts)
311    }
312
313    fn should_restore(&self) -> bool {
314        self.options.storage_mode.is_some()
315    }
316
317    fn should_track_dependencies(&self) -> bool {
318        self.options.dependency_tracking
319    }
320
321    fn should_track_activeness(&self) -> bool {
322        self.options.active_tracking
323    }
324
325    fn track_cache_hit_by_fn(&self, native_fn: &'static NativeFunction) {
326        self.task_statistics
327            .map(|stats| stats.increment_cache_hit(native_fn));
328    }
329
330    fn track_cache_miss_by_fn(&self, native_fn: &'static NativeFunction) {
331        self.task_statistics
332            .map(|stats| stats.increment_cache_miss(native_fn));
333    }
334
335    /// Reconstructs a full [`TurboTasksExecutionError`] from the compact [`TaskError`] storage
336    /// representation. For [`TaskError::TaskChain`], this looks up the source error from the last
337    /// task's output and rebuilds the nested `TaskContext` wrappers with `TurboTasksCallApi`
338    /// references for lazy name resolution.
339    fn task_error_to_turbo_tasks_execution_error(
340        &self,
341        error: &TaskError,
342        ctx: &mut impl ExecuteContext<'_>,
343    ) -> TurboTasksExecutionError {
344        match error {
345            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
346            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
347                message: item.message.clone(),
348                source: item
349                    .source
350                    .as_ref()
351                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
352            })),
353            TaskError::LocalTaskContext(local_task_context) => {
354                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
355                    name: local_task_context.name.clone(),
356                    source: local_task_context
357                        .source
358                        .as_ref()
359                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
360                }))
361            }
362            TaskError::TaskChain(chain) => {
363                let task_id = chain.last().unwrap();
364                let error = {
365                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
366                    if let Some(OutputValue::Error(error)) = task.get_output() {
367                        Some(error.clone())
368                    } else {
369                        None
370                    }
371                };
372                let error = error.map_or_else(
373                    || {
374                        // Eventual consistency will cause errors to no longer be available
375                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
376                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
377                                "Error no longer available",
378                            )),
379                            location: None,
380                        }))
381                    },
382                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
383                );
384                let mut current_error = error;
385                for &task_id in chain.iter().rev() {
386                    current_error =
387                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
388                            task_id,
389                            source: Some(current_error),
390                            turbo_tasks: ctx.turbo_tasks(),
391                        }));
392                }
393                current_error
394            }
395        }
396    }
397}
398
399/// Intermediate result of step 1 of task execution completion.
///
/// NOTE(review): the field notes below are inferred from names and types only — confirm them
/// against the prepare step that fills this struct.
400struct TaskExecutionCompletePrepareResult {
    // Presumably the child tasks that were newly connected by this execution.
401    pub new_children: FxHashSet<TaskId>,
    // Presumably set when the task became immutable with this execution.
402    pub is_now_immutable: bool,
    // Only present with the `verify_determinism` feature.
403    #[cfg(feature = "verify_determinism")]
404    pub no_output_set: bool,
    // Only present with the `task_dirty_cause` feature; `None` when no function id is known.
405    #[cfg(feature = "task_dirty_cause")]
406    pub function_id: Option<FunctionId>,
    // `None` presumably means that the output did not change.
407    pub new_output: Option<OutputValue>,
    // Stored inline for up to 4 task ids before spilling to the heap.
408    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
409    pub is_recomputation: bool,
410    pub is_session_dependent: bool,
411}
412
413// Operations
414impl TurboTasksBackend {
415    fn try_read_task_output(
416        &self,
417        task_id: TaskId,
418        reader: Option<TaskId>,
419        options: ReadOutputOptions,
420        turbo_tasks: &TurboTasks<TurboTasksBackend>,
421    ) -> Result<Result<RawVc, EventListener>> {
422        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
423
424        let mut ctx = self.execute_context(turbo_tasks);
425        let need_reader_task = if self.should_track_dependencies()
426            && !matches!(options.tracking, ReadTracking::Untracked)
427            && reader.is_some_and(|reader_id| reader_id != task_id)
428            && let Some(reader_id) = reader
429            && reader_id != task_id
430        {
431            Some(reader_id)
432        } else {
433            None
434        };
435        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
436            // Having a task_pair here is not optimal, but otherwise this would lead to a race
437            // condition. See below.
438            // TODO(sokra): solve that in a more performant way.
439            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
440            (task, Some(reader))
441        } else {
442            (ctx.task(task_id, TaskDataCategory::All), None)
443        };
444
445        fn listen_to_done_event(
446            reader_description: Option<EventDescription>,
447            tracking: ReadTracking,
448            done_event: &Event,
449        ) -> EventListener {
450            done_event.listen_with_note(move || {
451                move || {
452                    if let Some(reader_description) = reader_description.as_ref() {
453                        format!(
454                            "try_read_task_output from {} ({})",
455                            reader_description, tracking
456                        )
457                    } else {
458                        format!("try_read_task_output ({})", tracking)
459                    }
460                }
461            })
462        }
463
464        fn check_in_progress(
465            task: &impl TaskGuard,
466            reader_description: Option<EventDescription>,
467            tracking: ReadTracking,
468        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
469        {
470            match task.get_in_progress() {
471                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
472                    listen_to_done_event(reader_description, tracking, done_event),
473                ))),
474                Some(InProgressState::InProgress(box InProgressStateInner {
475                    done_event, ..
476                })) => Some(Ok(Err(listen_to_done_event(
477                    reader_description,
478                    tracking,
479                    done_event,
480                )))),
481                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
482                    "{} was canceled",
483                    task.get_task_description()
484                ))),
485                None => None,
486            }
487        }
488
489        if matches!(options.consistency, ReadConsistency::Strong) {
490            if task
491                .get_persistent_task_type()
492                .is_some_and(|t| !t.native_fn.is_root)
493            {
494                drop(task);
495                drop(reader_task);
496                panic!(
497                    "Strongly consistent read of non-root task {} (reader: {}). The `root` \
498                     attribute is missing on the task.",
499                    self.debug_get_task_description(task_id),
500                    reader.map_or_else(
501                        || "unknown".to_string(),
502                        |r| self.debug_get_task_description(r)
503                    )
504                );
505            }
506
507            let is_dirty = task.is_dirty();
508
509            // Check the dirty count of the root node
510            let has_dirty_containers = task.has_dirty_containers();
511            if has_dirty_containers || is_dirty.is_some() {
512                let activeness = task.get_activeness_mut();
513                let mut task_ids_to_schedule: Vec<_> = Vec::new();
514                // When there are dirty task, subscribe to the all_clean_event
515                let activeness = if let Some(activeness) = activeness {
516                    // This makes sure all tasks stay active and this task won't stale.
517                    // active_until_clean is automatically removed when this
518                    // task is clean.
519                    activeness.set_active_until_clean();
520                    activeness
521                } else {
522                    // If we don't have a root state, add one. This also makes sure all tasks stay
523                    // active and this task won't stale. active_until_clean
524                    // is automatically removed when this task is clean.
525                    if ctx.should_track_activeness() {
526                        // A newly added Activeness need to make sure to schedule the tasks
527                        task_ids_to_schedule = task.dirty_containers().collect();
528                        task_ids_to_schedule.push(task_id);
529                    }
530                    let activeness =
531                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
532                    activeness.set_active_until_clean();
533                    activeness
534                };
535                let listener = activeness.all_clean_event.listen_with_note(move || {
536                    // Reach the backend through the pinned `turbo_tasks` handle rather than
537                    // cloning `self`: pinning keeps the backend alive for the closure's lifetime.
538                    let tt = turbo_tasks.pin();
539                    move || {
540                        let mut ctx = tt.backend().execute_context(&tt);
541                        let mut visited = FxHashSet::default();
542                        fn indent(s: &str) -> String {
543                            s.split_inclusive('\n')
544                                .flat_map(|line: &str| ["  ", line].into_iter())
545                                .collect::<String>()
546                        }
547                        fn get_info(
548                            ctx: &mut impl ExecuteContext<'_>,
549                            task_id: TaskId,
550                            parent_and_count: Option<(TaskId, i32)>,
551                            visited: &mut FxHashSet<TaskId>,
552                        ) -> String {
553                            let task = ctx.task(task_id, TaskDataCategory::All);
554                            let is_dirty = task.is_dirty();
555                            let in_progress =
556                                task.get_in_progress()
557                                    .map_or("not in progress", |p| match p {
558                                        InProgressState::InProgress(_) => "in progress",
559                                        InProgressState::Scheduled { .. } => "scheduled",
560                                        InProgressState::Canceled => "canceled",
561                                    });
562                            let activeness = task.get_activeness().map_or_else(
563                                || "not active".to_string(),
564                                |activeness| format!("{activeness:?}"),
565                            );
566                            let aggregation_number = get_aggregation_number(&task);
567                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
568                            {
569                                let uppers = get_uppers(&task);
570                                !uppers.contains(&parent_task_id)
571                            } else {
572                                false
573                            };
574
575                            // Check the dirty count of the root node
576                            let has_dirty_containers = task.has_dirty_containers();
577
578                            let task_description = task.get_task_description();
579                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
580                                format!(", dirty({parent_priority})")
581                            } else {
582                                String::new()
583                            };
584                            let has_dirty_containers_label = if has_dirty_containers {
585                                ", dirty containers"
586                            } else {
587                                ""
588                            };
589                            let count = if let Some((_, count)) = parent_and_count {
590                                format!(" {count}")
591                            } else {
592                                String::new()
593                            };
594                            let mut info = format!(
595                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
596                                 {in_progress}, \
597                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
598                            );
599                            let children: Vec<_> = task.dirty_containers_with_count().collect();
600                            drop(task);
601
602                            if missing_upper {
603                                info.push_str("\n  ERROR: missing upper connection");
604                            }
605
606                            if has_dirty_containers || !children.is_empty() {
607                                writeln!(info, "\n  dirty tasks:").unwrap();
608
609                                for (child_task_id, count) in children {
610                                    let task_description = ctx
611                                        .task(child_task_id, TaskDataCategory::Data)
612                                        .get_task_description();
613                                    if visited.insert(child_task_id) {
614                                        let child_info = get_info(
615                                            ctx,
616                                            child_task_id,
617                                            Some((task_id, count)),
618                                            visited,
619                                        );
620                                        info.push_str(&indent(&child_info));
621                                        if !info.ends_with('\n') {
622                                            info.push('\n');
623                                        }
624                                    } else {
625                                        writeln!(
626                                            info,
627                                            "  {child_task_id} {task_description} {count} \
628                                             (already visited)"
629                                        )
630                                        .unwrap();
631                                    }
632                                }
633                            }
634                            info
635                        }
636                        let info = get_info(&mut ctx, task_id, None, &mut visited);
637                        format!(
638                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
639                        )
640                    }
641                });
642                drop(reader_task);
643                drop(task);
644                if !task_ids_to_schedule.is_empty() {
645                    let mut queue = AggregationUpdateQueue::new();
646                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
647                    queue.execute(&mut ctx);
648                }
649
650                return Ok(Err(listener));
651            }
652        }
653
654        let reader_description = reader_task
655            .as_ref()
656            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
657        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
658        {
659            return value;
660        }
661
662        if let Some(output) = task.get_output() {
663            let result = match output {
664                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
665                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
666                OutputValue::Error(error) => Err(error.clone()),
667            };
668            if let Some(mut reader_task) = reader_task.take()
669                && options.tracking.should_track(result.is_err())
670                && (!task.immutable() || cfg!(feature = "verify_immutable"))
671            {
672                #[cfg(feature = "trace_task_output_dependencies")]
673                let _span = tracing::trace_span!(
674                    "add output dependency",
675                    task = %task_id,
676                    dependent_task = ?reader
677                )
678                .entered();
679                let mut queue = LeafDistanceUpdateQueue::new();
680                let reader = reader.unwrap();
681                if task.add_output_dependent(reader) {
682                    // Ensure that dependent leaf distance is strictly monotonic increasing
683                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
684                    let reader_leaf_distance =
685                        reader_task.get_leaf_distance().copied().unwrap_or_default();
686                    if reader_leaf_distance.distance <= leaf_distance.distance {
687                        queue.push(
688                            reader,
689                            leaf_distance.distance,
690                            leaf_distance.max_distance_in_buffer,
691                        );
692                    }
693                }
694
695                drop(task);
696
697                // Note: We use `task_pair` earlier to lock the task and its reader at the same
698                // time. If we didn't and just locked the reader here, an invalidation could occur
699                // between grabbing the locks. If that happened, and if the task is "outdated" or
700                // doesn't have the dependency edge yet, the invalidation would be lost.
701
702                if !reader_task.remove_outdated_output_dependencies(&task_id) {
703                    let _ = reader_task.add_output_dependencies(task_id);
704                }
705                drop(reader_task);
706
707                queue.execute(&mut ctx);
708            } else {
709                drop(task);
710            }
711
712            return result.map_err(|error| {
713                self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
714                    .with_task_context(task_id, turbo_tasks.pin())
715                    .into()
716            });
717        }
718        drop(reader_task);
719
720        let note = EventDescription::new(|| {
721            move || {
722                if let Some(reader) = reader_description.as_ref() {
723                    format!("try_read_task_output (recompute) from {reader}",)
724                } else {
725                    "try_read_task_output (recompute, untracked)".to_string()
726                }
727            }
728        });
729
730        // Output doesn't exist. We need to schedule the task to compute it.
731        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
732            TaskExecutionReason::OutputNotAvailable,
733            EventDescription::new(|| task.get_task_desc_fn()),
734            note,
735        );
736
737        // It's not possible that the task is InProgress at this point. If it is InProgress {
738        // done: true } it must have Output and would early return.
739        let old = task.set_in_progress(in_progress_state);
740        debug_assert!(old.is_none(), "InProgress already exists");
741        ctx.schedule_task(task, TaskPriority::Recomputation);
742
743        Ok(Err(listener))
744    }
745
746    fn try_read_task_cell(
747        &self,
748        task_id: TaskId,
749        reader: Option<TaskId>,
750        cell: CellId,
751        options: ReadCellOptions,
752        turbo_tasks: &TurboTasks<TurboTasksBackend>,
753    ) -> Result<Result<TypedCellContent, EventListener>> {
754        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
755
756        fn add_cell_dependency(
757            task_id: TaskId,
758            mut task: impl TaskGuard,
759            reader: Option<TaskId>,
760            reader_task: Option<impl TaskGuard>,
761            cell: CellId,
762            key: Option<u64>,
763        ) {
764            if let Some(mut reader_task) = reader_task
765                && (!task.immutable() || cfg!(feature = "verify_immutable"))
766            {
767                let reader = reader.unwrap();
768                let _ = task
769                    .add_cell_dependents(CellDependency::new(CellRef { task: reader, cell }, key));
770                drop(task);
771
772                // Note: We use `task_pair` earlier to lock the task and its reader at the same
773                // time. If we didn't and just locked the reader here, an invalidation could occur
774                // between grabbing the locks. If that happened, and if the task is "outdated" or
775                // doesn't have the dependency edge yet, the invalidation would be lost.
776
777                let target = CellRef {
778                    task: task_id,
779                    cell,
780                };
781                let dep = CellDependency::new(target, key);
782                if !reader_task.remove_outdated_cell_dependencies(&dep) {
783                    let _ = reader_task.add_cell_dependencies(dep);
784                }
785                drop(reader_task);
786            }
787        }
788
789        let ReadCellOptions {
790            tracking,
791            final_read_hint,
792        } = options;
793
794        let mut ctx = self.execute_context(turbo_tasks);
795        let (mut task, reader_task) = if self.should_track_dependencies()
796            && !matches!(tracking, ReadCellTracking::Untracked)
797            && let Some(reader_id) = reader
798            && reader_id != task_id
799        {
800            // Having a task_pair here is not optimal, but otherwise this would lead to a race
801            // condition. See below.
802            // TODO(sokra): solve that in a more performant way.
803            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
804            (task, Some(reader))
805        } else {
806            (ctx.task(task_id, TaskDataCategory::All), None)
807        };
808
809        let content = if final_read_hint {
810            task.remove_cell_data(&cell)
811        } else {
812            task.get_cell_data(&cell).cloned()
813        };
814        if let Some(content) = content {
815            if tracking.should_track(false) {
816                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
817            }
818            return Ok(Ok(TypedCellContent(
819                cell.type_id,
820                CellContent(Some(content)),
821            )));
822        }
823
824        let in_progress = task.get_in_progress();
825        if matches!(
826            in_progress,
827            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
828        ) {
829            return Ok(Err(self
830                .listen_to_cell(&mut task, task_id, &reader_task, cell)
831                .0));
832        }
833        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
834
835        // Check cell index range (cell might not exist at all)
836        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
837        let Some(max_id) = max_id else {
838            let task_desc = task.get_task_description();
839            if tracking.should_track(true) {
840                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
841            }
842            bail!(
843                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
844            );
845        };
846        if cell.index >= max_id {
847            let task_desc = task.get_task_description();
848            if tracking.should_track(true) {
849                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
850            }
851            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
852        }
853
854        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
855        // task to get the cell content.
856
857        // Bail early if the task was cancelled — no point in registering a listener
858        // on a task that won't execute again.
859        if is_cancelled {
860            bail!("{} was canceled", task.get_task_description());
861        }
862
863        // Listen to the cell and potentially schedule the task
864        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
865        drop(reader_task);
866        if !new_listener {
867            return Ok(Err(listener));
868        }
869
870        let _span = tracing::trace_span!(
871            "recomputation",
872            cell_type = get_value_type(cell.type_id).ty.global_name,
873            cell_index = cell.index
874        )
875        .entered();
876
877        let _ = task.add_scheduled(
878            TaskExecutionReason::CellNotAvailable,
879            EventDescription::new(|| task.get_task_desc_fn()),
880        );
881        ctx.schedule_task(task, TaskPriority::Recomputation);
882
883        Ok(Err(listener))
884    }
885
886    fn listen_to_cell(
887        &self,
888        task: &mut impl TaskGuard,
889        task_id: TaskId,
890        reader_task: &Option<impl TaskGuard>,
891        cell: CellId,
892    ) -> (EventListener, bool) {
893        let note = || {
894            let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
895            move || {
896                if let Some(reader_desc) = reader_desc.as_ref() {
897                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
898                } else {
899                    "try_read_task_cell (in progress, untracked)".to_string()
900                }
901            }
902        };
903        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
904            // Someone else is already computing the cell
905            let listener = in_progress.event.listen_with_note(note);
906            return (listener, false);
907        }
908        let in_progress = InProgressCellState::new(task_id, cell);
909        let listener = in_progress.event.listen_with_note(note);
910        let old = task.insert_in_progress_cells(cell, in_progress);
911        debug_assert!(old.is_none(), "InProgressCell already exists");
912        (listener, true)
913    }
914
915    fn snapshot_and_persist(
916        &self,
917        parent_span: Option<tracing::Id>,
918        reason: &str,
919        turbo_tasks: &TurboTasks<TurboTasksBackend>,
920    ) -> Result<(Instant, bool), anyhow::Error> {
921        let snapshot_span =
922            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
923                .entered();
924        // Serialize snapshots. The internal protocol (snapshot_mode, snapshot
925        // request bit, suspended_operations) assumes only one snapshot runs at
926        // a time. Held for the entire snapshot lifecycle.
927        let _snapshot_in_progress = self.snapshot_in_progress.lock();
928        let start = Instant::now();
929        // SystemTime for wall-clock timestamps in trace events (milliseconds
930        // since epoch). Instant is monotonic but has no defined epoch, so it
931        // can't be used for cross-process trace correlation.
932        let wall_start = SystemTime::now();
933        debug_assert!(self.should_persist());
934
935        let mut snapshot_phase = {
936            let _span = tracing::info_span!("blocking").entered();
937            self.snapshot_coord.begin_snapshot()
938        };
939        // Enter snapshot mode, which atomically reads and resets the modified count.
940        // Checking after start_snapshot ensures no concurrent increments can race.
941        let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
942
943        let suspended_operations = snapshot_phase.take_suspended_operations();
944
945        let snapshot_time = Instant::now();
946        drop(snapshot_phase);
947
948        if !has_modifications {
949            // No tasks modified since the last snapshot — drop the guard (which
950            // calls end_snapshot) and skip the expensive O(N) scan.
951            drop(snapshot_guard);
952            return Ok((start, false));
953        }
954
955        #[cfg(feature = "print_cache_item_size")]
956        #[derive(Default)]
957        struct TaskCacheStats {
958            data: usize,
959            #[cfg(feature = "print_cache_item_size_with_compressed")]
960            data_compressed: usize,
961            data_count: usize,
962            meta: usize,
963            #[cfg(feature = "print_cache_item_size_with_compressed")]
964            meta_compressed: usize,
965            meta_count: usize,
966            upper_count: usize,
967            collectibles_count: usize,
968            aggregated_collectibles_count: usize,
969            children_count: usize,
970            followers_count: usize,
971            collectibles_dependents_count: usize,
972            aggregated_dirty_containers_count: usize,
973            output_size: usize,
974        }
975        /// Formats a byte size, optionally including the compressed size when the
976        /// `print_cache_item_size_with_compressed` feature is enabled.
977        #[cfg(feature = "print_cache_item_size")]
978        struct FormatSizes {
979            size: usize,
980            #[cfg(feature = "print_cache_item_size_with_compressed")]
981            compressed_size: usize,
982        }
983        #[cfg(feature = "print_cache_item_size")]
984        impl std::fmt::Display for FormatSizes {
985            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
986                use turbo_tasks::util::FormatBytes;
987                #[cfg(feature = "print_cache_item_size_with_compressed")]
988                {
989                    write!(
990                        f,
991                        "{} ({} compressed)",
992                        FormatBytes(self.size),
993                        FormatBytes(self.compressed_size)
994                    )
995                }
996                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
997                {
998                    write!(f, "{}", FormatBytes(self.size))
999                }
1000            }
1001        }
1002        #[cfg(feature = "print_cache_item_size")]
1003        impl TaskCacheStats {
1004            #[cfg(feature = "print_cache_item_size_with_compressed")]
1005            fn compressed_size(data: &[u8]) -> Result<usize> {
1006                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1007                    data,
1008                    &mut Vec::new(),
1009                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
1010                )?)
1011            }
1012
1013            fn add_data(&mut self, data: &[u8]) {
1014                self.data += data.len();
1015                #[cfg(feature = "print_cache_item_size_with_compressed")]
1016                {
1017                    self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1018                }
1019                self.data_count += 1;
1020            }
1021
1022            fn add_meta(&mut self, data: &[u8]) {
1023                self.meta += data.len();
1024                #[cfg(feature = "print_cache_item_size_with_compressed")]
1025                {
1026                    self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1027                }
1028                self.meta_count += 1;
1029            }
1030
1031            fn add_counts(&mut self, storage: &TaskStorage) {
1032                let counts = storage.meta_counts();
1033                self.upper_count += counts.upper;
1034                self.collectibles_count += counts.collectibles;
1035                self.aggregated_collectibles_count += counts.aggregated_collectibles;
1036                self.children_count += counts.children;
1037                self.followers_count += counts.followers;
1038                self.collectibles_dependents_count += counts.collectibles_dependents;
1039                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
1040                if let Some(output) = storage.get_output() {
1041                    use turbo_bincode::turbo_bincode_encode;
1042
1043                    self.output_size += turbo_bincode_encode(&output)
1044                        .map(|data| data.len())
1045                        .unwrap_or(0);
1046                }
1047            }
1048
1049            /// Returns the task name used as the stats grouping key.
1050            fn task_name(storage: &TaskStorage) -> String {
1051                storage
1052                    .get_persistent_task_type()
1053                    .map(|t| t.to_string())
1054                    .unwrap_or_else(|| "<unknown>".to_string())
1055            }
1056
1057            /// Returns the primary sort key: compressed total when
1058            /// `print_cache_item_size_with_compressed` is enabled, raw total otherwise.
1059            fn sort_key(&self) -> usize {
1060                #[cfg(feature = "print_cache_item_size_with_compressed")]
1061                {
1062                    self.data_compressed + self.meta_compressed
1063                }
1064                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1065                {
1066                    self.data + self.meta
1067                }
1068            }
1069
1070            fn format_total(&self) -> FormatSizes {
1071                FormatSizes {
1072                    size: self.data + self.meta,
1073                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1074                    compressed_size: self.data_compressed + self.meta_compressed,
1075                }
1076            }
1077
1078            fn format_data(&self) -> FormatSizes {
1079                FormatSizes {
1080                    size: self.data,
1081                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1082                    compressed_size: self.data_compressed,
1083                }
1084            }
1085
1086            fn format_avg_data(&self) -> FormatSizes {
1087                FormatSizes {
1088                    size: self.data.checked_div(self.data_count).unwrap_or(0),
1089                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1090                    compressed_size: self
1091                        .data_compressed
1092                        .checked_div(self.data_count)
1093                        .unwrap_or(0),
1094                }
1095            }
1096
1097            fn format_meta(&self) -> FormatSizes {
1098                FormatSizes {
1099                    size: self.meta,
1100                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1101                    compressed_size: self.meta_compressed,
1102                }
1103            }
1104
1105            fn format_avg_meta(&self) -> FormatSizes {
1106                FormatSizes {
1107                    size: self.meta.checked_div(self.meta_count).unwrap_or(0),
1108                    #[cfg(feature = "print_cache_item_size_with_compressed")]
1109                    compressed_size: self
1110                        .meta_compressed
1111                        .checked_div(self.meta_count)
1112                        .unwrap_or(0),
1113                }
1114            }
1115        }
1116        #[cfg(feature = "print_cache_item_size")]
1117        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1118            Mutex::new(FxHashMap::default());
1119
1120        // Encode each task's modified categories. We only encode categories with `modified` set,
1121        // meaning the category was actually dirtied. Categories restored from disk but never
1122        // modified don't need re-persisting since the on-disk version is still valid.
1123        // For tasks accessed during snapshot mode, a frozen copy was made and its `modified`
1124        // flags were copied from the live task at snapshot creation time, reflecting which
1125        // categories were dirtied before the snapshot was taken.
1126        let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
1127            let encode_category = |task_id: TaskId,
1128                                   data: &TaskStorage,
1129                                   category: SpecificTaskDataCategory,
1130                                   buffer: &mut TurboBincodeBuffer|
1131             -> Option<TurboBincodeBuffer> {
1132                match encode_task_data(task_id, data, category, buffer) {
1133                    Ok(encoded) => {
1134                        #[cfg(feature = "print_cache_item_size")]
1135                        {
1136                            let mut stats = task_cache_stats.lock();
1137                            let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
1138                            match category {
1139                                SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1140                                SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1141                            }
1142                        }
1143                        Some(encoded)
1144                    }
1145                    Err(err) => {
1146                        panic!(
1147                            "Serializing task {} failed ({:?}): {:?}",
1148                            self.debug_get_task_description(task_id),
1149                            category,
1150                            err
1151                        );
1152                    }
1153                }
1154            };
1155            if task_id.is_transient() {
1156                unreachable!("transient task_ids should never be enqueued to be persisted");
1157            }
1158
1159            let encode_meta = inner.flags.meta_modified();
1160            let encode_data = inner.flags.data_modified();
1161
1162            #[cfg(feature = "print_cache_item_size")]
1163            if encode_data || encode_meta {
1164                task_cache_stats
1165                    .lock()
1166                    .entry(TaskCacheStats::task_name(inner))
1167                    .or_default()
1168                    .add_counts(inner);
1169            }
1170
1171            let meta = if encode_meta {
1172                encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1173            } else {
1174                None
1175            };
1176
1177            let data = if encode_data {
1178                encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1179            } else {
1180                None
1181            };
1182            let task_type_hash = if inner.flags.new_task() {
1183                let task_type = inner.get_persistent_task_type().expect(
1184                    "It is not possible for a new_task to not have a persistent_task_type.  Task \
1185                     creation for persistent tasks uses a single ExecutionContextImpl for \
1186                     creating the task (which sets new_task) and connect_child (which sets \
1187                     persistent_task_type) and take_snapshot waits for all operations to complete \
1188                     or suspend before we start snapshotting.  So task creation will always set \
1189                     the task_type.",
1190                );
1191                Some(compute_task_type_hash(task_type))
1192            } else {
1193                None
1194            };
1195
1196            SnapshotItem {
1197                task_id,
1198                meta,
1199                data,
1200                task_type_hash,
1201            }
1202        };
1203
1204        let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);
1205
1206        drop(snapshot_span);
1207        let snapshot_duration = start.elapsed();
1208        let task_count = task_snapshots.len();
1209
1210        if task_snapshots.is_empty() {
1211            // This should be impossible — if we got here, modified_count was nonzero, and every
1212            // modification that increments the count also failed during encoding.
1213            std::hint::cold_path();
1214            return Ok((snapshot_time, false));
1215        }
1216
1217        let persist_start = Instant::now();
1218        let span = tracing::info_span!(
1219            parent: parent_span,
1220            "persist",
1221            reason = reason,
1222            data_items= tracing::field::Empty,
1223            meta_items= tracing::field::Empty,
1224            task_cache_items= tracing::field::Empty,
1225            next_task_id= tracing::field::Empty,)
1226        .entered();
1227        {
1228            // Tasks were already consumed by take_snapshot, so a future snapshot
1229            // would not re-persist them — returning an error signals to the caller
1230            // that further persist attempts would corrupt the task graph in storage.
1231            let SnapshotMeta {
1232                task_cache_items,
1233                data_items,
1234                meta_items,
1235                max_next_task_id,
1236            } = self
1237                .backing_storage
1238                .save_snapshot(suspended_operations, task_snapshots)?;
1239            span.record("data_items", data_items);
1240            span.record("meta_items", meta_items);
1241            span.record("task_cache_items", task_cache_items);
1242            span.record("next_task_id", max_next_task_id);
1243
1244            #[cfg(feature = "print_cache_item_size")]
1245            {
1246                let mut task_cache_stats = task_cache_stats
1247                    .into_inner()
1248                    .into_iter()
1249                    .collect::<Vec<_>>();
1250                if !task_cache_stats.is_empty() {
1251                    use turbo_tasks::util::FormatBytes;
1252
1253                    use crate::utils::markdown_table::print_markdown_table;
1254
1255                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1256                        (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
1257                    });
1258
1259                    println!(
1260                        "Task cache stats: {}",
1261                        FormatSizes {
1262                            size: task_cache_stats
1263                                .iter()
1264                                .map(|(_, s)| s.data + s.meta)
1265                                .sum::<usize>(),
1266                            #[cfg(feature = "print_cache_item_size_with_compressed")]
1267                            compressed_size: task_cache_stats
1268                                .iter()
1269                                .map(|(_, s)| s.data_compressed + s.meta_compressed)
1270                                .sum::<usize>()
1271                        },
1272                    );
1273
1274                    print_markdown_table(
1275                        [
1276                            "Task",
1277                            " Total Size",
1278                            " Data Size",
1279                            " Data Count x Avg",
1280                            " Data Count x Avg",
1281                            " Meta Size",
1282                            " Meta Count x Avg",
1283                            " Meta Count x Avg",
1284                            " Uppers",
1285                            " Coll",
1286                            " Agg Coll",
1287                            " Children",
1288                            " Followers",
1289                            " Coll Deps",
1290                            " Agg Dirty",
1291                            " Output Size",
1292                        ],
1293                        task_cache_stats.iter(),
1294                        |(task_desc, stats)| {
1295                            [
1296                                task_desc.to_string(),
1297                                format!(" {}", stats.format_total()),
1298                                format!(" {}", stats.format_data()),
1299                                format!(" {} x", stats.data_count),
1300                                format!("{}", stats.format_avg_data()),
1301                                format!(" {}", stats.format_meta()),
1302                                format!(" {} x", stats.meta_count),
1303                                format!("{}", stats.format_avg_meta()),
1304                                format!(" {}", stats.upper_count),
1305                                format!(" {}", stats.collectibles_count),
1306                                format!(" {}", stats.aggregated_collectibles_count),
1307                                format!(" {}", stats.children_count),
1308                                format!(" {}", stats.followers_count),
1309                                format!(" {}", stats.collectibles_dependents_count),
1310                                format!(" {}", stats.aggregated_dirty_containers_count),
1311                                format!(" {}", FormatBytes(stats.output_size)),
1312                            ]
1313                        },
1314                    );
1315                }
1316            }
1317        }
1318
1319        let elapsed = start.elapsed();
1320        let persist_duration = persist_start.elapsed();
1321        // avoid spamming the event queue with information about fast operations
1322        if elapsed > Duration::from_secs(10) {
1323            turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1324                "Finished writing to filesystem cache".to_string(),
1325                elapsed,
1326            )));
1327        }
1328
1329        let wall_start_ms = wall_start
1330            .duration_since(SystemTime::UNIX_EPOCH)
1331            .unwrap_or_default()
1332            // as_millis_f64 is not stable yet
1333            .as_secs_f64()
1334            * 1000.0;
1335        let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1336        turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1337            "turbopack-persistence",
1338            wall_start_ms,
1339            wall_end_ms,
1340            vec![
1341                ("reason", serde_json::Value::from(reason)),
1342                (
1343                    "snapshot_duration_ms",
1344                    serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1345                ),
1346                (
1347                    "persist_duration_ms",
1348                    serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1349                ),
1350                ("task_count", serde_json::Value::from(task_count)),
1351            ],
1352        )));
1353
1354        Ok((snapshot_time, true))
1355    }
1356
1357    fn startup(&self, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1358        if self.should_restore() {
1359            // Continue all uncompleted operations
1360            // They can't be interrupted by a snapshot since the snapshotting job has not been
1361            // scheduled yet.
1362            let uncompleted_operations = self
1363                .backing_storage
1364                .uncompleted_operations()
1365                .expect("Failed to get uncompleted operations");
1366            if !uncompleted_operations.is_empty() {
1367                let mut ctx = self.execute_context(turbo_tasks);
1368                for op in uncompleted_operations {
1369                    op.execute(&mut ctx);
1370                }
1371            }
1372        }
1373
1374        // Only when it should write regularly to the storage, we schedule the initial snapshot
1375        // job.
1376        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1377            // Schedule the snapshot job
1378            let _span = trace_span!("persisting background job").entered();
1379            let _span = tracing::info_span!("thread").entered();
1380            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::Snapshot);
1381        }
1382    }
1383
1384    fn stopping(&self) {
1385        self.stopping.store(true, Ordering::Release);
1386        self.stopping_event.notify(usize::MAX);
1387    }
1388
1389    #[allow(unused_variables)]
1390    fn stop(&self, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1391        #[cfg(feature = "verify_aggregation_graph")]
1392        {
1393            self.is_idle.store(false, Ordering::Release);
1394            self.verify_aggregation_graph(turbo_tasks, false);
1395        }
1396        if self.should_persist()
1397            && let Err(err) = self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks)
1398        {
1399            eprintln!("Persisting failed during shutdown: {err:?}");
1400        }
1401        self.storage.drop_contents();
1402        if let Err(err) = self.backing_storage.shutdown() {
1403            println!("Shutting down failed: {err}");
1404        }
1405    }
1406
1407    #[allow(unused_variables)]
1408    fn idle_start(&self, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1409        self.idle_start_event.notify(usize::MAX);
1410
1411        #[cfg(feature = "verify_aggregation_graph")]
1412        {
1413            use tokio::select;
1414
1415            self.is_idle.store(true, Ordering::Release);
1416            // The spawned task reaches the backend through the pinned `turbo_tasks` handle
1417            // rather than cloning `self`: pinning keeps the `TurboTasks` (and therefore the
1418            // backend it owns) alive for the task's lifetime.
1419            let turbo_tasks = turbo_tasks.pin();
1420            tokio::task::spawn(async move {
1421                let backend = &turbo_tasks.backend();
1422                select! {
1423                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1424                        // do nothing
1425                    }
1426                    _ = backend.idle_end_event.listen() => {
1427                        return;
1428                    }
1429                }
1430                if !backend.is_idle.load(Ordering::Relaxed) {
1431                    return;
1432                }
1433                backend.verify_aggregation_graph(&turbo_tasks, true);
1434            });
1435        }
1436    }
1437
1438    fn idle_end(&self) {
1439        #[cfg(feature = "verify_aggregation_graph")]
1440        self.is_idle.store(false, Ordering::Release);
1441        self.idle_end_event.notify(usize::MAX);
1442    }
1443
    /// Returns the id of the task for the call described by `native_fn`, `this` and `arg`,
    /// creating the task if it is not known yet. In every non-panicking path the task is
    /// connected to `parent_task` through `ConnectChildOperation` before its id is returned.
    ///
    /// The lookup runs in up to three stages:
    /// 1. a read-only lookup in the pre-located shard of the in-memory task cache,
    /// 2. for non-transient tasks, a lookup through `ctx.task_by_type` whose result is inserted
    ///    into the in-memory cache,
    /// 3. creation of a new task with an id from the transient or the persisted id factory.
    ///
    /// A newly created task gets its aggregation number updated when `native_fn` is a root
    /// function, or when it is session dependent and dependency tracking is enabled.
    ///
    /// A transient task requested by a non-transient `parent_task` is reported through
    /// `panic_persistent_calling_transient`.
    fn get_or_create_task(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn DynTaskInputsStorage,
        parent_task: Option<TaskId>,
        persistence: TaskPersistence,
        turbo_tasks: &TurboTasks<TurboTasksBackend>,
    ) -> TaskId {
        let transient = matches!(persistence, TaskPersistence::Transient);

        // A non-transient parent must not call a transient task. The task type is only built
        // here to be included in the report.
        // NOTE(review): presumably `panic_persistent_calling_transient` never returns — confirm.
        if transient
            && let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            let task_type = CachedTaskType {
                native_fn,
                this,
                arg: arg.take_box(),
            };
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                /* cell_id */ None,
            );
        }

        let is_root = native_fn.is_root;

        // Compute hash and shard index once from borrowed components (no heap allocation).
        let arg_ref = arg.as_ref();
        let hash = CachedTaskType::hash_from_components(
            self.storage.task_cache.hasher(),
            native_fn,
            this,
            arg_ref,
        );
        // Locate the shard once so that the read-only lookup and any
        // write-lock retry below share the same reference (saves a modulo +
        // memory lookup on the miss path).
        let shard = get_shard(&self.storage.task_cache, hash);

        let mut ctx = self.execute_context(turbo_tasks);
        // Step 1: Fast read-only cache lookup (read lock, no allocation).
        // Use a read lock rather than a write lock to avoid contention. connect_child
        // may re-enter task_cache with a write lock, so we must not hold a write lock here.
        if let Some(task_id) =
            raw_get_in_shard(shard, hash, |k| k.eq_components(native_fn, this, arg_ref))
        {
            self.track_cache_hit_by_fn(native_fn);
            operation::ConnectChildOperation::run(parent_task, task_id, ctx);
            return task_id;
        }

        // Step 2: Check backing storage using borrowed components (no box needed yet).
        // Transient tasks skip this lookup and go straight to the creation path below.
        let task_id = if !transient
            && let Some((task_id, stored_type)) = ctx.task_by_type(native_fn, this, arg_ref)
        {
            // Task exists in backing storage.
            // We only need to insert it into the in-memory cache.
            self.track_cache_hit_by_fn(native_fn);
            // Step 3a: Insert into in-memory cache using the pre-located shard.
            // Use the existing Arc from storage to avoid a duplicate allocation.
            match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
                k.eq_components(native_fn, this, arg_ref)
            }) {
                // An entry is already present (inserted since the read-only lookup above);
                // nothing to insert.
                RawEntry::Occupied(_) => {}
                RawEntry::Vacant(e) => {
                    e.insert(stored_type, task_id);
                }
            };
            task_id
        } else {
            // Step 3b: The task is not known anywhere yet (or it is transient): re-check the
            // shard through the entry API and create the task if it is still missing.
            match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
                k.eq_components(native_fn, this, arg_ref)
            }) {
                RawEntry::Occupied(e) => {
                    // Another thread beat us to creating this task — use their task_id.
                    // They will handle logging the new task as modified.
                    let task_id = *e.get();
                    drop(e);
                    self.track_cache_hit_by_fn(native_fn);
                    task_id
                }
                RawEntry::Vacant(e) => {
                    // Only now do we force the allocation.
                    // NOTE: if our caller had to perform resolution, then this will have already
                    // been boxed and take_box just takes it.
                    let task_type = CachedTaskTypeArc::new(CachedTaskType {
                        native_fn,
                        this,
                        arg: arg.take_box(),
                    });
                    // Transient and persisted tasks draw their ids from separate factories.
                    let task_id = if transient {
                        self.transient_task_id_factory.get()
                    } else {
                        self.persisted_task_id_factory.get()
                    };
                    // Initialize storage BEFORE making task_id visible in the cache.
                    // This ensures any thread that reads task_id from the cache sees
                    // the storage entry already initialized (restored flags set).
                    self.storage
                        .initialize_new_task(task_id, Some(task_type.clone()));
                    // insert() consumes e, releasing the shard write lock.
                    e.insert(task_type, task_id);
                    self.track_cache_miss_by_fn(native_fn);
                    // Update the aggregation number before connecting the child
                    // We don't need this on any of the task recovery paths above because the
                    // aggregation number will already be set.
                    if is_root {
                        // Root functions get the largest possible base aggregation number.
                        AggregationUpdateQueue::run(
                            AggregationUpdateJob::UpdateAggregationNumber {
                                task_id,
                                base_aggregation_number: u32::MAX,
                                distance: None,
                            },
                            &mut ctx,
                        );
                    } else if native_fn.is_session_dependent && self.should_track_dependencies() {
                        // Session dependent functions get a fixed, high base aggregation number
                        // (a quarter of the `u32` range), but only with dependency tracking.
                        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
                        AggregationUpdateQueue::run(
                            AggregationUpdateJob::UpdateAggregationNumber {
                                task_id,
                                base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                                distance: None,
                            },
                            &mut ctx,
                        );
                    };

                    task_id
                }
            }
        };

        // Consumes `ctx`; this is the last operation for every path that did not return early.
        operation::ConnectChildOperation::run(parent_task, task_id, ctx);

        task_id
    }
1584
1585    /// Generate an object that implements [`fmt::Display`] explaining why the given
1586    /// [`CachedTaskType`] is transient.
1587    fn debug_trace_transient_task(
1588        &self,
1589        task_type: &CachedTaskType,
1590        cell_id: Option<CellId>,
1591    ) -> DebugTraceTransientTask {
1592        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1593        // from tracing the same task many times, so use a visited_set
1594        fn inner_id(
1595            backend: &TurboTasksBackend,
1596            task_id: TaskId,
1597            cell_type_id: Option<ValueTypeId>,
1598            visited_set: &mut FxHashSet<TaskId>,
1599        ) -> DebugTraceTransientTask {
1600            if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1601                if visited_set.contains(&task_id) {
1602                    let task_name = task_type.get_name();
1603                    DebugTraceTransientTask::Collapsed {
1604                        task_name,
1605                        cell_type_id,
1606                    }
1607                } else {
1608                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1609                }
1610            } else {
1611                DebugTraceTransientTask::Uncached { cell_type_id }
1612            }
1613        }
1614        fn inner_cached(
1615            backend: &TurboTasksBackend,
1616            task_type: &CachedTaskType,
1617            cell_type_id: Option<ValueTypeId>,
1618            visited_set: &mut FxHashSet<TaskId>,
1619        ) -> DebugTraceTransientTask {
1620            let task_name = task_type.get_name();
1621
1622            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1623                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1624                    // `task_id` should never be `None` at this point, as that would imply a
1625                    // non-local task is returning a local `Vc`...
1626                    // Just ignore if it happens, as we're likely already panicking.
1627                    return None;
1628                };
1629                if task_id.is_transient() {
1630                    Some(Box::new(inner_id(
1631                        backend,
1632                        task_id,
1633                        cause_self_raw_vc.try_get_type_id(),
1634                        visited_set,
1635                    )))
1636                } else {
1637                    None
1638                }
1639            });
1640            let cause_args = task_type
1641                .arg
1642                .get_raw_vcs()
1643                .into_iter()
1644                .filter_map(|raw_vc| {
1645                    let Some(task_id) = raw_vc.try_get_task_id() else {
1646                        // `task_id` should never be `None` (see comment above)
1647                        return None;
1648                    };
1649                    if !task_id.is_transient() {
1650                        return None;
1651                    }
1652                    Some((task_id, raw_vc.try_get_type_id()))
1653                })
1654                .collect::<IndexSet<_>>() // dedupe
1655                .into_iter()
1656                .map(|(task_id, cell_type_id)| {
1657                    inner_id(backend, task_id, cell_type_id, visited_set)
1658                })
1659                .collect();
1660
1661            DebugTraceTransientTask::Cached {
1662                task_name,
1663                cell_type_id,
1664                cause_self,
1665                cause_args,
1666            }
1667        }
1668        inner_cached(
1669            self,
1670            task_type,
1671            cell_id.map(|c| c.type_id),
1672            &mut FxHashSet::default(),
1673        )
1674    }
1675
1676    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1677        if !self.should_track_dependencies() {
1678            panic!("Dependency tracking is disabled so invalidation is not allowed");
1679        }
1680        operation::InvalidateOperation::run(
1681            smallvec![task_id],
1682            #[cfg(feature = "task_dirty_cause")]
1683            TaskDirtyCause::Invalidator,
1684            self.execute_context(turbo_tasks),
1685        );
1686    }
1687
1688    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1689        if !self.should_track_dependencies() {
1690            panic!("Dependency tracking is disabled so invalidation is not allowed");
1691        }
1692        operation::InvalidateOperation::run(
1693            tasks.iter().copied().collect(),
1694            #[cfg(feature = "task_dirty_cause")]
1695            TaskDirtyCause::Unknown,
1696            self.execute_context(turbo_tasks),
1697        );
1698    }
1699
1700    fn invalidate_tasks_set(
1701        &self,
1702        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1703        turbo_tasks: &TurboTasks<TurboTasksBackend>,
1704    ) {
1705        if !self.should_track_dependencies() {
1706            panic!("Dependency tracking is disabled so invalidation is not allowed");
1707        }
1708        operation::InvalidateOperation::run(
1709            tasks.iter().copied().collect(),
1710            #[cfg(feature = "task_dirty_cause")]
1711            TaskDirtyCause::Unknown,
1712            self.execute_context(turbo_tasks),
1713        );
1714    }
1715
1716    fn invalidate_serialization(
1717        &self,
1718        task_id: TaskId,
1719        turbo_tasks: &TurboTasks<TurboTasksBackend>,
1720    ) {
1721        if task_id.is_transient() {
1722            return;
1723        }
1724        let mut ctx = self.execute_context(turbo_tasks);
1725        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1726        task.invalidate_serialization();
1727    }
1728
1729    fn debug_get_task_description(&self, task_id: TaskId) -> String {
1730        let task = self.storage.access_mut(task_id);
1731        if let Some(value) = task.get_persistent_task_type() {
1732            format!("{task_id:?} {}", value)
1733        } else if let Some(value) = task.get_transient_task_type() {
1734            format!("{task_id:?} {}", value)
1735        } else {
1736            format!("{task_id:?} unknown")
1737        }
1738    }
1739
1740    fn get_task_name(
1741        &self,
1742        task_id: TaskId,
1743        turbo_tasks: &TurboTasks<TurboTasksBackend>,
1744    ) -> String {
1745        let mut ctx = self.execute_context(turbo_tasks);
1746        let task = ctx.task(task_id, TaskDataCategory::Data);
1747        if let Some(value) = task.get_persistent_task_type() {
1748            value.to_string()
1749        } else if let Some(value) = task.get_transient_task_type() {
1750            value.to_string()
1751        } else {
1752            "unknown".to_string()
1753        }
1754    }
1755
1756    fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<CachedTaskTypeArc> {
1757        let task = self.storage.access_mut(task_id);
1758        task.get_persistent_task_type().cloned()
1759    }
1760
1761    fn task_execution_canceled(
1762        &self,
1763        task_id: TaskId,
1764        turbo_tasks: &TurboTasks<TurboTasksBackend>,
1765    ) {
1766        let mut ctx = self.execute_context(turbo_tasks);
1767        let mut task = ctx.task(task_id, TaskDataCategory::All);
1768        if let Some(in_progress) = task.take_in_progress() {
1769            match in_progress {
1770                InProgressState::Scheduled {
1771                    done_event,
1772                    reason: _,
1773                } => done_event.notify(usize::MAX),
1774                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1775                    done_event.notify(usize::MAX)
1776                }
1777                InProgressState::Canceled => {}
1778            }
1779        }
1780        // Notify any readers waiting on in-progress cells so their listeners
1781        // resolve and foreground jobs can finish (prevents stop_and_wait hang).
1782        let in_progress_cells = task.take_in_progress_cells();
1783        if let Some(ref cells) = in_progress_cells {
1784            for state in cells.values() {
1785                state.event.notify(usize::MAX);
1786            }
1787        }
1788
1789        // Mark the cancelled task as session-dependent dirty so it will be re-executed
1790        // in the next session. Without this, any reader that encounters the cancelled task
1791        // records an error in its output. That error is persisted and would poison
1792        // subsequent builds. By marking the task session-dependent dirty, the next build
1793        // re-executes it, which invalidates dependents and corrects the stale errors.
1794        let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
1795            task.update_dirty_state(Some(Dirtyness::SessionDependent))
1796        } else {
1797            None
1798        };
1799
1800        let old = task.set_in_progress(InProgressState::Canceled);
1801        debug_assert!(old.is_none(), "InProgress already exists");
1802        drop(task);
1803
1804        if let Some(data_update) = data_update {
1805            AggregationUpdateQueue::run(data_update, &mut ctx);
1806        }
1807
1808        drop(in_progress_cells);
1809    }
1810
    /// Tries to move a scheduled task into the in-progress state and returns the future and
    /// tracing span needed to execute it.
    ///
    /// Returns `None` when:
    /// - the task has no in-progress state,
    /// - the in-progress state is anything other than `Scheduled` (the state is put back
    ///   unchanged), or
    /// - the task is a `Once` transient task whose future was already taken.
    ///
    /// On success the task's current collectibles are recorded as outdated and, when
    /// dependency tracking is enabled, its current cell and output dependencies are stored as
    /// outdated dependencies.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &TurboTasks<TurboTasksBackend>,
    ) -> Option<TaskExecutionSpec<'_>> {
        // These are assigned inside the scoped block below, which limits the lifetime of the
        // task access, and are consumed afterwards to build the span and future.
        let execution_reason;
        let task_type;
        #[cfg(feature = "task_dirty_cause")]
        let cause;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            // Preparing the prefetch tasks requires releasing the task first; it is
            // re-acquired afterwards.
            if let Some(tasks) = task.prefetch() {
                drop(task);
                ctx.prepare_tasks(tasks, "prefetch");
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            let in_progress = task.take_in_progress()?;
            // Only a `Scheduled` task may be started; any other state is restored as it was.
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            #[cfg(feature = "task_dirty_cause")]
            {
                cause = match task.get_dirty() {
                    Some(Dirtyness::Dirty { cause, .. }) => Some(cause.clone()),
                    _ => None,
                };
            }
            // The `done_event` of the scheduled state is carried over to the in-progress state.
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Make all current collectibles outdated (remove left-over outdated collectibles)
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            // Collected into a `Vec` first because the loop below modifies the task.
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        // An outdated entry is only kept when the collectible is also a
                        // current one.
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Make all dependencies outdated
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(
                        task_id.persistence(),
                        execution_reason,
                        priority,
                        #[cfg(feature = "task_dirty_cause")]
                        cause.as_ref(),
                    ),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // The future of a `Once` task is taken out of its mutex, so it can be
                    // obtained only one time.
                    // NOTE(review): when the future was already taken, `None` is returned
                    // although the task was moved to `InProgress` above — confirm callers
                    // handle that.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1924
    /// Processes the completion of a task execution.
    ///
    /// Returns `Some(priority)` if the task became stale during execution and needs to be
    /// re-scheduled at the given priority. The caller (turbo-tasks manager) hands it to the
    /// priority runner so re-execution doesn't inherit the original schedule priority.
    /// Returns `None` when all steps ran without the task being detected as stale.
    ///
    /// `result`, `cell_counters`, `stateful` (only with the `verify_determinism` feature) and
    /// `has_invalidator` are forwarded to `task_execution_completed_prepare`; `cell_counters`
    /// and whether `result` was an error are also forwarded to
    /// `task_execution_completed_cleanup`.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &TurboTasks<TurboTasksBackend>,
    ) -> Option<TaskPriority> {
        // Task completion is a 4 step process:
        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
        //    aggregation number of the task and the new children.
        // 2. Connect the new children to the task (and do the relevant aggregation updates).
        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
        // 4. Shrink the task memory to reduce footprint of the task.

        // Due to persistence it is possible that the process is cancelled after any step. This is
        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
        // in-memory representation.

        // The task might be invalidated during this process, so we need to check the stale flag
        // at the start of every step.

        // Without `trace_task_details` the span only carries the `new_children` field; with it,
        // the additional fields below are declared empty and recorded as the steps run.
        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            aggregation_number = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        // Remember the error state now; `result` is moved into the prepare step below.
        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        // Prepare step: an `Err` carries the priority at which the stale task is re-scheduled.
        let TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            #[cfg(feature = "task_dirty_cause")]
            function_id,
            output_dependent_tasks,
            is_recomputation,
            is_session_dependent,
        } = match self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        ) {
            Ok(r) => r,
            Err(stale_priority) => {
                // Task was stale and has been rescheduled
                #[cfg(feature = "trace_task_details")]
                span.record("stale", "prepare");
                return Some(stale_priority);
            }
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // When restoring from filesystem cache the following might not be executed (since we can
        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
        // would be executed again.

        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                #[cfg(feature = "task_dirty_cause")]
                function_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        // Connect step: skipped entirely when there are no new children.
        if has_new_children
            && let Some(stale_priority) =
                self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return Some(stale_priority);
        }

        // Finish step: also hands back the in-progress cells, which are dropped at the very end.
        let (stale_priority, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
            is_session_dependent,
        );
        if let Some(stale_priority) = stale_priority {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return Some(stale_priority);
        }

        // Cleanup step: returns the data that was removed from the task.
        let removed_data = self.task_execution_completed_cleanup(
            &mut ctx,
            task_id,
            cell_counters,
            is_error,
            is_recomputation,
        );

        // Drop data outside of critical sections
        drop(removed_data);
        drop(in_progress_cells);

        None
    }
2075
2076    fn task_execution_completed_prepare(
2077        &self,
2078        ctx: &mut impl ExecuteContext<'_>,
2079        #[cfg(feature = "trace_task_details")] span: &Span,
2080        task_id: TaskId,
2081        result: Result<RawVc, TurboTasksExecutionError>,
2082        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2083        #[cfg(feature = "verify_determinism")] stateful: bool,
2084        has_invalidator: bool,
2085    ) -> Result<TaskExecutionCompletePrepareResult, TaskPriority> {
2086        let mut task = ctx.task(task_id, TaskDataCategory::All);
2087        let is_recomputation = task.is_dirty().is_none();
2088        // Without dependency tracking, the SessionDependent dirty state is never read (no session
2089        // restore), so skip the work
2090        let is_session_dependent = self.should_track_dependencies()
2091            && matches!(task.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent);
2092        let Some(in_progress) = task.get_in_progress_mut() else {
2093            panic!("Task execution completed, but task is not in progress: {task:#?}");
2094        };
2095        if matches!(in_progress, InProgressState::Canceled) {
2096            return Ok(TaskExecutionCompletePrepareResult {
2097                new_children: Default::default(),
2098                is_now_immutable: false,
2099                #[cfg(feature = "verify_determinism")]
2100                no_output_set: false,
2101                #[cfg(feature = "task_dirty_cause")]
2102                function_id: None,
2103                new_output: None,
2104                output_dependent_tasks: Default::default(),
2105                is_recomputation,
2106                is_session_dependent,
2107            });
2108        }
2109        let &mut InProgressState::InProgress(box InProgressStateInner {
2110            stale,
2111            ref mut new_children,
2112            once_task: is_once_task,
2113            ..
2114        }) = in_progress
2115        else {
2116            panic!("Task execution completed, but task is not in progress: {task:#?}");
2117        };
2118
2119        // If the task is stale, reschedule it
2120        #[cfg(not(feature = "no_fast_stale"))]
2121        if stale && !is_once_task {
2122            let stale_priority = compute_stale_priority(&task);
2123            let Some(InProgressState::InProgress(box InProgressStateInner {
2124                done_event,
2125                mut new_children,
2126                ..
2127            })) = task.take_in_progress()
2128            else {
2129                unreachable!();
2130            };
2131            let old = task.set_in_progress(InProgressState::Scheduled {
2132                done_event,
2133                reason: TaskExecutionReason::Stale,
2134            });
2135            debug_assert!(old.is_none(), "InProgress already exists");
2136            // Remove old children from new_children to leave only the children that had their
2137            // active count increased
2138            for task in task.iter_children() {
2139                new_children.remove(&task);
2140            }
2141            drop(task);
2142
2143            // We need to undo the active count increase for the children since we throw away the
2144            // new_children list now.
2145            AggregationUpdateQueue::run(
2146                AggregationUpdateJob::DecreaseActiveCounts {
2147                    task_ids: new_children.into_iter().collect(),
2148                },
2149                ctx,
2150            );
2151            return Err(stale_priority);
2152        }
2153
2154        // take the children from the task to process them
2155        let mut new_children = take(new_children);
2156
2157        // get the function id for the dirty cause
2158        #[cfg(feature = "task_dirty_cause")]
2159        let function_id = match task.get_task_type() {
2160            TaskTypeRef::Cached(task_type) => {
2161                Some(turbo_tasks::registry::get_function_id(task_type.native_fn))
2162            }
2163            TaskTypeRef::Transient(_) => None,
2164        };
2165
2166        // handle stateful (only tracked when verify_determinism is enabled)
2167        #[cfg(feature = "verify_determinism")]
2168        if stateful {
2169            task.set_stateful(true);
2170        }
2171
2172        // handle has_invalidator
2173        if has_invalidator {
2174            task.set_invalidator(true);
2175        }
2176
2177        // handle cell counters: update max index and remove cells that are no longer used
2178        // On error, skip this update: the task may have failed before creating all cells it
2179        // normally creates, so cell_counters is incomplete. Clearing cell_type_max_index entries
2180        // based on partial counters would cause "cell no longer exists" errors for tasks that
2181        // still hold dependencies on those cells. The old cell data is preserved on error
2182        // (see task_execution_completed_cleanup), so keeping cell_type_max_index consistent with
2183        // that data is correct.
2184        // NOTE: This must stay in sync with task_execution_completed_cleanup, which similarly
2185        // skips cell data removal on error.
2186        if result.is_ok() || is_recomputation {
2187            let old_counters: FxHashMap<_, _> = task
2188                .iter_cell_type_max_index()
2189                .map(|(&k, &v)| (k, v))
2190                .collect();
2191            let mut counters_to_remove = old_counters.clone();
2192
2193            for (&cell_type, &max_index) in cell_counters.iter() {
2194                if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2195                    if old_max_index != max_index {
2196                        task.insert_cell_type_max_index(cell_type, max_index);
2197                    }
2198                } else {
2199                    task.insert_cell_type_max_index(cell_type, max_index);
2200                }
2201            }
2202            for (cell_type, _) in counters_to_remove {
2203                task.remove_cell_type_max_index(&cell_type);
2204            }
2205        }
2206
2207        let mut queue = AggregationUpdateQueue::new();
2208
2209        let mut old_edges = Vec::new();
2210
2211        let has_children = !new_children.is_empty();
2212        let is_immutable = task.immutable();
2213        let task_dependencies_for_immutable =
            // Task is not already marked as immutable
2215            if !is_immutable
2216            // Task is not session dependent (session dependent tasks can change between sessions)
2217            && !is_session_dependent
2218            // Task has no invalidator
2219            && !task.invalidator()
2220            // Task has no dependencies on collectibles
2221            && task.is_collectibles_dependencies_empty()
2222        {
2223            Some(
2224                // Collect all dependencies on tasks to check if all dependencies are immutable
2225                task.iter_output_dependencies()
2226                    .chain(task.iter_cell_dependencies().map(|dep| dep.cell_ref().task))
2227                    .collect::<FxHashSet<_>>(),
2228            )
2229        } else {
2230            None
2231        };
2232
2233        if has_children {
2234            // Prepare all new children
2235            let _aggregation_number =
2236                prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2237
2238            #[cfg(feature = "trace_task_details")]
2239            span.record("aggregation_number", _aggregation_number);
2240
2241            // Filter actual new children
2242            old_edges.extend(
2243                task.iter_children()
2244                    .filter(|task| !new_children.remove(task))
2245                    .map(OutdatedEdge::Child),
2246            );
2247        } else {
2248            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2249        }
2250
2251        old_edges.extend(
2252            task.iter_outdated_collectibles()
2253                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2254        );
2255
2256        if self.should_track_dependencies() {
2257            // IMPORTANT: Use iter_outdated_* here, NOT iter_* (active dependencies).
2258            // At execution start, active deps are copied to outdated as a "before" snapshot.
2259            // During execution, new deps are added to active.
2260            // Here at completion, we clean up only the OUTDATED deps (the "before" snapshot).
2261            // Using iter_* (active) instead would incorrectly clean up deps that are still valid,
2262            // breaking dependency tracking.
2263            old_edges.extend(
2264                task.iter_outdated_cell_dependencies()
2265                    .map(OutdatedEdge::CellDependency),
2266            );
2267            old_edges.extend(
2268                task.iter_outdated_output_dependencies()
2269                    .map(OutdatedEdge::OutputDependency),
2270            );
2271        }
2272
        // Check if the output needs to be updated
2274        let current_output = task.get_output();
2275        #[cfg(feature = "verify_determinism")]
2276        let no_output_set = current_output.is_none();
2277        let new_output = match result {
2278            Ok(RawVc::TaskOutput(output_task_id)) => {
2279                if let Some(OutputValue::Output(current_task_id)) = current_output
2280                    && *current_task_id == output_task_id
2281                {
2282                    None
2283                } else {
2284                    Some(OutputValue::Output(output_task_id))
2285                }
2286            }
2287            Ok(RawVc::TaskCell(output_task_id, cell)) => {
2288                if let Some(OutputValue::Cell(CellRef {
2289                    task: current_task_id,
2290                    cell: current_cell,
2291                })) = current_output
2292                    && *current_task_id == output_task_id
2293                    && *current_cell == cell
2294                {
2295                    None
2296                } else {
2297                    Some(OutputValue::Cell(CellRef {
2298                        task: output_task_id,
2299                        cell,
2300                    }))
2301                }
2302            }
2303            Ok(RawVc::LocalOutput(..)) => {
2304                panic!("Non-local tasks must not return a local Vc");
2305            }
2306            Err(err) => {
2307                if let Some(OutputValue::Error(old_error)) = current_output
2308                    && **old_error == err
2309                {
2310                    None
2311                } else {
2312                    Some(OutputValue::Error(Arc::new((&err).into())))
2313                }
2314            }
2315        };
2316        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2317        // When output has changed, grab the dependent tasks
2318        if new_output.is_some() && ctx.should_track_dependencies() {
2319            output_dependent_tasks = task.iter_output_dependent().collect();
2320        }
2321
2322        drop(task);
2323
2324        // Check if the task can be marked as immutable
2325        let mut is_now_immutable = false;
2326        if let Some(dependencies) = task_dependencies_for_immutable
2327            && dependencies
2328                .iter()
2329                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2330        {
2331            is_now_immutable = true;
2332        }
2333        #[cfg(feature = "trace_task_details")]
2334        span.record("immutable", is_immutable || is_now_immutable);
2335
2336        if !queue.is_empty() || !old_edges.is_empty() {
2337            #[cfg(any(
2338                feature = "trace_task_completion",
2339                feature = "trace_aggregation_update_stats"
2340            ))]
2341            let _span =
2342                tracing::trace_span!("remove old edges and prepare new children", stats = Empty)
2343                    .entered();
2344            // Remove outdated edges first, before removing in_progress+dirty flag.
2345            // We need to make sure all outdated edges are removed before the task can potentially
2346            // be scheduled and executed again
2347            #[cfg(feature = "trace_aggregation_update_stats")]
2348            {
2349                let stats = CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2350                _span.record("stats", tracing::field::debug(stats));
2351            }
2352            #[cfg(not(feature = "trace_aggregation_update_stats"))]
2353            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2354        }
2355
2356        Ok(TaskExecutionCompletePrepareResult {
2357            new_children,
2358            is_now_immutable,
2359            #[cfg(feature = "verify_determinism")]
2360            no_output_set,
2361            #[cfg(feature = "task_dirty_cause")]
2362            function_id,
2363            new_output,
2364            output_dependent_tasks,
2365            is_recomputation,
2366            is_session_dependent,
2367        })
2368    }
2369
2370    fn task_execution_completed_invalidate_output_dependent(
2371        &self,
2372        ctx: &mut impl ExecuteContext<'_>,
2373        task_id: TaskId,
2374        #[cfg(feature = "task_dirty_cause")] function_id: Option<FunctionId>,
2375        output_dependent_tasks: SmallVec<[TaskId; 4]>,
2376    ) {
2377        debug_assert!(!output_dependent_tasks.is_empty());
2378
2379        #[cfg(feature = "task_dirty_cause")]
2380        let cause = match function_id {
2381            Some(function) => TaskDirtyCause::OutputChange { function },
2382            None => TaskDirtyCause::RootOutputChange,
2383        };
2384
2385        if output_dependent_tasks.len() > 1 {
2386            ctx.prepare_tasks(
2387                output_dependent_tasks
2388                    .iter()
2389                    .map(|&id| (id, TaskDataCategory::All)),
2390                "invalidate output dependents",
2391            );
2392        }
2393
2394        fn process_output_dependents(
2395            ctx: &mut impl ExecuteContext<'_>,
2396            task_id: TaskId,
2397            #[cfg(feature = "task_dirty_cause")] cause: &TaskDirtyCause,
2398            dependent_task_id: TaskId,
2399            queue: &mut AggregationUpdateQueue,
2400        ) {
2401            #[cfg(feature = "trace_task_output_dependencies")]
2402            let span = tracing::trace_span!(
2403                "invalidate output dependency",
2404                task = %task_id,
2405                dependent_task = %dependent_task_id,
2406                result = tracing::field::Empty,
2407            )
2408            .entered();
2409            let mut make_stale = true;
2410            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2411            let transient_task_type = dependent.get_transient_task_type();
2412            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
2413                // once tasks are never invalidated
2414                #[cfg(feature = "trace_task_output_dependencies")]
2415                span.record("result", "once task");
2416                return;
2417            }
2418            if dependent.outdated_output_dependencies_contains(&task_id) {
2419                #[cfg(feature = "trace_task_output_dependencies")]
2420                span.record("result", "outdated dependency");
2421                // output dependency is outdated, so it hasn't read the output yet
2422                // and doesn't need to be invalidated
2423                // But importantly we still need to make the task dirty as it should no longer
2424                // be considered as "recomputation".
2425                make_stale = false;
2426            } else if !dependent.output_dependencies_contains(&task_id) {
2427                // output dependency has been removed, so the task doesn't depend on the
2428                // output anymore and doesn't need to be invalidated
2429                #[cfg(feature = "trace_task_output_dependencies")]
2430                span.record("result", "no backward dependency");
2431                return;
2432            }
2433            make_task_dirty_internal(
2434                dependent,
2435                dependent_task_id,
2436                make_stale,
2437                #[cfg(feature = "task_dirty_cause")]
2438                cause.clone(),
2439                queue,
2440                ctx,
2441            );
2442            #[cfg(feature = "trace_task_output_dependencies")]
2443            span.record("result", "marked dirty");
2444        }
2445
2446        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
2447            let chunk_size = good_chunk_size(output_dependent_tasks.len());
2448            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2449            let _ = scope_and_block(chunks.len(), |scope| {
2450                for chunk in chunks {
2451                    let child_ctx = ctx.child_context();
2452                    #[cfg(feature = "task_dirty_cause")]
2453                    let cause = &cause;
2454                    scope.spawn(move || {
2455                        let mut ctx = child_ctx.create();
2456                        let mut queue = AggregationUpdateQueue::new();
2457                        for dependent_task_id in chunk {
2458                            process_output_dependents(
2459                                &mut ctx,
2460                                task_id,
2461                                #[cfg(feature = "task_dirty_cause")]
2462                                cause,
2463                                dependent_task_id,
2464                                &mut queue,
2465                            )
2466                        }
2467                        queue.execute(&mut ctx);
2468                    });
2469                }
2470            });
2471        } else {
2472            let mut queue = AggregationUpdateQueue::new();
2473            for dependent_task_id in output_dependent_tasks {
2474                process_output_dependents(
2475                    ctx,
2476                    task_id,
2477                    #[cfg(feature = "task_dirty_cause")]
2478                    &cause,
2479                    dependent_task_id,
2480                    &mut queue,
2481                );
2482            }
2483            queue.execute(ctx);
2484        }
2485    }
2486
2487    fn task_execution_completed_unfinished_children_dirty(
2488        &self,
2489        ctx: &mut impl ExecuteContext<'_>,
2490        new_children: &FxHashSet<TaskId>,
2491    ) {
2492        debug_assert!(!new_children.is_empty());
2493
2494        let mut queue = AggregationUpdateQueue::new();
2495        ctx.for_each_task_all(
2496            new_children.iter().copied(),
2497            "unfinished children dirty",
2498            |child_task, ctx| {
2499                if !child_task.has_output() {
2500                    let child_id = child_task.id();
2501                    make_task_dirty_internal(
2502                        child_task,
2503                        child_id,
2504                        false,
2505                        #[cfg(feature = "task_dirty_cause")]
2506                        TaskDirtyCause::InitialDirty,
2507                        &mut queue,
2508                        ctx,
2509                    );
2510                }
2511            },
2512        );
2513
2514        queue.execute(ctx);
2515    }
2516
2517    fn task_execution_completed_connect(
2518        &self,
2519        ctx: &mut impl ExecuteContext<'_>,
2520        task_id: TaskId,
2521        new_children: FxHashSet<TaskId>,
2522    ) -> Option<TaskPriority> {
2523        debug_assert!(!new_children.is_empty());
2524
2525        let mut task = ctx.task(task_id, TaskDataCategory::All);
2526        let Some(in_progress) = task.get_in_progress() else {
2527            panic!("Task execution completed, but task is not in progress: {task:#?}");
2528        };
2529        if matches!(in_progress, InProgressState::Canceled) {
2530            // Task was canceled in the meantime, so we don't connect the children
2531            return None;
2532        }
2533        let InProgressState::InProgress(box InProgressStateInner {
2534            #[cfg(not(feature = "no_fast_stale"))]
2535            stale,
2536            once_task: is_once_task,
2537            ..
2538        }) = in_progress
2539        else {
2540            panic!("Task execution completed, but task is not in progress: {task:#?}");
2541        };
2542
2543        // If the task is stale, reschedule it
2544        #[cfg(not(feature = "no_fast_stale"))]
2545        if *stale && !is_once_task {
2546            let stale_priority = compute_stale_priority(&task);
2547            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2548                task.take_in_progress()
2549            else {
2550                unreachable!();
2551            };
2552            let old = task.set_in_progress(InProgressState::Scheduled {
2553                done_event,
2554                reason: TaskExecutionReason::Stale,
2555            });
2556            debug_assert!(old.is_none(), "InProgress already exists");
2557            drop(task);
2558
2559            // All `new_children` are currently hold active with an active count and we need to undo
2560            // that. (We already filtered out the old children from that list)
2561            AggregationUpdateQueue::run(
2562                AggregationUpdateJob::DecreaseActiveCounts {
2563                    task_ids: new_children.into_iter().collect(),
2564                },
2565                ctx,
2566            );
2567            return Some(stale_priority);
2568        }
2569
2570        let has_active_count = ctx.should_track_activeness()
2571            && task
2572                .get_activeness()
2573                .is_some_and(|activeness| activeness.active_counter > 0);
2574        connect_children(
2575            ctx,
2576            task_id,
2577            task,
2578            new_children,
2579            has_active_count,
2580            ctx.should_track_activeness(),
2581        );
2582
2583        None
2584    }
2585
2586    #[allow(clippy::type_complexity)]
2587    fn task_execution_completed_finish(
2588        &self,
2589        ctx: &mut impl ExecuteContext<'_>,
2590        task_id: TaskId,
2591        #[cfg(feature = "verify_determinism")] no_output_set: bool,
2592        new_output: Option<OutputValue>,
2593        is_now_immutable: bool,
2594        is_session_dependent: bool,
2595    ) -> (
2596        Option<TaskPriority>,
2597        Option<
2598            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2599        >,
2600    ) {
2601        let mut task = ctx.task(task_id, TaskDataCategory::All);
2602        let Some(in_progress) = task.take_in_progress() else {
2603            panic!("Task execution completed, but task is not in progress: {task:#?}");
2604        };
2605        if matches!(in_progress, InProgressState::Canceled) {
2606            // Task was canceled in the meantime, so we don't finish it
2607            return (None, None);
2608        }
2609        let InProgressState::InProgress(box InProgressStateInner {
2610            done_event,
2611            once_task: is_once_task,
2612            stale,
2613            marked_as_completed: _,
2614            new_children,
2615        }) = in_progress
2616        else {
2617            panic!("Task execution completed, but task is not in progress: {task:#?}");
2618        };
2619        debug_assert!(new_children.is_empty());
2620
2621        // If the task is stale, reschedule it
2622        if stale && !is_once_task {
2623            let stale_priority = compute_stale_priority(&task);
2624            let old = task.set_in_progress(InProgressState::Scheduled {
2625                done_event,
2626                reason: TaskExecutionReason::Stale,
2627            });
2628            debug_assert!(old.is_none(), "InProgress already exists");
2629            return (Some(stale_priority), None);
2630        }
2631
2632        // Set the output if it has changed
2633        let mut old_content = None;
2634        if let Some(value) = new_output {
2635            old_content = task.set_output(value);
2636        }
2637
2638        // If the task has no invalidator and has no mutable dependencies, it does not have a way
2639        // to be invalidated and we can mark it as immutable.
2640        if is_now_immutable {
2641            task.set_immutable(true);
2642        }
2643
2644        // Notify in progress cells and remove all of them
2645        let in_progress_cells = task.take_in_progress_cells();
2646        if let Some(ref cells) = in_progress_cells {
2647            for state in cells.values() {
2648                state.event.notify(usize::MAX);
2649            }
2650        }
2651
2652        // Compute and apply the new dirty state, propagating to aggregating ancestors
2653        let new_dirtyness = if is_session_dependent {
2654            Some(Dirtyness::SessionDependent)
2655        } else {
2656            None
2657        };
2658        #[cfg(feature = "verify_determinism")]
2659        let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
2660        let data_update = task.update_dirty_state(new_dirtyness);
2661
2662        // Under verify_determinism we re-run a task whose dirty state or output unexpectedly
2663        // changed during execution to confirm determinism. The leaf priority keeps these
2664        // verification reruns at the highest priority.
2665        #[cfg(feature = "verify_determinism")]
2666        let stale_priority: Option<TaskPriority> =
2667            ((dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task)
2668                .then(TaskPriority::leaf);
2669        #[cfg(not(feature = "verify_determinism"))]
2670        let stale_priority: Option<TaskPriority> = None;
2671        if stale_priority.is_some() {
2672            let old = task.set_in_progress(InProgressState::Scheduled {
2673                done_event,
2674                reason: TaskExecutionReason::Stale,
2675            });
2676            debug_assert!(old.is_none(), "InProgress already exists");
2677            drop(task);
2678        } else {
2679            drop(task);
2680
2681            // Notify dependent tasks that are waiting for this task to finish
2682            done_event.notify(usize::MAX);
2683        }
2684
2685        drop(old_content);
2686
2687        if let Some(data_update) = data_update {
2688            AggregationUpdateQueue::run(data_update, ctx);
2689        }
2690
2691        // We return so the data can be dropped outside of critical sections
2692        (stale_priority, in_progress_cells)
2693    }
2694
2695    fn task_execution_completed_cleanup(
2696        &self,
2697        ctx: &mut impl ExecuteContext<'_>,
2698        task_id: TaskId,
2699        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2700        is_error: bool,
2701        is_recomputation: bool,
2702    ) -> Vec<SharedReference> {
2703        let mut task = ctx.task(task_id, TaskDataCategory::All);
2704        let mut removed_cell_data = Vec::new();
2705        // An error is potentially caused by a eventual consistency, so we avoid updating cells
2706        // after an error as it is likely transient and we want to keep the dependent tasks
2707        // clean to avoid re-executions.
2708        // NOTE: This must stay in sync with task_execution_completed_prepare, which similarly
2709        // skips cell_type_max_index updates on error.
2710        if !is_error || is_recomputation {
2711            // Remove no longer existing cells and
2712            // find all outdated data items (removed cells, outdated edges)
2713            // Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
2714            // anyway and we want to avoid needless re-executions. When the cells become
2715            // used again, they are invalidated from the update cell operation.
2716            let to_remove: Vec<_> = task
2717                .iter_cell_data()
2718                .filter_map(|(cell, _)| {
2719                    cell_counters
2720                        .get(&cell.type_id)
2721                        .is_none_or(|start_index| cell.index >= *start_index)
2722                        .then_some(*cell)
2723                })
2724                .collect();
2725            removed_cell_data.reserve_exact(to_remove.len());
2726            for cell in to_remove {
2727                if let Some(data) = task.remove_cell_data(&cell) {
2728                    removed_cell_data.push(data);
2729                }
2730            }
2731            // Remove cell_data_hash entries for cells that no longer exist
2732            let to_remove_hash: Vec<_> = task
2733                .iter_cell_data_hash()
2734                .filter_map(|(cell, _)| {
2735                    cell_counters
2736                        .get(&cell.type_id)
2737                        .is_none_or(|start_index| cell.index >= *start_index)
2738                        .then_some(*cell)
2739                })
2740                .collect();
2741            for cell in to_remove_hash {
2742                task.remove_cell_data_hash(&cell);
2743            }
2744        }
2745
2746        // Clean up task storage after execution:
2747        // - Shrink collections marked with shrink_on_completion
2748        // - Drop dependency fields for immutable tasks (they'll never re-execute)
2749        task.cleanup_after_execution();
2750
2751        drop(task);
2752
2753        // Return so we can drop outside of critical sections
2754        removed_cell_data
2755    }
2756
2757    /// Prints the standard message emitted when the background persisting process stops due to an
2758    /// unrecoverable write error. The caller is responsible for returning from the background job.
2759    fn log_unrecoverable_persist_error() {
2760        eprintln!(
2761            "Persisting is disabled for this session due to an unrecoverable error. Stopping the \
2762             background persisting process."
2763        );
2764    }
2765
2766    fn run_backend_job<'a>(
2767        &'a self,
2768        job: TurboTasksBackendJob,
2769        turbo_tasks: &'a TurboTasks<TurboTasksBackend>,
2770    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2771        Box::pin(async move {
2772            match job {
2773                TurboTasksBackendJob::Snapshot => {
2774                    debug_assert!(self.should_persist());
2775
2776                    let mut last_snapshot = self.start_time;
2777                    let mut idle_start_listener = self.idle_start_event.listen();
2778                    let mut idle_end_listener = self.idle_end_event.listen();
2779                    // Whether to immediately set an idle timeout if possible.
2780                    // Set to false if we don't persist anything in a cycle.
2781                    let mut fresh_idle = true;
2782                    let mut evicted = false;
2783                    let mut is_first = true;
2784                    'outer: loop {
2785                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2786                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2787                        let idle_timeout = *IDLE_TIMEOUT;
2788                        let (time, mut reason) = if is_first {
2789                            (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2790                        } else {
2791                            (SNAPSHOT_INTERVAL, "regular snapshot interval")
2792                        };
2793
2794                        let until = last_snapshot + time;
2795                        if until > Instant::now() {
2796                            let mut stop_listener = self.stopping_event.listen();
2797                            if self.stopping.load(Ordering::Acquire) {
2798                                return;
2799                            }
2800                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2801                                Instant::now() + idle_timeout
2802                            } else {
2803                                far_future()
2804                            };
2805                            loop {
2806                                tokio::select! {
2807                                    _ = &mut stop_listener => {
2808                                        return;
2809                                    },
2810                                    _ = &mut idle_start_listener => {
2811                                        idle_time = Instant::now() + idle_timeout;
2812                                        idle_start_listener = self.idle_start_event.listen()
2813                                    },
2814                                    _ = &mut idle_end_listener => {
2815                                        idle_time = far_future();
2816                                        idle_end_listener = self.idle_end_event.listen()
2817                                    },
2818                                    _ = tokio::time::sleep_until(until) => {
2819                                        break;
2820                                    },
2821                                    _ = tokio::time::sleep_until(idle_time) => {
2822                                        if turbo_tasks.is_idle() {
2823                                            reason = "idle timeout";
2824                                            break;
2825                                        }
2826                                    },
2827                                }
2828                            }
2829                        }
2830
2831                        // Create a root span shared by both the snapshot/persist
2832                        // work and the subsequent compaction so they appear
2833                        // grouped together in trace viewers.
2834                        let background_span =
2835                            tracing::info_span!(parent: None, "background snapshot");
2836                        match self.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
2837                            Err(err) => {
2838                                // save_snapshot consumed persisted_task_cache_log entries;
2839                                // further snapshots would corrupt the task graph.
2840                                eprintln!("Persisting failed: {err:?}");
2841                                Self::log_unrecoverable_persist_error();
2842                                return;
2843                            }
2844                            Ok((snapshot_start, new_data)) => {
2845                                // if we see 'new_data' then the next idle transition is 'fresh'
2846                                fresh_idle = new_data;
2847                                is_first = false;
2848                                last_snapshot = snapshot_start;
2849
2850                                // Polls the idle-end event without blocking. Returns
2851                                // `true` and refreshes the listener if idle has ended,
2852                                // `false` if we are still idle.
2853                                macro_rules! check_idle_ended {
2854                                    () => {{
2855                                        tokio::select! {
2856                                            biased;
2857                                            _ = &mut idle_end_listener => {
2858                                                idle_end_listener = self.idle_end_event.listen();
2859                                                true
2860                                            },
2861                                            _ = std::future::ready(()) => false,
2862                                        }
2863                                    }};
2864                                }
2865                                // Evict persisted tasks from memory to reclaim space.
2866                                // Like compaction, this runs after snapshot_and_persist
2867                                // as a separate concern.
2868                                //
2869                                // TODO: improve eviction policy — current approach is a full sweep
2870                                // after every snapshot. Better strategies to consider:
2871                                //   - Memory pressure signals: only evict when RSS exceeds a
2872                                //     threshold rather than unconditionally.
2873                                //   - Recency data: track last-access time per task and evict
2874                                //     least-recently-used entries first rather than all at once.
2875                                //   - Eviction intensity: partial sweeps (evict a fraction of
2876                                //     eligible tasks per cycle) to reduce latency spikes.
2877                                // Evict when there is new data to persist (the common
2878                                // case) or on the very first snapshot after startup
2879                                // (data was already on disk from a prior run, so
2880                                // new_data may be false but in-memory state can still
2881                                // be evicted).
2882                                let mut ran_eviction = false;
2883                                if self.should_evict() && (new_data || !evicted) {
2884                                    if check_idle_ended!() {
2885                                        // need to start all the way over so we catch the next
2886                                        // signal
2887                                        continue 'outer;
2888                                    }
2889                                    evicted = true;
2890                                    ran_eviction = true;
2891                                    self.storage.evict_after_snapshot(background_span.id());
2892                                }
2893
2894                                // Compact while idle (up to limit), regardless of
2895                                // whether the snapshot had new data.
2896                                // `background_span` is not entered here because
2897                                // `EnteredSpan` is `!Send` and would prevent the
2898                                // future from being sent across threads when it
2899                                // suspends at the `select!` await below.
2900                                let mut ran_compaction = false;
2901                                const MAX_IDLE_COMPACTION_PASSES: usize = 10;
2902                                for _ in 0..MAX_IDLE_COMPACTION_PASSES {
2903                                    if check_idle_ended!() {
2904                                        continue 'outer;
2905                                    }
2906                                    // Enter the span only around the synchronous
2907                                    // compact() call so we never hold an
2908                                    // `EnteredSpan` across an await point.
2909                                    let _compact_span = tracing::info_span!(
2910                                        parent: background_span.id(),
2911                                        "compact database"
2912                                    )
2913                                    .entered();
2914                                    match self.backing_storage.compact() {
2915                                        Ok(true) => {
2916                                            ran_compaction = true;
2917                                        }
2918                                        Ok(false) => break,
2919                                        Err(err) => {
2920                                            eprintln!("Compaction failed: {err:?}");
2921                                            if self.backing_storage.has_unrecoverable_write_error()
2922                                            {
2923                                                Self::log_unrecoverable_persist_error();
2924                                                return;
2925                                            }
2926                                            break;
2927                                        }
2928                                    }
2929                                }
2930                                if check_idle_ended!() {
2931                                    continue 'outer;
2932                                }
                                // After running snapshotting/eviction/compaction we have churned a
                                // _lot_ of memory. If we are still idle, tell `mimalloc` that now
                                // would be a good time to release memory back to the OS.
2937                                if new_data || ran_compaction || ran_eviction {
2938                                    turbo_tasks_malloc::TurboMalloc::collect(true);
2939                                }
2940                            }
2941                        }
2942                    }
2943                }
2944            }
2945        })
2946    }
2947
2948    fn try_read_own_task_cell(
2949        &self,
2950        task_id: TaskId,
2951        cell: CellId,
2952        turbo_tasks: &TurboTasks<TurboTasksBackend>,
2953    ) -> Result<TypedCellContent> {
2954        let mut ctx = self.execute_context(turbo_tasks);
2955        let task = ctx.task(task_id, TaskDataCategory::Data);
2956        if let Some(content) = task.get_cell_data(&cell).cloned() {
2957            Ok(CellContent(Some(content)).into_typed(cell.type_id))
2958        } else {
2959            Ok(CellContent(None).into_typed(cell.type_id))
2960        }
2961    }
2962
2963    fn read_task_collectibles(
2964        &self,
2965        task_id: TaskId,
2966        collectible_type: TraitTypeId,
2967        reader_id: Option<TaskId>,
2968        turbo_tasks: &TurboTasks<TurboTasksBackend>,
2969    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2970        let mut ctx = self.execute_context(turbo_tasks);
2971        let mut collectibles = AutoMap::default();
2972        {
2973            let mut task = ctx.task(task_id, TaskDataCategory::All);
2974            if task
2975                .get_persistent_task_type()
2976                .is_some_and(|t| !t.native_fn.is_root)
2977            {
2978                drop(task);
2979                panic!(
2980                    "Reading collectibles of non-root task {} (reader: {}). The `root` attribute \
2981                     is missing on the task.",
2982                    self.debug_get_task_description(task_id),
2983                    reader_id.map_or_else(
2984                        || "unknown".to_string(),
2985                        |r| self.debug_get_task_description(r)
2986                    )
2987                );
2988            }
2989            for (collectible, count) in task.iter_aggregated_collectibles() {
2990                if *count > 0 && collectible.collectible_type == collectible_type {
2991                    *collectibles
2992                        .entry(RawVc::TaskCell(
2993                            collectible.cell.task,
2994                            collectible.cell.cell,
2995                        ))
2996                        .or_insert(0) += 1;
2997                }
2998            }
2999            for (&collectible, &count) in task.iter_collectibles() {
3000                if collectible.collectible_type == collectible_type {
3001                    *collectibles
3002                        .entry(RawVc::TaskCell(
3003                            collectible.cell.task,
3004                            collectible.cell.cell,
3005                        ))
3006                        .or_insert(0) += count;
3007                }
3008            }
3009            if let Some(reader_id) = reader_id {
3010                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
3011            }
3012        }
3013        if let Some(reader_id) = reader_id {
3014            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
3015            let target = CollectiblesRef {
3016                task: task_id,
3017                collectible_type,
3018            };
3019            if !reader.remove_outdated_collectibles_dependencies(&target) {
3020                let _ = reader.add_collectibles_dependencies(target);
3021            }
3022        }
3023        collectibles
3024    }
3025
3026    fn emit_collectible(
3027        &self,
3028        collectible_type: TraitTypeId,
3029        collectible: RawVc,
3030        task_id: TaskId,
3031        turbo_tasks: &TurboTasks<TurboTasksBackend>,
3032    ) {
3033        self.assert_valid_collectible(task_id, collectible);
3034
3035        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3036            panic!("Collectibles need to be resolved");
3037        };
3038        let cell = CellRef {
3039            task: collectible_task,
3040            cell,
3041        };
3042        operation::UpdateCollectibleOperation::run(
3043            task_id,
3044            CollectibleRef {
3045                collectible_type,
3046                cell,
3047            },
3048            1,
3049            self.execute_context(turbo_tasks),
3050        );
3051    }
3052
3053    fn unemit_collectible(
3054        &self,
3055        collectible_type: TraitTypeId,
3056        collectible: RawVc,
3057        count: u32,
3058        task_id: TaskId,
3059        turbo_tasks: &TurboTasks<TurboTasksBackend>,
3060    ) {
3061        self.assert_valid_collectible(task_id, collectible);
3062
3063        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3064            panic!("Collectibles need to be resolved");
3065        };
3066        let cell = CellRef {
3067            task: collectible_task,
3068            cell,
3069        };
3070        operation::UpdateCollectibleOperation::run(
3071            task_id,
3072            CollectibleRef {
3073                collectible_type,
3074                cell,
3075            },
3076            -(i32::try_from(count).unwrap()),
3077            self.execute_context(turbo_tasks),
3078        );
3079    }
3080
3081    fn update_task_cell(
3082        &self,
3083        task_id: TaskId,
3084        cell: CellId,
3085        content: CellContent,
3086        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3087        content_hash: Option<CellHash>,
3088        verification_mode: VerificationMode,
3089        turbo_tasks: &TurboTasks<TurboTasksBackend>,
3090    ) {
3091        operation::UpdateCellOperation::run(
3092            task_id,
3093            cell,
3094            content,
3095            updated_key_hashes,
3096            content_hash,
3097            verification_mode,
3098            self.execute_context(turbo_tasks),
3099        );
3100    }
3101
3102    fn mark_own_task_as_finished(&self, task: TaskId, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
3103        let mut ctx = self.execute_context(turbo_tasks);
3104        let mut task = ctx.task(task, TaskDataCategory::Data);
3105        if let Some(InProgressState::InProgress(box InProgressStateInner {
3106            marked_as_completed,
3107            ..
3108        })) = task.get_in_progress_mut()
3109        {
3110            *marked_as_completed = true;
3111            // TODO this should remove the dirty state (also check session_dependent)
3112            // but this would break some assumptions for strongly consistent reads.
3113            // Client tasks are not connected yet, so we wouldn't wait for them.
3114            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
3115        }
3116    }
3117
    /// Connects `task` as a child of `parent_task` by running a `ConnectChildOperation`.
    ///
    /// # Panics
    ///
    /// Panics (via `assert_not_persistent_calling_transient`) when `parent_task` is a
    /// persistent task and `task` is transient.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &TurboTasks<TurboTasksBackend>,
    ) {
        // Validate the edge first; no cell is involved here, hence `None`.
        self.assert_not_persistent_calling_transient(parent_task, task, None);
        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
    }
3127
3128    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3129        let task_id = self.transient_task_id_factory.get();
3130        {
3131            let mut task = self.storage.access_mut(task_id);
3132            task.init_transient_task(task_id, task_type, self.should_track_activeness());
3133        }
3134        #[cfg(feature = "verify_aggregation_graph")]
3135        self.root_tasks.lock().insert(task_id);
3136        task_id
3137    }
3138
3139    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
3140        #[cfg(feature = "verify_aggregation_graph")]
3141        self.root_tasks.lock().remove(&task_id);
3142
3143        let mut ctx = self.execute_context(turbo_tasks);
3144        let mut task = ctx.task(task_id, TaskDataCategory::All);
3145        let is_dirty = task.is_dirty();
3146        let has_dirty_containers = task.has_dirty_containers();
3147        if is_dirty.is_some() || has_dirty_containers {
3148            if let Some(activeness_state) = task.get_activeness_mut() {
3149                // We will finish the task, but it would be removed after the task is done
3150                activeness_state.unset_root_type();
3151                activeness_state.set_active_until_clean();
3152            };
3153        } else if let Some(activeness_state) = task.take_activeness() {
3154            // Technically nobody should be listening to this event, but just in case
3155            // we notify it anyway
3156            activeness_state.all_clean_event.notify(usize::MAX);
3157        }
3158    }
3159
3160    #[cfg(feature = "verify_aggregation_graph")]
3161    fn verify_aggregation_graph(&self, turbo_tasks: &TurboTasks<TurboTasksBackend>, idle: bool) {
3162        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3163            return;
3164        }
3165        use std::{collections::VecDeque, env, io::stdout};
3166
3167        use crate::backend::operation::{get_uppers, is_aggregating_node};
3168
3169        let mut ctx = self.execute_context(turbo_tasks);
3170        let root_tasks = self.root_tasks.lock().clone();
3171
3172        for task_id in root_tasks.into_iter() {
3173            let mut queue = VecDeque::new();
3174            let mut visited = FxHashSet::default();
3175            let mut aggregated_nodes = FxHashSet::default();
3176            let mut collectibles = FxHashMap::default();
3177            let root_task_id = task_id;
3178            visited.insert(task_id);
3179            aggregated_nodes.insert(task_id);
3180            queue.push_back(task_id);
3181            let mut counter = 0;
3182            while let Some(task_id) = queue.pop_front() {
3183                counter += 1;
3184                if counter % 100000 == 0 {
3185                    println!(
3186                        "queue={}, visited={}, aggregated_nodes={}",
3187                        queue.len(),
3188                        visited.len(),
3189                        aggregated_nodes.len()
3190                    );
3191                }
3192                let task = ctx.task(task_id, TaskDataCategory::All);
3193                if idle && !self.is_idle.load(Ordering::Relaxed) {
3194                    return;
3195                }
3196
3197                let uppers = get_uppers(&task);
3198                if task_id != root_task_id
3199                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3200                {
3201                    panic!(
3202                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3203                         {:?})",
3204                        task_id,
3205                        task.get_task_description(),
3206                        uppers
3207                    );
3208                }
3209
3210                for (collectible, _) in task.iter_aggregated_collectibles() {
3211                    collectibles
3212                        .entry(*collectible)
3213                        .or_insert_with(|| (false, Vec::new()))
3214                        .1
3215                        .push(task_id);
3216                }
3217
3218                for (&collectible, &value) in task.iter_collectibles() {
3219                    if value > 0 {
3220                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3221                            *flag = true
3222                        } else {
3223                            panic!(
3224                                "Task {} has a collectible {:?} that is not in any upper task",
3225                                task_id, collectible
3226                            );
3227                        }
3228                    }
3229                }
3230
3231                let is_dirty = task.has_dirty();
3232                let has_dirty_container = task.has_dirty_containers();
3233                let should_be_in_upper = is_dirty || has_dirty_container;
3234
3235                let aggregation_number = get_aggregation_number(&task);
3236                if is_aggregating_node(aggregation_number) {
3237                    aggregated_nodes.insert(task_id);
3238                }
3239                // println!(
3240                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
3241                //     ctx.get_task_description(task_id),
3242                //     uppers
3243                // );
3244
3245                for child_id in task.iter_children() {
3246                    // println!("{task_id}: child -> {child_id}");
3247                    if visited.insert(child_id) {
3248                        queue.push_back(child_id);
3249                    }
3250                }
3251                drop(task);
3252
3253                if should_be_in_upper {
3254                    for upper_id in uppers {
3255                        let upper = ctx.task(upper_id, TaskDataCategory::All);
3256                        let in_upper = upper
3257                            .get_aggregated_dirty_containers(&task_id)
3258                            .is_some_and(|&dirty| dirty > 0);
3259                        if !in_upper {
3260                            let containers: Vec<_> = upper
3261                                .iter_aggregated_dirty_containers()
3262                                .map(|(&k, &v)| (k, v))
3263                                .collect();
3264                            let upper_task_desc = upper.get_task_description();
3265                            drop(upper);
3266                            panic!(
3267                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
3268                                 ({})\nThese dirty containers are present:\n{:#?}",
3269                                task_id,
3270                                ctx.task(task_id, TaskDataCategory::Data)
3271                                    .get_task_description(),
3272                                upper_id,
3273                                upper_task_desc,
3274                                containers,
3275                            );
3276                        }
3277                    }
3278                }
3279            }
3280
3281            for (collectible, (flag, task_ids)) in collectibles {
3282                if !flag {
3283                    use std::io::Write;
3284                    let mut stdout = stdout().lock();
3285                    writeln!(
3286                        stdout,
3287                        "{:?} that is not emitted in any child task but in these aggregated \
3288                         tasks: {:#?}",
3289                        collectible,
3290                        task_ids
3291                            .iter()
3292                            .map(|t| format!(
3293                                "{t} {}",
3294                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
3295                            ))
3296                            .collect::<Vec<_>>()
3297                    )
3298                    .unwrap();
3299
3300                    let task_id = collectible.cell.task;
3301                    let mut queue = {
3302                        let task = ctx.task(task_id, TaskDataCategory::All);
3303                        get_uppers(&task)
3304                    };
3305                    let mut visited = FxHashSet::default();
3306                    for &upper_id in queue.iter() {
3307                        visited.insert(upper_id);
3308                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3309                    }
3310                    while let Some(task_id) = queue.pop() {
3311                        let task = ctx.task(task_id, TaskDataCategory::All);
3312                        let desc = task.get_task_description();
3313                        let aggregated_collectible = task
3314                            .get_aggregated_collectibles(&collectible)
3315                            .copied()
3316                            .unwrap_or_default();
3317                        let uppers = get_uppers(&task);
3318                        drop(task);
3319                        writeln!(
3320                            stdout,
3321                            "upper {task_id} {desc} collectible={aggregated_collectible}"
3322                        )
3323                        .unwrap();
3324                        if task_ids.contains(&task_id) {
3325                            writeln!(
3326                                stdout,
3327                                "Task has an upper connection to an aggregated task that doesn't \
3328                                 reference it. Upper connection is invalid!"
3329                            )
3330                            .unwrap();
3331                        }
3332                        for upper_id in uppers {
3333                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3334                            if !visited.contains(&upper_id) {
3335                                queue.push(upper_id);
3336                            }
3337                        }
3338                    }
3339                    panic!("See stdout for more details");
3340                }
3341            }
3342        }
3343    }
3344
3345    fn assert_not_persistent_calling_transient(
3346        &self,
3347        parent_id: Option<TaskId>,
3348        child_id: TaskId,
3349        cell_id: Option<CellId>,
3350    ) {
3351        if let Some(parent_id) = parent_id
3352            && !parent_id.is_transient()
3353            && child_id.is_transient()
3354        {
3355            self.panic_persistent_calling_transient(
3356                self.debug_get_task_description(parent_id),
3357                self.debug_get_cached_task_type(child_id).as_deref(),
3358                cell_id,
3359            );
3360        }
3361    }
3362
3363    fn panic_persistent_calling_transient(
3364        &self,
3365        parent: String,
3366        child: Option<&CachedTaskType>,
3367        cell_id: Option<CellId>,
3368    ) -> ! {
3369        let transient_reason = if let Some(child) = child {
3370            Cow::Owned(format!(
3371                " The callee is transient because it depends on:\n{}",
3372                self.debug_trace_transient_task(child, cell_id),
3373            ))
3374        } else {
3375            Cow::Borrowed("")
3376        };
3377        panic!(
3378            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3379            parent,
3380            child.map_or("unknown", |t| t.get_name()),
3381            transient_reason,
3382        );
3383    }
3384
3385    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3386        // these checks occur in a potentially hot codepath, but they're cheap
3387        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3388            // This should never happen: The collectible APIs use ResolvedVc
3389            let task_info = if let Some(col_task_ty) = collectible
3390                .try_get_task_id()
3391                .map(|t| self.debug_get_task_description(t))
3392            {
3393                Cow::Owned(format!(" (return type of {col_task_ty})"))
3394            } else {
3395                Cow::Borrowed("")
3396            };
3397            panic!("Collectible{task_info} must be a ResolvedVc")
3398        };
3399        if col_task_id.is_transient() && !task_id.is_transient() {
3400            let transient_reason =
3401                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3402                    Cow::Owned(format!(
3403                        ". The collectible is transient because it depends on:\n{}",
3404                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3405                    ))
3406                } else {
3407                    Cow::Borrowed("")
3408                };
3409            // this should never happen: How would a persistent function get a transient Vc?
3410            panic!(
3411                "Collectible is transient, transient collectibles cannot be emitted from \
3412                 persistent tasks{transient_reason}",
3413            )
3414        }
3415    }
3416}
3417
3418impl Backend for TurboTasksBackend {
    /// `Backend::startup` entry point; forwards to `self.startup(turbo_tasks)`.
    // NOTE(review): presumably resolves to an inherent `startup` defined outside this
    // section (inherent methods win over trait methods) — confirm.
    fn startup(&self, turbo_tasks: &TurboTasks<Self>) {
        self.startup(turbo_tasks);
    }
3422
    /// `Backend::stopping` entry point; forwards to `self.stopping()`.
    ///
    /// The `TurboTasks` handle is not needed by the callee and is ignored.
    // NOTE(review): presumably resolves to an inherent `stopping` defined outside this
    // section — confirm.
    fn stopping(&self, _turbo_tasks: &TurboTasks<Self>) {
        self.stopping();
    }
3426
    /// `Backend::stop` entry point; forwards to `self.stop(turbo_tasks)`.
    // NOTE(review): presumably resolves to an inherent `stop` defined outside this
    // section — confirm.
    fn stop(&self, turbo_tasks: &TurboTasks<Self>) {
        self.stop(turbo_tasks);
    }
3430
    /// `Backend::idle_start` entry point; forwards to `self.idle_start(turbo_tasks)`.
    // NOTE(review): presumably resolves to an inherent `idle_start` defined outside this
    // section — confirm.
    fn idle_start(&self, turbo_tasks: &TurboTasks<Self>) {
        self.idle_start(turbo_tasks);
    }
3434
    /// `Backend::idle_end` entry point; forwards to `self.idle_end()`.
    ///
    /// The `TurboTasks` handle is not needed by the callee and is ignored.
    // NOTE(review): presumably resolves to an inherent `idle_end` defined outside this
    // section — confirm.
    fn idle_end(&self, _turbo_tasks: &TurboTasks<Self>) {
        self.idle_end();
    }
3438
    /// `Backend::get_or_create_task` entry point; forwards all arguments unchanged to
    /// `self.get_or_create_task(..)` and returns the resulting `TaskId`.
    // NOTE(review): presumably resolves to an inherent `get_or_create_task` defined outside
    // this section — confirm.
    fn get_or_create_task(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn DynTaskInputsStorage,
        parent_task: Option<TaskId>,
        persistence: TaskPersistence,
        turbo_tasks: &TurboTasks<Self>,
    ) -> TaskId {
        self.get_or_create_task(native_fn, this, arg, parent_task, persistence, turbo_tasks)
    }
3450
    /// `Backend::invalidate_task` entry point; forwards to
    /// `self.invalidate_task(task_id, turbo_tasks)`.
    // NOTE(review): presumably resolves to an inherent `invalidate_task` defined outside
    // this section — confirm.
    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
        self.invalidate_task(task_id, turbo_tasks);
    }
3454
    /// `Backend::invalidate_tasks` entry point; forwards the slice of task ids to
    /// `self.invalidate_tasks(tasks, turbo_tasks)`.
    // NOTE(review): presumably resolves to an inherent `invalidate_tasks` defined outside
    // this section — confirm.
    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &TurboTasks<Self>) {
        self.invalidate_tasks(tasks, turbo_tasks);
    }
3458
    /// `Backend::invalidate_tasks_set` entry point; same as `invalidate_tasks` but takes the
    /// task ids as an `AutoSet`. Forwards to `self.invalidate_tasks_set(tasks, turbo_tasks)`.
    // NOTE(review): presumably resolves to an inherent `invalidate_tasks_set` defined
    // outside this section — confirm.
    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &TurboTasks<Self>,
    ) {
        self.invalidate_tasks_set(tasks, turbo_tasks);
    }
3466
    /// `Backend::invalidate_serialization` entry point; forwards to
    /// `self.invalidate_serialization(task_id, turbo_tasks)`.
    // NOTE(review): presumably resolves to an inherent `invalidate_serialization` defined
    // outside this section — confirm.
    fn invalidate_serialization(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
        self.invalidate_serialization(task_id, turbo_tasks);
    }
3470
    /// `Backend::task_execution_canceled` entry point; forwards to
    /// `self.task_execution_canceled(task, turbo_tasks)`.
    // NOTE(review): presumably resolves to an inherent `task_execution_canceled` defined
    // outside this section — confirm.
    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &TurboTasks<Self>) {
        self.task_execution_canceled(task, turbo_tasks)
    }
3474
    /// `Backend::try_start_task_execution` entry point; forwards to
    /// `self.try_start_task_execution(..)` and returns its `Option<TaskExecutionSpec>`
    /// unchanged.
    // NOTE(review): presumably resolves to an inherent `try_start_task_execution` defined
    // outside this section — confirm.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &TurboTasks<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.try_start_task_execution(task_id, priority, turbo_tasks)
    }
3483
    /// `Backend::task_execution_completed` entry point; forwards all arguments unchanged to
    /// `self.task_execution_completed(..)` and returns its `Option<TaskPriority>`.
    ///
    /// The `stateful` parameter only exists (and is only forwarded) when the
    /// `verify_determinism` feature is enabled.
    // NOTE(review): presumably resolves to an inherent `task_execution_completed` defined
    // outside this section — confirm.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &TurboTasks<Self>,
    ) -> Option<TaskPriority> {
        self.task_execution_completed(
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
            turbo_tasks,
        )
    }
3503
    /// Job type accepted by `run_backend_job` for this backend.
    type BackendJob = TurboTasksBackendJob;
3505
    /// `Backend::run_backend_job` entry point; forwards to
    /// `self.run_backend_job(job, turbo_tasks)` and returns the boxed future it produces.
    ///
    /// The returned future borrows both `self` and `turbo_tasks` for `'a`.
    // NOTE(review): presumably resolves to an inherent `run_backend_job` defined outside
    // this section — confirm.
    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a TurboTasks<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.run_backend_job(job, turbo_tasks)
    }
3513
    /// `Backend::try_read_task_output` entry point; forwards to
    /// `self.try_read_task_output(task_id, reader, options, turbo_tasks)`.
    ///
    /// The nested result is passed through unchanged: the outer `Result` carries errors, the
    /// inner one is either the output `RawVc` or an `EventListener`.
    // NOTE(review): presumably resolves to an inherent `try_read_task_output` defined
    // outside this section — confirm.
    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &TurboTasks<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.try_read_task_output(task_id, reader, options, turbo_tasks)
    }
3523
    /// `Backend::try_read_task_cell` entry point; forwards to `self.try_read_task_cell(..)`.
    ///
    /// The nested result is passed through unchanged: the outer `Result` carries errors, the
    /// inner one is either the cell content or an `EventListener`.
    // NOTE(review): presumably resolves to an inherent `try_read_task_cell` defined outside
    // this section — confirm.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &TurboTasks<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        // Note the argument order: the callee takes `reader` before `cell`, unlike the
        // trait signature.
        self.try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }
3534
    /// `Backend::try_read_own_task_cell` entry point; forwards to
    /// `self.try_read_own_task_cell(task_id, cell, turbo_tasks)`.
    // NOTE(review): presumably resolves to the like-named inherent method defined earlier
    // in this file — confirm.
    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        turbo_tasks: &TurboTasks<Self>,
    ) -> Result<TypedCellContent> {
        self.try_read_own_task_cell(task_id, cell, turbo_tasks)
    }
3543
    /// `Backend::read_task_collectibles` entry point; forwards to
    /// `self.read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)` and
    /// returns the resulting collectible-to-count map.
    // NOTE(review): presumably resolves to the like-named inherent method defined earlier
    // in this file — confirm.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &TurboTasks<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }
3553
3554    fn emit_collectible(
3555        &self,
3556        collectible_type: TraitTypeId,
3557        collectible: RawVc,
3558        task_id: TaskId,
3559        turbo_tasks: &TurboTasks<Self>,
3560    ) {
3561        self.emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3562    }
3563
3564    fn unemit_collectible(
3565        &self,
3566        collectible_type: TraitTypeId,
3567        collectible: RawVc,
3568        count: u32,
3569        task_id: TaskId,
3570        turbo_tasks: &TurboTasks<Self>,
3571    ) {
3572        self.unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3573    }
3574
3575    fn update_task_cell(
3576        &self,
3577        task_id: TaskId,
3578        cell: CellId,
3579        content: CellContent,
3580        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3581        content_hash: Option<CellHash>,
3582        verification_mode: VerificationMode,
3583        turbo_tasks: &TurboTasks<Self>,
3584    ) {
3585        self.update_task_cell(
3586            task_id,
3587            cell,
3588            content,
3589            updated_key_hashes,
3590            content_hash,
3591            verification_mode,
3592            turbo_tasks,
3593        );
3594    }
3595
3596    fn mark_own_task_as_finished(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3597        self.mark_own_task_as_finished(task_id, turbo_tasks);
3598    }
3599
3600    fn connect_task(
3601        &self,
3602        task: TaskId,
3603        parent_task: Option<TaskId>,
3604        turbo_tasks: &TurboTasks<Self>,
3605    ) {
3606        self.connect_task(task, parent_task, turbo_tasks);
3607    }
3608
3609    fn create_transient_task(
3610        &self,
3611        task_type: TransientTaskType,
3612        _turbo_tasks: &TurboTasks<Self>,
3613    ) -> TaskId {
3614        self.create_transient_task(task_type)
3615    }
3616
3617    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3618        self.dispose_root_task(task_id, turbo_tasks);
3619    }
3620
3621    fn task_statistics(&self) -> &TaskStatisticsApi {
3622        &self.task_statistics
3623    }
3624
3625    fn is_tracking_dependencies(&self) -> bool {
3626        self.options.dependency_tracking
3627    }
3628
3629    fn get_task_name(&self, task: TaskId, turbo_tasks: &TurboTasks<Self>) -> String {
3630        self.get_task_name(task, turbo_tasks)
3631    }
3632}
3633
3634enum DebugTraceTransientTask {
3635    Cached {
3636        task_name: &'static str,
3637        cell_type_id: Option<ValueTypeId>,
3638        cause_self: Option<Box<DebugTraceTransientTask>>,
3639        cause_args: Vec<DebugTraceTransientTask>,
3640    },
3641    /// This representation is used when this task is a duplicate of one previously shown
3642    Collapsed {
3643        task_name: &'static str,
3644        cell_type_id: Option<ValueTypeId>,
3645    },
3646    Uncached {
3647        cell_type_id: Option<ValueTypeId>,
3648    },
3649}
3650
3651impl DebugTraceTransientTask {
3652    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3653        let indent = "    ".repeat(level);
3654        f.write_str(&indent)?;
3655
3656        fn fmt_cell_type_id(
3657            f: &mut fmt::Formatter<'_>,
3658            cell_type_id: Option<ValueTypeId>,
3659        ) -> fmt::Result {
3660            if let Some(ty) = cell_type_id {
3661                write!(
3662                    f,
3663                    " (read cell of type {})",
3664                    get_value_type(ty).ty.global_name
3665                )
3666            } else {
3667                Ok(())
3668            }
3669        }
3670
3671        // write the name and type
3672        match self {
3673            Self::Cached {
3674                task_name,
3675                cell_type_id,
3676                ..
3677            }
3678            | Self::Collapsed {
3679                task_name,
3680                cell_type_id,
3681                ..
3682            } => {
3683                f.write_str(task_name)?;
3684                fmt_cell_type_id(f, *cell_type_id)?;
3685                if matches!(self, Self::Collapsed { .. }) {
3686                    f.write_str(" (collapsed)")?;
3687                }
3688            }
3689            Self::Uncached { cell_type_id } => {
3690                f.write_str("unknown transient task")?;
3691                fmt_cell_type_id(f, *cell_type_id)?;
3692            }
3693        }
3694        f.write_char('\n')?;
3695
3696        // write any extra "cause" information we might have
3697        if let Self::Cached {
3698            cause_self,
3699            cause_args,
3700            ..
3701        } = self
3702        {
3703            if let Some(c) = cause_self {
3704                writeln!(f, "{indent}  self:")?;
3705                c.fmt_indented(f, level + 1)?;
3706            }
3707            if !cause_args.is_empty() {
3708                writeln!(f, "{indent}  args:")?;
3709                for c in cause_args {
3710                    c.fmt_indented(f, level + 1)?;
3711                }
3712            }
3713        }
3714        Ok(())
3715    }
3716}
3717
3718impl fmt::Display for DebugTraceTransientTask {
3719    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3720        self.fmt_indented(f, 0)
3721    }
3722}
3723
3724// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
3725fn far_future() -> Instant {
3726    // Roughly 30 years from now.
3727    // API does not provide a way to obtain max `Instant`
3728    // or convert specific date in the future to instant.
3729    // 1000 years overflows on macOS, 100 years overflows on FreeBSD.
3730    Instant::now() + Duration::from_secs(86400 * 365 * 30)
3731}
3732
3733/// Encodes task data, using the provided buffer as a scratch space.  Returns a new exactly sized
3734/// buffer.
3735/// This allows reusing the buffer across multiple encode calls to optimize allocations and
3736/// resulting buffer sizes.
3737///
3738/// TODO: The `Result` return type is an artifact of the bincode `Encode` trait requiring
3739/// fallible encoding. In practice, encoding to a `SmallVec` is infallible (no I/O), and the only
3740/// real failure mode — a `TypedSharedReference` whose value type has no bincode impl — is a
3741/// programmer error caught by the panic in the caller. Consider making the bincode encoding trait
3742/// infallible (i.e. returning `()` instead of `Result<(), EncodeError>`) to eliminate the
3743/// spurious `Result` threading throughout the encode path.
3744fn encode_task_data(
3745    task: TaskId,
3746    data: &TaskStorage,
3747    category: SpecificTaskDataCategory,
3748    scratch_buffer: &mut TurboBincodeBuffer,
3749) -> Result<TurboBincodeBuffer> {
3750    scratch_buffer.clear();
3751    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3752    data.encode(category, &mut encoder)?;
3753
3754    if cfg!(feature = "verify_serialization") {
3755        TaskStorage::new()
3756            .decode(
3757                category,
3758                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3759            )
3760            .with_context(|| {
3761                format!(
3762                    "expected to be able to decode serialized data for '{category:?}' information \
3763                     for {task}"
3764                )
3765            })?;
3766    }
3767    Ok(SmallVec::from_slice(scratch_buffer))
3768}