Skip to main content

turbo_tasks_backend/backend/
mod.rs

1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7    borrow::Cow,
8    fmt::{self, Write},
9    future::Future,
10    hash::BuildHasherDefault,
11    mem::take,
12    pin::Pin,
13    sync::{
14        Arc, LazyLock,
15        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16    },
17    time::SystemTime,
18};
19
20use anyhow::{Context, Result, bail};
21use auto_hash_map::{AutoMap, AutoSet};
22use indexmap::IndexSet;
23use parking_lot::{Condvar, Mutex};
24use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
25use smallvec::{SmallVec, smallvec};
26use tokio::time::{Duration, Instant};
27use tracing::{Span, trace_span};
28use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
29use turbo_tasks::{
30    CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
31    ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
32    TaskId, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic, ValueTypeId,
33    backend::{
34        Backend, CachedTaskType, CellContent, CellHash, TaskExecutionSpec, TransientTaskType,
35        TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
36        TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
37        VerificationMode,
38    },
39    event::{Event, EventDescription, EventListener},
40    message_queue::{TimingEvent, TraceEvent},
41    registry::get_value_type,
42    scope::scope_and_block,
43    task_statistics::TaskStatisticsApi,
44    trace::TraceRawVcs,
45    util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
46};
47
48pub use self::{
49    operation::AnyOperation,
50    storage::{SpecificTaskDataCategory, TaskDataCategory},
51};
52#[cfg(feature = "trace_task_dirty")]
53use crate::backend::operation::TaskDirtyCause;
54use crate::{
55    backend::{
56        operation::{
57            AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
58            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
59            LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType,
60            connect_children, get_aggregation_number, get_uppers, is_root_node,
61            make_task_dirty_internal, prepare_new_children,
62        },
63        storage::Storage,
64        storage_schema::{TaskStorage, TaskStorageAccessors},
65    },
66    backing_storage::{BackingStorage, SnapshotItem, compute_task_type_hash},
67    data::{
68        ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
69        InProgressState, InProgressStateInner, OutputValue, TransientTask,
70    },
71    error::TaskError,
72    utils::{
73        arc_or_owned::ArcOrOwned,
74        dash_map_drop_contents::drop_contents,
75        dash_map_raw_entry::{RawEntry, raw_entry},
76        ptr_eq_arc::PtrEqArc,
77        shard_amount::compute_shard_amount,
78    },
79};
80
/// Threshold for parallelizing making dependent tasks dirty.
/// If the number of dependent tasks exceeds this threshold,
/// the operation will be parallelized.
// NOTE(review): identifier misspells "PARALLELIZATION"; renaming would break
// uses elsewhere in this file/crate, so it is kept as-is here.
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;

/// Flag stored in the highest bit of `in_progress_operations`: set while a
/// snapshot has been requested, signalling running operations to suspend.
/// The remaining bits count in-flight operations.
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
87
/// Configurable idle timeout for snapshot persistence.
/// Defaults to 2 seconds if not set or if the value is invalid.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let default = Duration::from_secs(2);
    match std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS") {
        // Any unparsable value silently falls back to the default.
        Ok(raw) => raw
            .parse::<u64>()
            .map(Duration::from_millis)
            .unwrap_or(default),
        Err(_) => default,
    }
});
97
/// Coordination state between running operations and the snapshot mechanism,
/// guarded by `TurboTasksBackendInner::snapshot_request`.
struct SnapshotRequest {
    /// `true` while a snapshot is pending/in progress; operations suspend while set.
    snapshot_requested: bool,
    /// Operations that suspended themselves while a snapshot was requested
    /// (presumably so the snapshot can observe/persist them — confirm in the
    /// snapshot code, not visible in this chunk).
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}
102
103impl SnapshotRequest {
104    fn new() -> Self {
105        Self {
106            snapshot_requested: false,
107            suspended_operations: FxHashSet::default(),
108        }
109    }
110}
111
/// Controls how the backend interacts with the backing storage.
pub enum StorageMode {
    /// Queries the storage for cache entries that don't exist locally.
    /// Never pushes changes back to the backing storage (persistence is
    /// disabled, see `TurboTasksBackendInner::should_persist`).
    ReadOnly,
    /// Queries the storage for cache entries that don't exist locally.
    /// Regularly pushes changes to the backing storage.
    ReadWrite,
    /// Queries the storage for cache entries that don't exist locally.
    /// On shutdown, pushes all changes to the backing storage.
    ReadWriteOnShutdown,
}
122
/// Configuration for constructing a [`TurboTasksBackend`].
pub struct BackendOptions {
    /// Enables dependency tracking.
    ///
    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
    /// forever.
    pub dependency_tracking: bool,

    /// Enables active tracking.
    ///
    /// Automatically disabled when `dependency_tracking` is disabled.
    ///
    /// When disabled: All tasks are considered as active.
    pub active_tracking: bool,

    /// Enables the backing storage.
    ///
    /// `None` disables both restoring from and persisting to the backing storage.
    pub storage_mode: Option<StorageMode>,

    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
    /// data structures. If `None`, it will use the available parallelism.
    pub num_workers: Option<usize>,

    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
    pub small_preallocation: bool,
}
147
148impl Default for BackendOptions {
149    fn default() -> Self {
150        Self {
151            dependency_tracking: true,
152            active_tracking: true,
153            storage_mode: Some(StorageMode::ReadWrite),
154            num_workers: None,
155            small_preallocation: false,
156        }
157    }
158}
159
/// Background job kinds scheduled by this backend.
pub enum TurboTasksBackendJob {
    // Presumably triggers the first snapshot after startup — confirm against
    // the job handler (not visible in this chunk).
    InitialSnapshot,
    // Presumably triggers subsequent periodic snapshots.
    FollowUpSnapshot,
}
164
/// Public backend handle: a cheaply clonable `Arc` wrapper around the shared inner state.
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
166
struct TurboTasksBackendInner<B: BackingStorage> {
    /// Configuration; `active_tracking` is forced off in `new` when
    /// `dependency_tracking` is off.
    options: BackendOptions,

    /// Creation time of the backend; `last_snapshot` is measured relative to this.
    start_time: Instant,

    /// Allocates ids for persisted tasks (range below `TRANSIENT_TASK_BIT`),
    /// starting at the backing storage's next free task id.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Allocates ids for transient tasks (range starting at `TRANSIENT_TASK_BIT`).
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    /// Deduplication cache mapping cached task types to their task id.
    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

    /// In-memory storage of task data.
    storage: Storage,

    /// Number of executing operations + Highest bit is set when snapshot is
    /// requested. When that bit is set, operations should pause until the
    /// snapshot is completed. When the bit is set and in progress counter
    /// reaches zero, `operations_completed_when_snapshot_requested` is
    /// triggered.
    in_progress_operations: AtomicUsize,

    /// Shared snapshot coordination state (see [`SnapshotRequest`]).
    snapshot_request: Mutex<SnapshotRequest>,
    /// Condition Variable that is triggered when `in_progress_operations`
    /// reaches zero while snapshot is requested. All operations are either
    /// completed or suspended.
    operations_suspended: Condvar,
    /// Condition Variable that is triggered when a snapshot is completed and
    /// operations can continue.
    snapshot_completed: Condvar,
    /// The timestamp of the last started snapshot since [`Self::start_time`].
    last_snapshot: AtomicU64,

    /// Set when the backend begins stopping (read/written outside this chunk).
    stopping: AtomicBool,
    /// Fired when stopping begins — consumers not visible in this chunk.
    stopping_event: Event,
    /// Fired when the backend becomes idle — consumers not visible in this chunk.
    idle_start_event: Event,
    /// Fired when the backend leaves the idle state.
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    /// Optional per-native-function cache hit/miss statistics.
    task_statistics: TaskStatisticsApi,

    /// The persistent storage implementation backing this backend.
    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
211
212impl<B: BackingStorage> TurboTasksBackend<B> {
213    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
214        Self(Arc::new(TurboTasksBackendInner::new(
215            options,
216            backing_storage,
217        )))
218    }
219
220    pub fn backing_storage(&self) -> &B {
221        &self.0.backing_storage
222    }
223}
224
impl<B: BackingStorage> TurboTasksBackendInner<B> {
    /// Creates the inner backend state.
    ///
    /// Forces `active_tracking` off when `dependency_tracking` is disabled and
    /// starts persisted task id allocation at the backing storage's next free id.
    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
        if !options.dependency_tracking {
            options.active_tracking = false;
        }
        let small_preallocation = options.small_preallocation;
        let next_task_id = backing_storage
            .next_free_task_id()
            .expect("Failed to get task id");
        Self {
            options,
            start_time: Instant::now(),
            // Persisted ids occupy the range below TRANSIENT_TASK_BIT,
            // transient ids the range starting at it.
            persisted_task_id_factory: IdFactoryWithReuse::new(
                next_task_id,
                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
            ),
            transient_task_id_factory: IdFactoryWithReuse::new(
                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                TaskId::MAX,
            ),
            task_cache: FxDashMap::default(),
            storage: Storage::new(shard_amount, small_preallocation),
            in_progress_operations: AtomicUsize::new(0),
            snapshot_request: Mutex::new(SnapshotRequest::new()),
            operations_suspended: Condvar::new(),
            snapshot_completed: Condvar::new(),
            last_snapshot: AtomicU64::new(0),
            stopping: AtomicBool::new(false),
            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
            #[cfg(feature = "verify_aggregation_graph")]
            is_idle: AtomicBool::new(false),
            task_statistics: TaskStatisticsApi::default(),
            backing_storage,
            #[cfg(feature = "verify_aggregation_graph")]
            root_tasks: Default::default(),
        }
    }

    /// Creates an [`ExecuteContext`] bound to this backend and the given
    /// turbo tasks API, used by operations to access task state.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }

    /// `true` when persisting is enabled and a snapshot has been requested
    /// (high bit of `in_progress_operations` set), i.e. running operations
    /// should suspend at their next suspend point.
    fn suspending_requested(&self) -> bool {
        self.should_persist()
            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
    }

    /// Suspend point for operations: when a snapshot is requested, captures
    /// the operation's state via `suspend`, parks it until the snapshot
    /// completes, then resumes. No-op in the common (no snapshot) case.
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            // Re-check under the lock: the snapshot may already be done.
            if snapshot_request.snapshot_requested {
                // Register the suspended state and stop counting this
                // operation as in-progress.
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                if value == SNAPSHOT_REQUESTED_BIT {
                    // Only the request bit remains: this was the last running
                    // operation, wake whoever waits for all operations to stop.
                    this.operations_suspended.notify_all();
                }
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                // Snapshot finished: count this operation again and deregister
                // its suspended state.
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }

    /// Registers a new in-flight operation and returns a guard that
    /// unregisters it on drop. If a snapshot is currently requested, waits for
    /// the snapshot to complete before the operation may proceed.
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        if !self.should_persist() {
            // Without persistence there are no snapshots, so no counting needed.
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            let mut snapshot_request = self.snapshot_request.lock();
            // Re-check under the lock; if still requested, undo the increment
            // and wait for the snapshot to finish before counting again.
            if snapshot_request.snapshot_requested {
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }

    /// Whether changes are pushed to the backing storage (regularly or on shutdown).
    fn should_persist(&self) -> bool {
        matches!(
            self.options.storage_mode,
            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
        )
    }

    /// Whether cache entries may be restored from the backing storage.
    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }

    /// Whether dependency tracking is enabled (see [`BackendOptions::dependency_tracking`]).
    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }

    /// Whether activeness tracking is enabled (see [`BackendOptions::active_tracking`]).
    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }

    /// Records a task cache hit in the (optional) task statistics.
    fn track_cache_hit(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
    }

    /// Records a task cache miss in the (optional) task statistics.
    fn track_cache_miss(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
    }

    /// Reconstructs a full [`TurboTasksExecutionError`] from the compact [`TaskError`] storage
    /// representation. For [`TaskError::TaskChain`], this looks up the source error from the last
    /// task's output and rebuilds the nested `TaskContext` wrappers with `TurboTasksCallApi`
    /// references for lazy name resolution.
    fn task_error_to_turbo_tasks_execution_error(
        &self,
        error: &TaskError,
        ctx: &mut impl ExecuteContext<'_>,
    ) -> TurboTasksExecutionError {
        match error {
            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
                message: item.message.clone(),
                // Recursively rebuild the wrapped source error, if any.
                source: item
                    .source
                    .as_ref()
                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
            })),
            TaskError::LocalTaskContext(local_task_context) => {
                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
                    name: local_task_context.name.clone(),
                    source: local_task_context
                        .source
                        .as_ref()
                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
                }))
            }
            TaskError::TaskChain(chain) => {
                // Read the root error from the output of the last task in the chain.
                let task_id = chain.last().unwrap();
                let error = {
                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
                    if let Some(OutputValue::Error(error)) = task.get_output() {
                        Some(error.clone())
                    } else {
                        None
                    }
                };
                let error = error.map_or_else(
                    || {
                        // Eventual consistency will cause errors to no longer be available
                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
                                "Error no longer available",
                            )),
                            location: None,
                        }))
                    },
                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
                );
                // Wrap the root error in one `TaskContext` layer per task in
                // the chain, iterating from the end so the first chain entry
                // ends up outermost.
                let mut current_error = error;
                for &task_id in chain.iter().rev() {
                    current_error =
                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
                            task_id,
                            source: Some(current_error),
                            turbo_tasks: ctx.turbo_tasks(),
                        }));
                }
                current_error
            }
        }
    }
}
427
/// RAII guard representing one in-flight operation counted in
/// `TurboTasksBackendInner::in_progress_operations`. `backend` is `None` when
/// persistence is disabled and no counting is needed.
pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    backend: Option<&'a TurboTasksBackendInner<B>>,
}
431
impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
    /// Unregisters the operation. If this was the last running operation while
    /// a snapshot is requested (only the request bit remains after the
    /// decrement), wakes the waiter on `operations_suspended`.
    fn drop(&mut self) {
        if let Some(backend) = self.backend {
            let fetch_sub = backend
                .in_progress_operations
                .fetch_sub(1, Ordering::AcqRel);
            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
                backend.operations_suspended.notify_all();
            }
        }
    }
}
444
/// Intermediate result of step 1 of task execution completion.
struct TaskExecutionCompletePrepareResult {
    // Children connected during this execution — presumably consumed by a later
    // completion step (consumer not visible in this chunk).
    pub new_children: FxHashSet<TaskId>,
    // Whether the task was determined to be immutable by this execution.
    pub is_now_immutable: bool,
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    // New output value to store, if the output changed; `None` presumably keeps
    // the existing output — confirm against the completion step.
    pub new_output: Option<OutputValue>,
    // Tasks that depend on this task's output (inline up to 4 without heap allocation).
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
}
454
455// Operations
456impl<B: BackingStorage> TurboTasksBackendInner<B> {
457    fn connect_child(
458        &self,
459        parent_task: Option<TaskId>,
460        child_task: TaskId,
461        task_type: Option<ArcOrOwned<CachedTaskType>>,
462        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
463    ) {
464        operation::ConnectChildOperation::run(
465            parent_task,
466            child_task,
467            task_type,
468            self.execute_context(turbo_tasks),
469        );
470    }
471
472    fn try_read_task_output(
473        self: &Arc<Self>,
474        task_id: TaskId,
475        reader: Option<TaskId>,
476        options: ReadOutputOptions,
477        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
478    ) -> Result<Result<RawVc, EventListener>> {
479        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
480
481        let mut ctx = self.execute_context(turbo_tasks);
482        let need_reader_task = if self.should_track_dependencies()
483            && !matches!(options.tracking, ReadTracking::Untracked)
484            && reader.is_some_and(|reader_id| reader_id != task_id)
485            && let Some(reader_id) = reader
486            && reader_id != task_id
487        {
488            Some(reader_id)
489        } else {
490            None
491        };
492        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
493            // Having a task_pair here is not optimal, but otherwise this would lead to a race
494            // condition. See below.
495            // TODO(sokra): solve that in a more performant way.
496            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
497            (task, Some(reader))
498        } else {
499            (ctx.task(task_id, TaskDataCategory::All), None)
500        };
501
502        fn listen_to_done_event(
503            reader_description: Option<EventDescription>,
504            tracking: ReadTracking,
505            done_event: &Event,
506        ) -> EventListener {
507            done_event.listen_with_note(move || {
508                move || {
509                    if let Some(reader_description) = reader_description.as_ref() {
510                        format!(
511                            "try_read_task_output from {} ({})",
512                            reader_description, tracking
513                        )
514                    } else {
515                        format!("try_read_task_output ({})", tracking)
516                    }
517                }
518            })
519        }
520
521        fn check_in_progress(
522            task: &impl TaskGuard,
523            reader_description: Option<EventDescription>,
524            tracking: ReadTracking,
525        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
526        {
527            match task.get_in_progress() {
528                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
529                    listen_to_done_event(reader_description, tracking, done_event),
530                ))),
531                Some(InProgressState::InProgress(box InProgressStateInner {
532                    done_event, ..
533                })) => Some(Ok(Err(listen_to_done_event(
534                    reader_description,
535                    tracking,
536                    done_event,
537                )))),
538                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
539                    "{} was canceled",
540                    task.get_task_description()
541                ))),
542                None => None,
543            }
544        }
545
546        if matches!(options.consistency, ReadConsistency::Strong) {
547            // Ensure it's an root node
548            loop {
549                let aggregation_number = get_aggregation_number(&task);
550                if is_root_node(aggregation_number) {
551                    break;
552                }
553                drop(task);
554                drop(reader_task);
555                {
556                    let _span = tracing::trace_span!(
557                        "make root node for strongly consistent read",
558                        %task_id
559                    )
560                    .entered();
561                    AggregationUpdateQueue::run(
562                        AggregationUpdateJob::UpdateAggregationNumber {
563                            task_id,
564                            base_aggregation_number: u32::MAX,
565                            distance: None,
566                        },
567                        &mut ctx,
568                    );
569                }
570                (task, reader_task) = if let Some(reader_id) = need_reader_task {
571                    // TODO(sokra): see comment above
572                    let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
573                    (task, Some(reader))
574                } else {
575                    (ctx.task(task_id, TaskDataCategory::All), None)
576                }
577            }
578
579            let is_dirty = task.is_dirty();
580
581            // Check the dirty count of the root node
582            let has_dirty_containers = task.has_dirty_containers();
583            if has_dirty_containers || is_dirty.is_some() {
584                let activeness = task.get_activeness_mut();
585                let mut task_ids_to_schedule: Vec<_> = Vec::new();
586                // When there are dirty task, subscribe to the all_clean_event
587                let activeness = if let Some(activeness) = activeness {
588                    // This makes sure all tasks stay active and this task won't stale.
589                    // active_until_clean is automatically removed when this
590                    // task is clean.
591                    activeness.set_active_until_clean();
592                    activeness
593                } else {
594                    // If we don't have a root state, add one. This also makes sure all tasks stay
595                    // active and this task won't stale. active_until_clean
596                    // is automatically removed when this task is clean.
597                    if ctx.should_track_activeness() {
598                        // A newly added Activeness need to make sure to schedule the tasks
599                        task_ids_to_schedule = task.dirty_containers().collect();
600                        task_ids_to_schedule.push(task_id);
601                    }
602                    let activeness =
603                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
604                    activeness.set_active_until_clean();
605                    activeness
606                };
607                let listener = activeness.all_clean_event.listen_with_note(move || {
608                    let this = self.clone();
609                    let tt = turbo_tasks.pin();
610                    move || {
611                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
612                        let mut ctx = this.execute_context(tt);
613                        let mut visited = FxHashSet::default();
614                        fn indent(s: &str) -> String {
615                            s.split_inclusive('\n')
616                                .flat_map(|line: &str| ["  ", line].into_iter())
617                                .collect::<String>()
618                        }
619                        fn get_info(
620                            ctx: &mut impl ExecuteContext<'_>,
621                            task_id: TaskId,
622                            parent_and_count: Option<(TaskId, i32)>,
623                            visited: &mut FxHashSet<TaskId>,
624                        ) -> String {
625                            let task = ctx.task(task_id, TaskDataCategory::All);
626                            let is_dirty = task.is_dirty();
627                            let in_progress =
628                                task.get_in_progress()
629                                    .map_or("not in progress", |p| match p {
630                                        InProgressState::InProgress(_) => "in progress",
631                                        InProgressState::Scheduled { .. } => "scheduled",
632                                        InProgressState::Canceled => "canceled",
633                                    });
634                            let activeness = task.get_activeness().map_or_else(
635                                || "not active".to_string(),
636                                |activeness| format!("{activeness:?}"),
637                            );
638                            let aggregation_number = get_aggregation_number(&task);
639                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
640                            {
641                                let uppers = get_uppers(&task);
642                                !uppers.contains(&parent_task_id)
643                            } else {
644                                false
645                            };
646
647                            // Check the dirty count of the root node
648                            let has_dirty_containers = task.has_dirty_containers();
649
650                            let task_description = task.get_task_description();
651                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
652                                format!(", dirty({parent_priority})")
653                            } else {
654                                String::new()
655                            };
656                            let has_dirty_containers_label = if has_dirty_containers {
657                                ", dirty containers"
658                            } else {
659                                ""
660                            };
661                            let count = if let Some((_, count)) = parent_and_count {
662                                format!(" {count}")
663                            } else {
664                                String::new()
665                            };
666                            let mut info = format!(
667                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
668                                 {in_progress}, \
669                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
670                            );
671                            let children: Vec<_> = task.dirty_containers_with_count().collect();
672                            drop(task);
673
674                            if missing_upper {
675                                info.push_str("\n  ERROR: missing upper connection");
676                            }
677
678                            if has_dirty_containers || !children.is_empty() {
679                                writeln!(info, "\n  dirty tasks:").unwrap();
680
681                                for (child_task_id, count) in children {
682                                    let task_description = ctx
683                                        .task(child_task_id, TaskDataCategory::Data)
684                                        .get_task_description();
685                                    if visited.insert(child_task_id) {
686                                        let child_info = get_info(
687                                            ctx,
688                                            child_task_id,
689                                            Some((task_id, count)),
690                                            visited,
691                                        );
692                                        info.push_str(&indent(&child_info));
693                                        if !info.ends_with('\n') {
694                                            info.push('\n');
695                                        }
696                                    } else {
697                                        writeln!(
698                                            info,
699                                            "  {child_task_id} {task_description} {count} \
700                                             (already visited)"
701                                        )
702                                        .unwrap();
703                                    }
704                                }
705                            }
706                            info
707                        }
708                        let info = get_info(&mut ctx, task_id, None, &mut visited);
709                        format!(
710                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
711                        )
712                    }
713                });
714                drop(reader_task);
715                drop(task);
716                if !task_ids_to_schedule.is_empty() {
717                    let mut queue = AggregationUpdateQueue::new();
718                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
719                    queue.execute(&mut ctx);
720                }
721
722                return Ok(Err(listener));
723            }
724        }
725
726        let reader_description = reader_task
727            .as_ref()
728            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
729        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
730        {
731            return value;
732        }
733
734        if let Some(output) = task.get_output() {
735            let result = match output {
736                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
737                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
738                OutputValue::Error(error) => Err(error.clone()),
739            };
740            if let Some(mut reader_task) = reader_task.take()
741                && options.tracking.should_track(result.is_err())
742                && (!task.immutable() || cfg!(feature = "verify_immutable"))
743            {
744                #[cfg(feature = "trace_task_output_dependencies")]
745                let _span = tracing::trace_span!(
746                    "add output dependency",
747                    task = %task_id,
748                    dependent_task = ?reader
749                )
750                .entered();
751                let mut queue = LeafDistanceUpdateQueue::new();
752                let reader = reader.unwrap();
753                if task.add_output_dependent(reader) {
754                    // Ensure that dependent leaf distance is strictly monotonic increasing
755                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
756                    let reader_leaf_distance =
757                        reader_task.get_leaf_distance().copied().unwrap_or_default();
758                    if reader_leaf_distance.distance <= leaf_distance.distance {
759                        queue.push(
760                            reader,
761                            leaf_distance.distance,
762                            leaf_distance.max_distance_in_buffer,
763                        );
764                    }
765                }
766
767                drop(task);
768
769                // Note: We use `task_pair` earlier to lock the task and its reader at the same
770                // time. If we didn't and just locked the reader here, an invalidation could occur
771                // between grabbing the locks. If that happened, and if the task is "outdated" or
772                // doesn't have the dependency edge yet, the invalidation would be lost.
773
774                if !reader_task.remove_outdated_output_dependencies(&task_id) {
775                    let _ = reader_task.add_output_dependencies(task_id);
776                }
777                drop(reader_task);
778
779                queue.execute(&mut ctx);
780            } else {
781                drop(task);
782            }
783
784            return result.map_err(|error| {
785                self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
786                    .with_task_context(task_id, turbo_tasks.pin())
787                    .into()
788            });
789        }
790        drop(reader_task);
791
792        let note = EventDescription::new(|| {
793            move || {
794                if let Some(reader) = reader_description.as_ref() {
795                    format!("try_read_task_output (recompute) from {reader}",)
796                } else {
797                    "try_read_task_output (recompute, untracked)".to_string()
798                }
799            }
800        });
801
802        // Output doesn't exist. We need to schedule the task to compute it.
803        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
804            TaskExecutionReason::OutputNotAvailable,
805            EventDescription::new(|| task.get_task_desc_fn()),
806            note,
807        );
808
809        // It's not possible that the task is InProgress at this point. If it is InProgress {
810        // done: true } it must have Output and would early return.
811        let old = task.set_in_progress(in_progress_state);
812        debug_assert!(old.is_none(), "InProgress already exists");
813        ctx.schedule_task(task, TaskPriority::Initial);
814
815        Ok(Err(listener))
816    }
817
    /// Tries to read the content of `cell` on task `task_id`, on behalf of the optional `reader`
    /// task.
    ///
    /// Returns:
    /// - `Ok(Ok(content))` when the cell data is currently available,
    /// - `Ok(Err(listener))` when the data is being (or needs to be) computed — the caller must
    ///   wait on the listener and retry,
    /// - `Err(_)` when the cell no longer exists or the task was canceled.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        // Records the dependency edge in both directions: `reader` as a dependent of the
        // cell on `task`, and the cell as a dependency of `reader`. Expects both guards to
        // have been acquired together via `task_pair` (see the note below).
        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
            key: Option<u64>,
        ) {
            // Dependencies on immutable tasks are skipped, unless the verify_immutable
            // feature wants to track them for verification purposes.
            if let Some(mut reader_task) = reader_task
                && (!task.immutable() || cfg!(feature = "verify_immutable"))
            {
                // reader_task is only Some when reader was Some (see the task_pair branch
                // below), so this unwrap cannot fail.
                let reader = reader.unwrap();
                let _ = task.add_cell_dependents((cell, key, reader));
                drop(task);

                // Note: We use `task_pair` earlier to lock the task and its reader at the same
                // time. If we didn't and just locked the reader here, an invalidation could occur
                // between grabbing the locks. If that happened, and if the task is "outdated" or
                // doesn't have the dependency edge yet, the invalidation would be lost.

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
                    let _ = reader_task.add_cell_dependencies((target, key));
                }
                drop(reader_task);
            }
        }

        let ReadCellOptions {
            is_serializable_cell_content,
            tracking,
            final_read_hint,
        } = options;

        let mut ctx = self.execute_context(turbo_tasks);
        // Lock the task alone, or the (task, reader) pair when this is a tracked read from a
        // different task. Pair-locking prevents losing an invalidation between the two locks.
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(tracking, ReadCellTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            // Having a task_pair here is not optimal, but otherwise this would lead to a race
            // condition. See below.
            // TODO(sokra): solve that in a more performant way.
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        // With the final-read hint the cell data is moved out instead of copied, since no
        // further reads are expected.
        let content = if final_read_hint {
            task.remove_cell_data(is_serializable_cell_content, cell)
        } else {
            task.get_cell_data(is_serializable_cell_content, cell)
        };
        if let Some(content) = content {
            if tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.reference)),
            )));
        }

        // No data available. If the task is already running or scheduled, just wait for the
        // cell to be filled.
        let in_progress = task.get_in_progress();
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self
                .listen_to_cell(&mut task, task_id, &reader_task, cell)
                .0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        // Check cell index range (cell might not exist at all)
        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
        let Some(max_id) = max_id else {
            let task_desc = task.get_task_description();
            // Still track the (now failing) dependency so the reader is invalidated if the
            // cell reappears later.
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!(
                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
            );
        };
        if cell.index >= max_id {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
        }

        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
        // task to get the cell content.

        // Bail early if the task was cancelled — no point in registering a listener
        // on a task that won't execute again.
        if is_cancelled {
            bail!("{} was canceled", task.get_task_description());
        }

        // Listen to the cell and potentially schedule the task
        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
        drop(reader_task);
        if !new_listener {
            // Someone else already registered the in-progress cell state, and with it the
            // responsibility to schedule the task.
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).ty.global_name,
            cell_index = cell.index
        )
        .entered();

        let _ = task.add_scheduled(
            TaskExecutionReason::CellNotAvailable,
            EventDescription::new(|| task.get_task_desc_fn()),
        );
        ctx.schedule_task(task, TaskPriority::Initial);

        Ok(Err(listener))
    }
956
957    fn listen_to_cell(
958        &self,
959        task: &mut impl TaskGuard,
960        task_id: TaskId,
961        reader_task: &Option<impl TaskGuard>,
962        cell: CellId,
963    ) -> (EventListener, bool) {
964        let note = || {
965            let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
966            move || {
967                if let Some(reader_desc) = reader_desc.as_ref() {
968                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
969                } else {
970                    "try_read_task_cell (in progress, untracked)".to_string()
971                }
972            }
973        };
974        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
975            // Someone else is already computing the cell
976            let listener = in_progress.event.listen_with_note(note);
977            return (listener, false);
978        }
979        let in_progress = InProgressCellState::new(task_id, cell);
980        let listener = in_progress.event.listen_with_note(note);
981        let old = task.insert_in_progress_cells(cell, in_progress);
982        debug_assert!(old.is_none(), "InProgressCell already exists");
983        (listener, true)
984    }
985
    /// Suspends all in-progress operations, takes a snapshot of the modified task storage and
    /// persists it to the backing storage.
    ///
    /// Returns `Some((time, persisted))` on success, where `persisted` tells whether anything was
    /// actually written, or `None` when saving the snapshot to the backing storage failed.
    fn snapshot_and_persist(
        &self,
        parent_span: Option<tracing::Id>,
        reason: &str,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<(Instant, bool)> {
        let snapshot_span =
            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
                .entered();
        let start = Instant::now();
        // SystemTime for wall-clock timestamps in trace events (milliseconds
        // since epoch). Instant is monotonic but has no defined epoch, so it
        // can't be used for cross-process trace correlation.
        let wall_start = SystemTime::now();
        debug_assert!(self.should_persist());

        let suspended_operations;
        {
            // Request a snapshot and block until every in-progress operation has either
            // completed or suspended itself (signalled via `operations_suspended`).
            let _span = tracing::info_span!("blocking").entered();
            let mut snapshot_request = self.snapshot_request.lock();
            snapshot_request.snapshot_requested = true;
            let active_operations = self
                .in_progress_operations
                .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
            if active_operations != 0 {
                self.operations_suspended
                    .wait_while(&mut snapshot_request, |_| {
                        self.in_progress_operations.load(Ordering::Relaxed)
                            != SNAPSHOT_REQUESTED_BIT
                    });
            }
            // Capture the suspended operations so they can be persisted alongside the
            // snapshot and resumed after a restore.
            suspended_operations = snapshot_request
                .suspended_operations
                .iter()
                .map(|op| op.arc().clone())
                .collect::<Vec<_>>();
        }
        // Enter snapshot mode, which atomically reads and resets the modified count.
        // Checking after start_snapshot ensures no concurrent increments can race.
        let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
        // Snapshot mode is established: clear the request and let suspended operations resume.
        let mut snapshot_request = self.snapshot_request.lock();
        snapshot_request.snapshot_requested = false;
        self.in_progress_operations
            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        self.snapshot_completed.notify_all();
        let snapshot_time = Instant::now();
        drop(snapshot_request);

        if !has_modifications {
            // No tasks modified since the last snapshot — drop the guard (which
            // calls end_snapshot) and skip the expensive O(N) scan.
            drop(snapshot_guard);
            return Some((start, false));
        }

        // Per-task-type aggregate sizes and counts, collected only with the
        // `print_cache_item_size` feature for debugging cache size.
        #[cfg(feature = "print_cache_item_size")]
        #[derive(Default)]
        struct TaskCacheStats {
            data: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            data_compressed: usize,
            data_count: usize,
            meta: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            meta_compressed: usize,
            meta_count: usize,
            upper_count: usize,
            collectibles_count: usize,
            aggregated_collectibles_count: usize,
            children_count: usize,
            followers_count: usize,
            collectibles_dependents_count: usize,
            aggregated_dirty_containers_count: usize,
            output_size: usize,
        }
        /// Formats a byte size, optionally including the compressed size when the
        /// `print_cache_item_size_with_compressed` feature is enabled.
        #[cfg(feature = "print_cache_item_size")]
        struct FormatSizes {
            size: usize,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: usize,
        }
        #[cfg(feature = "print_cache_item_size")]
        impl std::fmt::Display for FormatSizes {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                use turbo_tasks::util::FormatBytes;
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    write!(
                        f,
                        "{} ({} compressed)",
                        FormatBytes(self.size),
                        FormatBytes(self.compressed_size)
                    )
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    write!(f, "{}", FormatBytes(self.size))
                }
            }
        }
        #[cfg(feature = "print_cache_item_size")]
        impl TaskCacheStats {
            /// LZ4-compressed size of `data`; used only for the compressed-size columns.
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            fn compressed_size(data: &[u8]) -> Result<usize> {
                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
                    data,
                    &mut Vec::new(),
                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
                )?)
            }

            fn add_data(&mut self, data: &[u8]) {
                self.data += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.data_count += 1;
            }

            fn add_meta(&mut self, data: &[u8]) {
                self.meta += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.meta_count += 1;
            }

            /// Accumulates the meta item counts (and serialized output size) of one task.
            fn add_counts(&mut self, storage: &TaskStorage) {
                let counts = storage.meta_counts();
                self.upper_count += counts.upper;
                self.collectibles_count += counts.collectibles;
                self.aggregated_collectibles_count += counts.aggregated_collectibles;
                self.children_count += counts.children;
                self.followers_count += counts.followers;
                self.collectibles_dependents_count += counts.collectibles_dependents;
                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
                if let Some(output) = storage.get_output() {
                    use turbo_bincode::turbo_bincode_encode;

                    self.output_size += turbo_bincode_encode(&output)
                        .map(|data| data.len())
                        .unwrap_or(0);
                }
            }

            /// Returns the task name used as the stats grouping key.
            fn task_name(storage: &TaskStorage) -> String {
                storage
                    .get_persistent_task_type()
                    .map(|t| t.to_string())
                    .unwrap_or_else(|| "<unknown>".to_string())
            }

            /// Returns the primary sort key: compressed total when
            /// `print_cache_item_size_with_compressed` is enabled, raw total otherwise.
            fn sort_key(&self) -> usize {
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed + self.meta_compressed
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    self.data + self.meta
                }
            }

            fn format_total(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data + self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed + self.meta_compressed,
                }
            }

            fn format_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed,
                }
            }

            /// Average data item size; 0 when no data items were recorded.
            fn format_avg_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data.checked_div(self.data_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .data_compressed
                        .checked_div(self.data_count)
                        .unwrap_or(0),
                }
            }

            fn format_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.meta_compressed,
                }
            }

            /// Average meta item size; 0 when no meta items were recorded.
            fn format_avg_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta.checked_div(self.meta_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .meta_compressed
                        .checked_div(self.meta_count)
                        .unwrap_or(0),
                }
            }
        }
        // Mutex because `process` below may run on multiple threads during take_snapshot —
        // TODO confirm take_snapshot parallelism.
        #[cfg(feature = "print_cache_item_size")]
        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
            Mutex::new(FxHashMap::default());

        // Encode each task's modified categories. We only encode categories with `modified` set,
        // meaning the category was actually dirtied. Categories restored from disk but never
        // modified don't need re-persisting since the on-disk version is still valid.
        // For tasks accessed during snapshot mode, a frozen copy was made and its `modified`
        // flags were copied from the live task at snapshot creation time, reflecting which
        // categories were dirtied before the snapshot was taken.
        let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
            let encode_category = |task_id: TaskId,
                                   data: &TaskStorage,
                                   category: SpecificTaskDataCategory,
                                   buffer: &mut TurboBincodeBuffer|
             -> Option<TurboBincodeBuffer> {
                match encode_task_data(task_id, data, category, buffer) {
                    Ok(encoded) => {
                        #[cfg(feature = "print_cache_item_size")]
                        {
                            let mut stats = task_cache_stats.lock();
                            let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
                            match category {
                                SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
                                SpecificTaskDataCategory::Data => entry.add_data(&encoded),
                            }
                        }
                        Some(encoded)
                    }
                    Err(err) => {
                        // Failing to serialize a task is a bug in the task's data types, not a
                        // recoverable condition.
                        panic!(
                            "Serializing task {} failed ({:?}): {:?}",
                            self.debug_get_task_description(task_id),
                            category,
                            err
                        );
                    }
                }
            };
            if task_id.is_transient() {
                unreachable!("transient task_ids should never be enqueued to be persisted");
            }

            let encode_meta = inner.flags.meta_modified();
            let encode_data = inner.flags.data_modified();

            #[cfg(feature = "print_cache_item_size")]
            if encode_data || encode_meta {
                task_cache_stats
                    .lock()
                    .entry(TaskCacheStats::task_name(inner))
                    .or_default()
                    .add_counts(inner);
            }

            let meta = if encode_meta {
                encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
            } else {
                None
            };

            let data = if encode_data {
                encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
            } else {
                None
            };
            // Only freshly created tasks need their task-type hash persisted (for the
            // task cache lookup on restore).
            let task_type_hash = if inner.flags.new_task() {
                let task_type = inner.get_persistent_task_type().expect(
                    "It is not possible for a new_task to not have a persistent_task_type.  Task \
                     creation for persistent tasks uses a single ExecutionContextImpl for \
                     creating the task (which sets new_task) and connect_child (which sets \
                     persistent_task_type) and take_snapshot waits for all operations to complete \
                     or suspend before we start snapshotting.  So task creation will always set \
                     the task_type.",
                );
                Some(compute_task_type_hash(task_type))
            } else {
                None
            };

            SnapshotItem {
                task_id,
                meta,
                data,
                task_type_hash,
            }
        };

        let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);

        drop(snapshot_span);
        let snapshot_duration = start.elapsed();
        let task_count = task_snapshots.len();

        if task_snapshots.is_empty() {
            // This should be impossible — if we got here, modified_count was nonzero, and every
            // modification that increments the count also failed during encoding.
            std::hint::cold_path();
            return Some((snapshot_time, false));
        }

        // Write the snapshot (plus the suspended operations) to the backing storage.
        let persist_start = Instant::now();
        let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
        {
            if let Err(err) = self
                .backing_storage
                .save_snapshot(suspended_operations, task_snapshots)
            {
                eprintln!("Persisting failed: {err:?}");
                return None;
            }
            // Debug output: per-task-type cache size table, largest first.
            #[cfg(feature = "print_cache_item_size")]
            {
                let mut task_cache_stats = task_cache_stats
                    .into_inner()
                    .into_iter()
                    .collect::<Vec<_>>();
                if !task_cache_stats.is_empty() {
                    use turbo_tasks::util::FormatBytes;

                    use crate::utils::markdown_table::print_markdown_table;

                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
                        (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
                    });

                    println!(
                        "Task cache stats: {}",
                        FormatSizes {
                            size: task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data + s.meta)
                                .sum::<usize>(),
                            #[cfg(feature = "print_cache_item_size_with_compressed")]
                            compressed_size: task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data_compressed + s.meta_compressed)
                                .sum::<usize>()
                        },
                    );

                    print_markdown_table(
                        [
                            "Task",
                            " Total Size",
                            " Data Size",
                            " Data Count x Avg",
                            " Data Count x Avg",
                            " Meta Size",
                            " Meta Count x Avg",
                            " Meta Count x Avg",
                            " Uppers",
                            " Coll",
                            " Agg Coll",
                            " Children",
                            " Followers",
                            " Coll Deps",
                            " Agg Dirty",
                            " Output Size",
                        ],
                        task_cache_stats.iter(),
                        |(task_desc, stats)| {
                            [
                                task_desc.to_string(),
                                format!(" {}", stats.format_total()),
                                format!(" {}", stats.format_data()),
                                format!(" {} x", stats.data_count),
                                format!("{}", stats.format_avg_data()),
                                format!(" {}", stats.format_meta()),
                                format!(" {} x", stats.meta_count),
                                format!("{}", stats.format_avg_meta()),
                                format!(" {}", stats.upper_count),
                                format!(" {}", stats.collectibles_count),
                                format!(" {}", stats.aggregated_collectibles_count),
                                format!(" {}", stats.children_count),
                                format!(" {}", stats.followers_count),
                                format!(" {}", stats.collectibles_dependents_count),
                                format!(" {}", stats.aggregated_dirty_containers_count),
                                format!(" {}", FormatBytes(stats.output_size)),
                            ]
                        },
                    );
                }
            }
        }

        let elapsed = start.elapsed();
        let persist_duration = persist_start.elapsed();
        // avoid spamming the event queue with information about fast operations
        if elapsed > Duration::from_secs(10) {
            turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
                "Finished writing to filesystem cache".to_string(),
                elapsed,
            )));
        }

        let wall_start_ms = wall_start
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            // as_millis_f64 is not stable yet
            .as_secs_f64()
            * 1000.0;
        let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
        turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
            "turbopack-persistence",
            wall_start_ms,
            wall_end_ms,
            vec![
                ("reason", serde_json::Value::from(reason)),
                (
                    "snapshot_duration_ms",
                    serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
                ),
                (
                    "persist_duration_ms",
                    serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
                ),
                ("task_count", serde_json::Value::from(task_count)),
            ],
        )));

        Some((snapshot_time, true))
    }
1425
    /// One-time startup hook: replays operations that were left uncompleted by a
    /// previous session (when restore is enabled) and, for read-write storage,
    /// schedules the initial snapshot background job.
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        if self.should_restore() {
            // Continue all uncompleted operations
            // They can't be interrupted by a snapshot since the snapshotting job has not been
            // scheduled yet.
            let uncompleted_operations = self
                .backing_storage
                .uncompleted_operations()
                .expect("Failed to get uncompleted operations");
            if !uncompleted_operations.is_empty() {
                // Reuse one ExecuteContext for all replayed operations.
                let mut ctx = self.execute_context(turbo_tasks);
                for op in uncompleted_operations {
                    op.execute(&mut ctx);
                }
            }
        }

        // Only when it should write regularly to the storage, we schedule the initial snapshot
        // job.
        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
            // Schedule the snapshot job
            // NOTE(review): the second `_span` shadows the first binding, but both
            // guards stay alive until the end of this block, so "thread" is nested
            // inside "persisting background job".
            let _span = trace_span!("persisting background job").entered();
            let _span = tracing::info_span!("thread").entered();
            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
        }
    }
1452
    /// Signals that shutdown has begun: sets the `stopping` flag and wakes every
    /// listener waiting on `stopping_event` (`usize::MAX` notifies all).
    fn stopping(&self) {
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }
1457
    /// Final shutdown: optionally verifies the aggregation graph, persists a last
    /// snapshot (so completed work survives the restart), drops the in-memory
    /// caches, and shuts down the backing storage.
    #[allow(unused_variables)]
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        #[cfg(feature = "verify_aggregation_graph")]
        {
            self.is_idle.store(false, Ordering::Release);
            self.verify_aggregation_graph(turbo_tasks, false);
        }
        if self.should_persist() {
            self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks);
        }
        // Release memory before storage shutdown; caches are not needed anymore.
        drop_contents(&self.task_cache);
        self.storage.drop_contents();
        if let Err(err) = self.backing_storage.shutdown() {
            // NOTE(review): failure here is only reported, not propagated — the
            // process is terminating and there is nothing actionable left to do.
            println!("Shutting down failed: {err}");
        }
    }
1474
    /// Called when the system becomes idle. Wakes `idle_start_event` listeners
    /// and, when `verify_aggregation_graph` is enabled, schedules a debounced
    /// verification that only runs if the system stays idle for 5 seconds.
    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                select! {
                    // Debounce: wait 5s, unless idle ends first.
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                        // do nothing
                    }
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                // Re-check in case idle ended concurrently with the sleep
                // without our listener observing it.
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }
1502
    /// Called when the system leaves the idle state: clears the idle flag (only
    /// tracked under `verify_aggregation_graph`) and wakes `idle_end_event`
    /// listeners, which cancels any pending debounced verification.
    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }
1508
    /// Looks up or creates the persistent task for `task_type`, connects it as a
    /// child of `parent_task`, and returns its id.
    ///
    /// Lookup order: in-memory `task_cache` (read lock only) → backing storage →
    /// allocate a fresh persistent task id. Newly created root tasks get their
    /// aggregation number set to `u32::MAX`, making them aggregation roots.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;

        // First check if the task exists in the cache which only uses a read lock
        // .map(|r| *r) copies the TaskId and drops the DashMap Ref (releasing the read lock)
        // before ConnectChildOperation::run, which may re-enter task_cache with a write lock.
        if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }

        // Create a single ExecuteContext for both lookup and connect_child
        let mut ctx = self.execute_context(turbo_tasks);

        let mut is_new = false;
        let (task_id, task_type) = if let Some(task_id) = ctx.task_by_type(&task_type) {
            // Task exists in backing storage
            // So we only need to insert it into the in-memory cache
            self.track_cache_hit(&task_type);
            let task_type = match raw_entry(&self.task_cache, &task_type) {
                // Another thread inserted it concurrently; keep our owned copy.
                RawEntry::Occupied(_) => ArcOrOwned::Owned(task_type),
                RawEntry::Vacant(e) => {
                    let task_type = Arc::new(task_type);
                    e.insert(task_type.clone(), task_id);
                    ArcOrOwned::Arc(task_type)
                }
            };
            (task_id, task_type)
        } else {
            // Task doesn't exist in memory cache or backing storage
            // So we might need to create a new task
            let (task_id, task_type) = match raw_entry(&self.task_cache, &task_type) {
                RawEntry::Occupied(e) => {
                    // Another thread beat us to creating this task - use their task_id.
                    // They will handle logging the new task as modified
                    let task_id = *e.get();
                    drop(e);
                    self.track_cache_hit(&task_type);
                    (task_id, ArcOrOwned::Owned(task_type))
                }
                RawEntry::Vacant(e) => {
                    // We're creating a new task.
                    let task_type = Arc::new(task_type);
                    let task_id = self.persisted_task_id_factory.get();
                    // Initialize storage BEFORE making task_id visible in the cache.
                    // This ensures any thread that reads task_id from the cache sees
                    // the storage entry already initialized (restored flags set).
                    self.storage.initialize_new_task(task_id);
                    e.insert(task_type.clone(), task_id);
                    // insert() consumes e, releasing the lock
                    self.track_cache_miss(&task_type);
                    is_new = true;
                    (task_id, ArcOrOwned::Arc(task_type))
                }
            };
            (task_id, task_type)
        };
        if is_new && is_root {
            // Promote the new root task to an aggregation root.
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: u32::MAX,
                    distance: None,
                },
                &mut ctx,
            );
        }
        // Reuse the same ExecuteContext for connect_child
        operation::ConnectChildOperation::run(parent_task, task_id, Some(task_type), ctx);

        task_id
    }
1592
    /// Looks up or creates the transient task for `task_type`, connects it as a
    /// child of `parent_task`, and returns its id.
    ///
    /// Panics (via `panic_persistent_calling_transient`) when a non-transient
    /// parent task tries to create a transient task. Newly created root tasks
    /// become aggregation roots (`u32::MAX` aggregation number).
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;

        if let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                /* cell_id */ None,
            );
        }
        // First check if the task exists in the cache which only uses a read lock.
        // .map(|r| *r) copies the TaskId and drops the DashMap Ref (releasing the read lock)
        // before ConnectChildOperation::run, which may re-enter task_cache with a write lock.
        if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }
        // If not, acquire a write lock and double check / insert
        match raw_entry(&self.task_cache, &task_type) {
            RawEntry::Occupied(e) => {
                // Another thread created the task concurrently; reuse its id.
                let task_id = *e.get();
                drop(e);
                self.track_cache_hit(&task_type);
                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Owned(task_type)),
                    turbo_tasks,
                );
                task_id
            }
            RawEntry::Vacant(e) => {
                let task_type = Arc::new(task_type);
                let task_id = self.transient_task_id_factory.get();
                // Initialize storage BEFORE making task_id visible in the cache.
                self.storage.initialize_new_task(task_id);
                e.insert(task_type.clone(), task_id);
                self.track_cache_miss(&task_type);

                if is_root {
                    // Promote the new root task to an aggregation root.
                    let mut ctx = self.execute_context(turbo_tasks);
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: u32::MAX,
                            distance: None,
                        },
                        &mut ctx,
                    );
                }

                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Arc(task_type)),
                    turbo_tasks,
                );

                task_id
            }
        }
    }
1668
    /// Generate an object that implements [`fmt::Display`] explaining why the given
    /// [`CachedTaskType`] is transient.
    ///
    /// Walks the task's `this` receiver and argument `RawVc`s, recursing into any
    /// transient task they point at. `cell_id` (if given) records which cell type
    /// triggered the trace.
    fn debug_trace_transient_task(
        &self,
        task_type: &CachedTaskType,
        cell_id: Option<CellId>,
    ) -> DebugTraceTransientTask {
        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
        // from tracing the same task many times, so use a visited_set

        // Trace a task by id: collapse already-visited tasks, recurse into cached
        // ones, and mark tasks without a cached type as `Uncached`.
        fn inner_id(
            backend: &TurboTasksBackendInner<impl BackingStorage>,
            task_id: TaskId,
            cell_type_id: Option<ValueTypeId>,
            visited_set: &mut FxHashSet<TaskId>,
        ) -> DebugTraceTransientTask {
            if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
                if visited_set.contains(&task_id) {
                    let task_name = task_type.get_name();
                    DebugTraceTransientTask::Collapsed {
                        task_name,
                        cell_type_id,
                    }
                } else {
                    inner_cached(backend, &task_type, cell_type_id, visited_set)
                }
            } else {
                DebugTraceTransientTask::Uncached { cell_type_id }
            }
        }

        // Trace a cached task type: find transient causes in its `this` receiver
        // and in its (deduplicated) argument `RawVc`s.
        fn inner_cached(
            backend: &TurboTasksBackendInner<impl BackingStorage>,
            task_type: &CachedTaskType,
            cell_type_id: Option<ValueTypeId>,
            visited_set: &mut FxHashSet<TaskId>,
        ) -> DebugTraceTransientTask {
            let task_name = task_type.get_name();

            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
                    // `task_id` should never be `None` at this point, as that would imply a
                    // non-local task is returning a local `Vc`...
                    // Just ignore if it happens, as we're likely already panicking.
                    return None;
                };
                if task_id.is_transient() {
                    Some(Box::new(inner_id(
                        backend,
                        task_id,
                        cause_self_raw_vc.try_get_type_id(),
                        visited_set,
                    )))
                } else {
                    None
                }
            });
            let cause_args = task_type
                .arg
                .get_raw_vcs()
                .into_iter()
                .filter_map(|raw_vc| {
                    let Some(task_id) = raw_vc.try_get_task_id() else {
                        // `task_id` should never be `None` (see comment above)
                        return None;
                    };
                    if !task_id.is_transient() {
                        return None;
                    }
                    Some((task_id, raw_vc.try_get_type_id()))
                })
                .collect::<IndexSet<_>>() // dedupe
                .into_iter()
                .map(|(task_id, cell_type_id)| {
                    inner_id(backend, task_id, cell_type_id, visited_set)
                })
                .collect();

            DebugTraceTransientTask::Cached {
                task_name,
                cell_type_id,
                cause_self,
                cause_args,
            }
        }
        inner_cached(
            self,
            task_type,
            cell_id.map(|c| c.type_id),
            &mut FxHashSet::default(),
        )
    }
1759
1760    fn invalidate_task(
1761        &self,
1762        task_id: TaskId,
1763        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1764    ) {
1765        if !self.should_track_dependencies() {
1766            panic!("Dependency tracking is disabled so invalidation is not allowed");
1767        }
1768        operation::InvalidateOperation::run(
1769            smallvec![task_id],
1770            #[cfg(feature = "trace_task_dirty")]
1771            TaskDirtyCause::Invalidator,
1772            self.execute_context(turbo_tasks),
1773        );
1774    }
1775
1776    fn invalidate_tasks(
1777        &self,
1778        tasks: &[TaskId],
1779        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1780    ) {
1781        if !self.should_track_dependencies() {
1782            panic!("Dependency tracking is disabled so invalidation is not allowed");
1783        }
1784        operation::InvalidateOperation::run(
1785            tasks.iter().copied().collect(),
1786            #[cfg(feature = "trace_task_dirty")]
1787            TaskDirtyCause::Unknown,
1788            self.execute_context(turbo_tasks),
1789        );
1790    }
1791
1792    fn invalidate_tasks_set(
1793        &self,
1794        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1795        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1796    ) {
1797        if !self.should_track_dependencies() {
1798            panic!("Dependency tracking is disabled so invalidation is not allowed");
1799        }
1800        operation::InvalidateOperation::run(
1801            tasks.iter().copied().collect(),
1802            #[cfg(feature = "trace_task_dirty")]
1803            TaskDirtyCause::Unknown,
1804            self.execute_context(turbo_tasks),
1805        );
1806    }
1807
1808    fn invalidate_serialization(
1809        &self,
1810        task_id: TaskId,
1811        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1812    ) {
1813        if task_id.is_transient() {
1814            return;
1815        }
1816        let mut ctx = self.execute_context(turbo_tasks);
1817        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1818        task.invalidate_serialization();
1819    }
1820
1821    fn debug_get_task_description(&self, task_id: TaskId) -> String {
1822        let task = self.storage.access_mut(task_id);
1823        if let Some(value) = task.get_persistent_task_type() {
1824            format!("{task_id:?} {}", value)
1825        } else if let Some(value) = task.get_transient_task_type() {
1826            format!("{task_id:?} {}", value)
1827        } else {
1828            format!("{task_id:?} unknown")
1829        }
1830    }
1831
1832    fn get_task_name(
1833        &self,
1834        task_id: TaskId,
1835        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1836    ) -> String {
1837        let mut ctx = self.execute_context(turbo_tasks);
1838        let task = ctx.task(task_id, TaskDataCategory::Data);
1839        if let Some(value) = task.get_persistent_task_type() {
1840            value.to_string()
1841        } else if let Some(value) = task.get_transient_task_type() {
1842            value.to_string()
1843        } else {
1844            "unknown".to_string()
1845        }
1846    }
1847
1848    fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1849        let task = self.storage.access_mut(task_id);
1850        task.get_persistent_task_type().cloned()
1851    }
1852
    /// Handles cancellation of a task's execution: wakes all waiters (the done
    /// event and any in-progress cell events), marks the task session-dependent
    /// dirty so it re-executes next session, and records the `Canceled`
    /// in-progress state.
    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        if let Some(in_progress) = task.take_in_progress() {
            // Wake anyone waiting on this task's completion, whatever state it
            // was in (`usize::MAX` notifies all listeners).
            match in_progress {
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        // Notify any readers waiting on in-progress cells so their listeners
        // resolve and foreground jobs can finish (prevents stop_and_wait hang).
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Mark the cancelled task as session-dependent dirty so it will be re-executed
        // in the next session. Without this, any reader that encounters the cancelled task
        // records an error in its output. That error is persisted and would poison
        // subsequent builds. By marking the task session-dependent dirty, the next build
        // re-executes it, which invalidates dependents and corrects the stale errors.
        let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
            task.update_dirty_state(Some(Dirtyness::SessionDependent))
        } else {
            None
        };

        let old = task.set_in_progress(InProgressState::Canceled);
        debug_assert!(old.is_none(), "InProgress already exists");
        // Release the task lock before running aggregation updates.
        drop(task);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, &mut ctx);
        }

        // Drop the cell states outside the critical section.
        drop(in_progress_cells);
    }
1902
    /// Transitions a `Scheduled` task into the `InProgress` state and builds the
    /// execution spec (tracing span + future) for it.
    ///
    /// Returns `None` when the task has no in-progress state, is not currently
    /// `Scheduled` (e.g. already running or canceled), or when a `Once` task's
    /// future has already been taken. Also marks existing collectibles and
    /// (when dependency tracking is on) all dependencies as outdated, so stale
    /// entries can be cleaned up after the new execution finishes.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            if let Some(tasks) = task.prefetch() {
                // Release the task lock while prefetching, then re-acquire it.
                drop(task);
                ctx.prepare_tasks(tasks, "prefetch");
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            let in_progress = task.take_in_progress()?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                // Not scheduled: put the state back unchanged and bail out.
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Make all current collectibles outdated (remove left-over outdated collectibles)
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            // Snapshot first; we can't mutate the task while iterating it.
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        // Only drop outdated entries that have no current counterpart.
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Make all dependencies outdated
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        // Build the (span, future) pair for the executor after the lock scope.
        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason, priority),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // A Once task's future can be taken only once; `None` here
                    // means it already ran.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
2002
    /// Finalizes a task execution: applies the result, connects new children,
    /// clears the dirty/in-progress state, and shrinks the task's memory.
    ///
    /// Returns `true` when the task was found stale at any step and has been
    /// rescheduled (the result was discarded), `false` on normal completion.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        // Task completion is a 4 step process:
        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
        //    aggregation number of the task and the new children.
        // 2. Connect the new children to the task (and do the relevant aggregation updates).
        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
        // 4. Shrink the task memory to reduce footprint of the task.

        // Due to persistence it is possible that the process is cancelled after any step. This is
        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
        // in-memory representation.

        // The task might be invalidated during this process, so we need to check the stale flag
        // at the start of every step.

        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        // Step 1: remove old edges and compute what changed.
        let Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        }) = self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        )
        else {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "prepare");
            return true;
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // When restoring from filesystem cache the following might not be executed (since we can
        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
        // would be executed again.

        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        // Step 2: connect new children (may detect staleness again).
        if has_new_children
            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return true;
        }

        // Step 3: clear dirty flag and in-progress state.
        let (stale, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
        );
        if stale {
            // Task was stale and has been rescheduled
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return true;
        }

        // Step 4: shrink in-memory footprint.
        let removed_data =
            self.task_execution_completed_cleanup(&mut ctx, task_id, cell_counters, is_error);

        // Drop data outside of critical sections
        drop(removed_data);
        drop(in_progress_cells);

        false
    }
2134
    /// First phase of task-execution completion.
    ///
    /// Inspects the `InProgress` state, handles cancellation and (unless
    /// `no_fast_stale`) staleness, reconciles the per-type cell counters,
    /// collects outdated edges for cleanup, checks whether the task can be
    /// marked immutable, and computes the new output value.
    ///
    /// Returns `None` when the task was stale and has been rescheduled
    /// instead of completed. Otherwise returns `Some` with the data the
    /// later completion phases (connect/finish/cleanup) need.
    fn task_execution_completed_prepare(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        #[cfg(feature = "trace_task_details")] span: &Span,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
    ) -> Option<TaskExecutionCompletePrepareResult> {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress_mut() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // A canceled task still returns a (mostly empty) prepare result so the
        // caller can run the remaining completion phases without doing work.
        if matches!(in_progress, InProgressState::Canceled) {
            return Some(TaskExecutionCompletePrepareResult {
                new_children: Default::default(),
                is_now_immutable: false,
                #[cfg(feature = "verify_determinism")]
                no_output_set: false,
                new_output: None,
                output_dependent_tasks: Default::default(),
            });
        }
        let &mut InProgressState::InProgress(box InProgressStateInner {
            stale,
            ref mut new_children,
            session_dependent,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // If the task is stale, reschedule it
        #[cfg(not(feature = "no_fast_stale"))]
        if stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner {
                done_event,
                mut new_children,
                ..
            })) = task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            // Remove old children from new_children to leave only the children that had their
            // active count increased
            for task in task.iter_children() {
                new_children.remove(&task);
            }
            drop(task);

            // We need to undo the active count increase for the children since we throw away the
            // new_children list now.
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return None;
        }

        // take the children from the task to process them
        let mut new_children = take(new_children);

        // handle stateful (only tracked when verify_determinism is enabled)
        #[cfg(feature = "verify_determinism")]
        if stateful {
            task.set_stateful(true);
        }

        // handle has_invalidator
        if has_invalidator {
            task.set_invalidator(true);
        }

        // handle cell counters: update max index and remove cells that are no longer used
        // On error, skip this update: the task may have failed before creating all cells it
        // normally creates, so cell_counters is incomplete. Clearing cell_type_max_index entries
        // based on partial counters would cause "cell no longer exists" errors for tasks that
        // still hold dependencies on those cells. The old cell data is preserved on error
        // (see task_execution_completed_cleanup), so keeping cell_type_max_index consistent with
        // that data is correct.
        // NOTE: This must stay in sync with task_execution_completed_cleanup, which similarly
        // skips cell data removal on error.
        if result.is_ok() {
            let old_counters: FxHashMap<_, _> = task
                .iter_cell_type_max_index()
                .map(|(&k, &v)| (k, v))
                .collect();
            let mut counters_to_remove = old_counters.clone();

            for (&cell_type, &max_index) in cell_counters.iter() {
                if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                    if old_max_index != max_index {
                        task.insert_cell_type_max_index(cell_type, max_index);
                    }
                } else {
                    task.insert_cell_type_max_index(cell_type, max_index);
                }
            }
            for (cell_type, _) in counters_to_remove {
                task.remove_cell_type_max_index(&cell_type);
            }
        }

        // Collects aggregation updates produced while preparing new children
        // and removing old edges; executed via CleanupOldEdgesOperation below.
        let mut queue = AggregationUpdateQueue::new();

        let mut old_edges = Vec::new();

        let has_children = !new_children.is_empty();
        let is_immutable = task.immutable();
        let task_dependencies_for_immutable =
            // Task was not previously marked as immutable
            if !is_immutable
            // Task is not session dependent (session dependent tasks can change between sessions)
            && !session_dependent
            // Task has no invalidator
            && !task.invalidator()
            // Task has no dependencies on collectibles
            && task.is_collectibles_dependencies_empty()
        {
            Some(
                // Collect all dependencies on tasks to check if all dependencies are immutable
                task.iter_output_dependencies()
                    .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
                    .collect::<FxHashSet<_>>(),
            )
        } else {
            None
        };

        if has_children {
            // Prepare all new children
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);

            // Filter actual new children
            old_edges.extend(
                task.iter_children()
                    .filter(|task| !new_children.remove(task))
                    .map(OutdatedEdge::Child),
            );
        } else {
            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
        }

        old_edges.extend(
            task.iter_outdated_collectibles()
                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
        );

        if self.should_track_dependencies() {
            // IMPORTANT: Use iter_outdated_* here, NOT iter_* (active dependencies).
            // At execution start, active deps are copied to outdated as a "before" snapshot.
            // During execution, new deps are added to active.
            // Here at completion, we clean up only the OUTDATED deps (the "before" snapshot).
            // Using iter_* (active) instead would incorrectly clean up deps that are still valid,
            // breaking dependency tracking.
            old_edges.extend(
                task.iter_outdated_cell_dependencies()
                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
            );
            old_edges.extend(
                task.iter_outdated_output_dependencies()
                    .map(OutdatedEdge::OutputDependency),
            );
        }

        // Check if the output needs to be updated (unchanged outputs yield None
        // so dependent tasks are not invalidated needlessly)
        let current_output = task.get_output();
        #[cfg(feature = "verify_determinism")]
        let no_output_set = current_output.is_none();
        let new_output = match result {
            Ok(RawVc::TaskOutput(output_task_id)) => {
                if let Some(OutputValue::Output(current_task_id)) = current_output
                    && *current_task_id == output_task_id
                {
                    None
                } else {
                    Some(OutputValue::Output(output_task_id))
                }
            }
            Ok(RawVc::TaskCell(output_task_id, cell)) => {
                if let Some(OutputValue::Cell(CellRef {
                    task: current_task_id,
                    cell: current_cell,
                })) = current_output
                    && *current_task_id == output_task_id
                    && *current_cell == cell
                {
                    None
                } else {
                    Some(OutputValue::Cell(CellRef {
                        task: output_task_id,
                        cell,
                    }))
                }
            }
            Ok(RawVc::LocalOutput(..)) => {
                panic!("Non-local tasks must not return a local Vc");
            }
            Err(err) => {
                if let Some(OutputValue::Error(old_error)) = current_output
                    && **old_error == err
                {
                    None
                } else {
                    Some(OutputValue::Error(Arc::new((&err).into())))
                }
            }
        };
        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
        // When output has changed, grab the dependent tasks
        if new_output.is_some() && ctx.should_track_dependencies() {
            output_dependent_tasks = task.iter_output_dependent().collect();
        }

        drop(task);

        // Check if the task can be marked as immutable: it qualifies only if
        // every task it depends on is itself immutable.
        let mut is_now_immutable = false;
        if let Some(dependencies) = task_dependencies_for_immutable
            && dependencies
                .iter()
                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
        {
            is_now_immutable = true;
        }
        #[cfg(feature = "trace_task_details")]
        span.record("immutable", is_immutable || is_now_immutable);

        if !queue.is_empty() || !old_edges.is_empty() {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
            // Remove outdated edges first, before removing in_progress+dirty flag.
            // We need to make sure all outdated edges are removed before the task can potentially
            // be scheduled and executed again
            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
        }

        Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        })
    }
2391
    /// Invalidates (marks dirty) all tasks that depend on this task's output,
    /// called after the output value changed.
    ///
    /// Once tasks are never invalidated; tasks whose dependency on this
    /// output is already outdated are made dirty but not stale. When the
    /// number of dependents exceeds
    /// `DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD` the work is split
    /// into chunks and processed on scoped worker threads.
    fn task_execution_completed_invalidate_output_dependent(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        output_dependent_tasks: SmallVec<[TaskId; 4]>,
    ) {
        debug_assert!(!output_dependent_tasks.is_empty());

        // Prefetch task data for all dependents in one batch before touching
        // them individually.
        if output_dependent_tasks.len() > 1 {
            ctx.prepare_tasks(
                output_dependent_tasks
                    .iter()
                    .map(|&id| (id, TaskDataCategory::All)),
                "invalidate output dependents",
            );
        }

        // Handles a single dependent task: decides whether it needs to be
        // invalidated and marks it dirty (and possibly stale) accordingly.
        fn process_output_dependents(
            ctx: &mut impl ExecuteContext<'_>,
            task_id: TaskId,
            dependent_task_id: TaskId,
            queue: &mut AggregationUpdateQueue,
        ) {
            #[cfg(feature = "trace_task_output_dependencies")]
            let span = tracing::trace_span!(
                "invalidate output dependency",
                task = %task_id,
                dependent_task = %dependent_task_id,
                result = tracing::field::Empty,
            )
            .entered();
            let mut make_stale = true;
            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
            let transient_task_type = dependent.get_transient_task_type();
            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
                // once tasks are never invalidated
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "once task");
                return;
            }
            if dependent.outdated_output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "outdated dependency");
                // output dependency is outdated, so it hasn't read the output yet
                // and doesn't need to be invalidated
                // But importantly we still need to make the task dirty as it should no longer
                // be considered as "recomputation".
                make_stale = false;
            } else if !dependent.output_dependencies_contains(&task_id) {
                // output dependency has been removed, so the task doesn't depend on the
                // output anymore and doesn't need to be invalidated
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "no backward dependency");
                return;
            }
            make_task_dirty_internal(
                dependent,
                dependent_task_id,
                make_stale,
                #[cfg(feature = "trace_task_dirty")]
                TaskDirtyCause::OutputChange { task_id },
                queue,
                ctx,
            );
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "marked dirty");
        }

        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
            // Parallel path: each chunk gets its own child context and
            // aggregation queue, executed on a scoped worker thread.
            let chunk_size = good_chunk_size(output_dependent_tasks.len());
            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
            let _ = scope_and_block(chunks.len(), |scope| {
                for chunk in chunks {
                    let child_ctx = ctx.child_context();
                    scope.spawn(move || {
                        let mut ctx = child_ctx.create();
                        let mut queue = AggregationUpdateQueue::new();
                        for dependent_task_id in chunk {
                            process_output_dependents(
                                &mut ctx,
                                task_id,
                                dependent_task_id,
                                &mut queue,
                            )
                        }
                        queue.execute(&mut ctx);
                    });
                }
            });
        } else {
            // Sequential path for small dependent sets.
            let mut queue = AggregationUpdateQueue::new();
            for dependent_task_id in output_dependent_tasks {
                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
            }
            queue.execute(ctx);
        }
    }
2489
2490    fn task_execution_completed_unfinished_children_dirty(
2491        &self,
2492        ctx: &mut impl ExecuteContext<'_>,
2493        new_children: &FxHashSet<TaskId>,
2494    ) {
2495        debug_assert!(!new_children.is_empty());
2496
2497        let mut queue = AggregationUpdateQueue::new();
2498        ctx.for_each_task_all(
2499            new_children.iter().copied(),
2500            "unfinished children dirty",
2501            |child_task, ctx| {
2502                if !child_task.has_output() {
2503                    let child_id = child_task.id();
2504                    make_task_dirty_internal(
2505                        child_task,
2506                        child_id,
2507                        false,
2508                        #[cfg(feature = "trace_task_dirty")]
2509                        TaskDirtyCause::InitialDirty,
2510                        &mut queue,
2511                        ctx,
2512                    );
2513                }
2514            },
2515        );
2516
2517        queue.execute(ctx);
2518    }
2519
    /// Second phase of task-execution completion: connects the `new_children`
    /// to the task in the aggregation graph.
    ///
    /// Returns `true` when the task was stale and has been rescheduled
    /// instead, in which case the active-count increases for the new children
    /// are undone. Returns `false` when the task was canceled or the
    /// children were connected normally.
    fn task_execution_completed_connect(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        new_children: FxHashSet<TaskId>,
    ) -> bool {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // Task was canceled in the meantime, so we don't connect the children
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            #[cfg(not(feature = "no_fast_stale"))]
            stale,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // If the task is stale, reschedule it
        #[cfg(not(feature = "no_fast_stale"))]
        if *stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);

            // All `new_children` are currently held active with an active count and we need to
            // undo that. (We already filtered out the old children from that list)
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return true;
        }

        let has_active_count = ctx.should_track_activeness()
            && task
                .get_activeness()
                .is_some_and(|activeness| activeness.active_counter > 0);
        connect_children(
            ctx,
            task_id,
            task,
            new_children,
            has_active_count,
            ctx.should_track_activeness(),
        );

        false
    }
2587
    /// Third phase of task-execution completion: takes the `InProgress` state,
    /// stores the new output (if changed), marks the task immutable when
    /// eligible, notifies in-progress cells and waiting tasks, and updates
    /// the dirty state.
    ///
    /// Returns `(reschedule, in_progress_cells)`: `reschedule` is `true` when
    /// the task was stale (or, with `verify_determinism`, needs a
    /// verification re-run) and was put back into the scheduled state.
    /// `in_progress_cells` is returned so the caller can drop it outside of
    /// critical sections.
    fn task_execution_completed_finish(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "verify_determinism")] no_output_set: bool,
        new_output: Option<OutputValue>,
        is_now_immutable: bool,
    ) -> (
        bool,
        Option<
            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
        >,
    ) {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.take_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // Task was canceled in the meantime, so we don't finish it
            return (false, None);
        }
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: is_once_task,
            stale,
            session_dependent,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // Children must have been handled by the connect phase already.
        debug_assert!(new_children.is_empty());

        // If the task is stale, reschedule it
        if stale && !is_once_task {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            return (true, None);
        }

        // Set the output if it has changed
        let mut old_content = None;
        if let Some(value) = new_output {
            old_content = task.set_output(value);
        }

        // If the task has no invalidator and has no mutable dependencies, it does not have a way
        // to be invalidated and we can mark it as immutable.
        if is_now_immutable {
            task.set_immutable(true);
        }

        // Notify in progress cells and remove all of them
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Compute the new dirty state
        let new_dirtyness = if session_dependent {
            Some(Dirtyness::SessionDependent)
        } else {
            None
        };
        #[cfg(feature = "verify_determinism")]
        let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
        let data_update = task.update_dirty_state(new_dirtyness);

        // With verify_determinism, re-run persistent (non-once) tasks whose
        // dirty state changed or that never set an output.
        #[cfg(feature = "verify_determinism")]
        let reschedule =
            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
        #[cfg(not(feature = "verify_determinism"))]
        let reschedule = false;
        if reschedule {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);
        } else {
            drop(task);

            // Notify dependent tasks that are waiting for this task to finish
            done_event.notify(usize::MAX);
        }

        drop(old_content);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, ctx);
        }

        // We return so the data can be dropped outside of critical sections
        (reschedule, in_progress_cells)
    }
2690
2691    fn task_execution_completed_cleanup(
2692        &self,
2693        ctx: &mut impl ExecuteContext<'_>,
2694        task_id: TaskId,
2695        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2696        is_error: bool,
2697    ) -> Vec<SharedReference> {
2698        let mut task = ctx.task(task_id, TaskDataCategory::All);
2699        let mut removed_cell_data = Vec::new();
2700        // An error is potentially caused by a eventual consistency, so we avoid updating cells
2701        // after an error as it is likely transient and we want to keep the dependent tasks
2702        // clean to avoid re-executions.
2703        // NOTE: This must stay in sync with task_execution_completed_prepare, which similarly
2704        // skips cell_type_max_index updates on error.
2705        if !is_error {
2706            // Remove no longer existing cells and
2707            // find all outdated data items (removed cells, outdated edges)
2708            // Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
2709            // anyway and we want to avoid needless re-executions. When the cells become
2710            // used again, they are invalidated from the update cell operation.
2711            // Remove cell data for cells that no longer exist
2712            let to_remove_persistent: Vec<_> = task
2713                .iter_persistent_cell_data()
2714                .filter_map(|(cell, _)| {
2715                    cell_counters
2716                        .get(&cell.type_id)
2717                        .is_none_or(|start_index| cell.index >= *start_index)
2718                        .then_some(*cell)
2719                })
2720                .collect();
2721
2722            // Remove transient cell data for cells that no longer exist
2723            let to_remove_transient: Vec<_> = task
2724                .iter_transient_cell_data()
2725                .filter_map(|(cell, _)| {
2726                    cell_counters
2727                        .get(&cell.type_id)
2728                        .is_none_or(|start_index| cell.index >= *start_index)
2729                        .then_some(*cell)
2730                })
2731                .collect();
2732            removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
2733            for cell in to_remove_persistent {
2734                if let Some(data) = task.remove_persistent_cell_data(&cell) {
2735                    removed_cell_data.push(data.into_untyped());
2736                }
2737            }
2738            for cell in to_remove_transient {
2739                if let Some(data) = task.remove_transient_cell_data(&cell) {
2740                    removed_cell_data.push(data);
2741                }
2742            }
2743            // Remove cell_data_hash entries for cells that no longer exist
2744            let to_remove_hash: Vec<_> = task
2745                .iter_cell_data_hash()
2746                .filter_map(|(cell, _)| {
2747                    cell_counters
2748                        .get(&cell.type_id)
2749                        .is_none_or(|start_index| cell.index >= *start_index)
2750                        .then_some(*cell)
2751                })
2752                .collect();
2753            for cell in to_remove_hash {
2754                task.remove_cell_data_hash(&cell);
2755            }
2756        }
2757
2758        // Clean up task storage after execution:
2759        // - Shrink collections marked with shrink_on_completion
2760        // - Drop dependency fields for immutable tasks (they'll never re-execute)
2761        task.cleanup_after_execution();
2762
2763        drop(task);
2764
2765        // Return so we can drop outside of critical sections
2766        removed_cell_data
2767    }
2768
    /// Runs a background job for this backend.
    ///
    /// The snapshot jobs (`InitialSnapshot`/`FollowUpSnapshot`) loop: wait
    /// for the snapshot interval, an idle timeout, or a stop signal; persist
    /// a snapshot; compact the database while idle; and, if the snapshot
    /// wrote new data, schedule a follow-up snapshot job and return.
    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
                    let mut idle_start_listener = self.idle_start_event.listen();
                    let mut idle_end_listener = self.idle_end_event.listen();
                    // `fresh_idle` gates the idle-timeout trigger: it is set
                    // when idleness (re)starts and cleared after a snapshot
                    // found no new data, so an unchanged idle system doesn't
                    // snapshot repeatedly.
                    let mut fresh_idle = true;
                    loop {
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        let idle_timeout = *IDLE_TIMEOUT;
                        let (time, mut reason) =
                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                            } else {
                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
                            };

                        let until = last_snapshot + time;
                        if until > Instant::now() {
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
                                Instant::now() + idle_timeout
                            } else {
                                far_future()
                            };
                            // Wait until either the regular interval elapses,
                            // the system has been idle long enough, or we are
                            // told to stop.
                            loop {
                                tokio::select! {
                                    _ = &mut stop_listener => {
                                        return;
                                    },
                                    _ = &mut idle_start_listener => {
                                        fresh_idle = true;
                                        idle_time = Instant::now() + idle_timeout;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    _ = &mut idle_end_listener => {
                                        idle_time = until + idle_timeout;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        if turbo_tasks.is_idle() {
                                            reason = "idle timeout";
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let this = self.clone();
                        // Create a root span shared by both the snapshot/persist
                        // work and the subsequent compaction so they appear
                        // grouped together in trace viewers.
                        let background_span =
                            tracing::info_span!(parent: None, "background snapshot");
                        let snapshot =
                            this.snapshot_and_persist(background_span.id(), reason, turbo_tasks);
                        if let Some((snapshot_start, new_data)) = snapshot {
                            last_snapshot = snapshot_start;

                            // Compact while idle (up to limit), regardless of
                            // whether the snapshot had new data.
                            // `background_span` is not entered here because
                            // `EnteredSpan` is `!Send` and would prevent the
                            // future from being sent across threads when it
                            // suspends at the `select!` await below.
                            const MAX_IDLE_COMPACTION_PASSES: usize = 10;
                            for _ in 0..MAX_IDLE_COMPACTION_PASSES {
                                let idle_ended = tokio::select! {
                                    biased;
                                    _ = &mut idle_end_listener => {
                                        idle_end_listener = self.idle_end_event.listen();
                                        true
                                    },
                                    _ = std::future::ready(()) => false,
                                };
                                if idle_ended {
                                    break;
                                }
                                // Enter the span only around the synchronous
                                // compact() call so we never hold an
                                // `EnteredSpan` across an await point.
                                let _compact_span = tracing::info_span!(
                                    parent: background_span.id(),
                                    "compact database"
                                )
                                .entered();
                                match self.backing_storage.compact() {
                                    Ok(true) => {}
                                    Ok(false) => break,
                                    Err(err) => {
                                        eprintln!("Compaction failed: {err:?}");
                                        break;
                                    }
                                }
                            }

                            if !new_data {
                                fresh_idle = false;
                                continue;
                            }
                            let last_snapshot = last_snapshot.duration_since(self.start_time);
                            self.last_snapshot.store(
                                last_snapshot.as_millis().try_into().unwrap(),
                                Ordering::Relaxed,
                            );

                            turbo_tasks.schedule_backend_background_job(
                                TurboTasksBackendJob::FollowUpSnapshot,
                            );
                            return;
                        }
                    }
                }
            }
        })
    }
2901
2902    fn try_read_own_task_cell(
2903        &self,
2904        task_id: TaskId,
2905        cell: CellId,
2906        options: ReadCellOptions,
2907        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2908    ) -> Result<TypedCellContent> {
2909        let mut ctx = self.execute_context(turbo_tasks);
2910        let task = ctx.task(task_id, TaskDataCategory::Data);
2911        if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2912            debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2913            Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2914        } else {
2915            Ok(CellContent(None).into_typed(cell.type_id))
2916        }
2917    }
2918
    /// Collects all collectibles of `collectible_type` aggregated at `task_id`
    /// and returns them with their counts. When `reader_id` is given, the read
    /// is registered as a dependency in both directions (dependents on the
    /// read task, dependencies on the reader).
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            // Ensure it's a root node, upgrading its aggregation number if
            // necessary so that collectibles are fully aggregated here.
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                // The task lock must be released before running the
                // aggregation update, which locks tasks itself.
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                // Re-acquire the task and re-check; the upgrade may take
                // multiple rounds.
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            // Collectibles aggregated from the task's subgraph (each present
            // entry contributes 1).
            for (collectible, count) in task.iter_aggregated_collectibles() {
                if *count > 0 && collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += 1;
                }
            }
            // Collectibles emitted by the task itself (contribute their count,
            // which may be negative for unemitted ones).
            for (&collectible, &count) in task.iter_collectibles() {
                if collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += count;
                }
            }
            if let Some(reader_id) = reader_id {
                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
            }
        }
        // Record the reverse edge on the reader so the dependency can be
        // cleaned up when the reader re-executes.
        if let Some(reader_id) = reader_id {
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            if !reader.remove_outdated_collectibles_dependencies(&target) {
                let _ = reader.add_collectibles_dependencies(target);
            }
        }
        collectibles
    }
2983
2984    fn emit_collectible(
2985        &self,
2986        collectible_type: TraitTypeId,
2987        collectible: RawVc,
2988        task_id: TaskId,
2989        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2990    ) {
2991        self.assert_valid_collectible(task_id, collectible);
2992
2993        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2994            panic!("Collectibles need to be resolved");
2995        };
2996        let cell = CellRef {
2997            task: collectible_task,
2998            cell,
2999        };
3000        operation::UpdateCollectibleOperation::run(
3001            task_id,
3002            CollectibleRef {
3003                collectible_type,
3004                cell,
3005            },
3006            1,
3007            self.execute_context(turbo_tasks),
3008        );
3009    }
3010
3011    fn unemit_collectible(
3012        &self,
3013        collectible_type: TraitTypeId,
3014        collectible: RawVc,
3015        count: u32,
3016        task_id: TaskId,
3017        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3018    ) {
3019        self.assert_valid_collectible(task_id, collectible);
3020
3021        let RawVc::TaskCell(collectible_task, cell) = collectible else {
3022            panic!("Collectibles need to be resolved");
3023        };
3024        let cell = CellRef {
3025            task: collectible_task,
3026            cell,
3027        };
3028        operation::UpdateCollectibleOperation::run(
3029            task_id,
3030            CollectibleRef {
3031                collectible_type,
3032                cell,
3033            },
3034            -(i32::try_from(count).unwrap()),
3035            self.execute_context(turbo_tasks),
3036        );
3037    }
3038
    /// Writes `content` into `cell` of task `task_id` by running an
    /// `UpdateCellOperation`, forwarding the serializability flag, updated key
    /// hashes, content hash, and verification mode unchanged.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateCellOperation::run(
            task_id,
            cell,
            content,
            is_serializable_cell_content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            self.execute_context(turbo_tasks),
        );
    }
3061
    /// Marks the currently executing task as session dependent, raising its
    /// aggregation number and flagging its in-progress state.
    fn mark_own_task_as_session_dependent(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            // Without dependency tracking we don't need session dependent tasks
            return;
        }
        // High (but below root) aggregation number applied to session
        // dependent tasks.
        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
        let aggregation_number = get_aggregation_number(&task);
        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
            // The task lock must be released before running the aggregation
            // update, which locks tasks itself.
            drop(task);
            // We want to use a high aggregation number to avoid large aggregation chains for
            // session dependent tasks (which change on every run)
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                    distance: None,
                },
                &mut ctx,
            );
            // Re-acquire the task after the queue has run.
            task = ctx.task(task_id, TaskDataCategory::Meta);
        }
        // Record the flag on the in-progress execution state (no-op when the
        // task is not currently in progress).
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            session_dependent,
            ..
        })) = task.get_in_progress_mut()
        {
            *session_dependent = true;
        }
    }
3097
3098    fn mark_own_task_as_finished(
3099        &self,
3100        task: TaskId,
3101        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3102    ) {
3103        let mut ctx = self.execute_context(turbo_tasks);
3104        let mut task = ctx.task(task, TaskDataCategory::Data);
3105        if let Some(InProgressState::InProgress(box InProgressStateInner {
3106            marked_as_completed,
3107            ..
3108        })) = task.get_in_progress_mut()
3109        {
3110            *marked_as_completed = true;
3111            // TODO this should remove the dirty state (also check session_dependent)
3112            // but this would break some assumptions for strongly consistent reads.
3113            // Client tasks are not connected yet, so we wouldn't wait for them.
3114            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
3115        }
3116    }
3117
    /// Establishes the parent/child edge between `parent_task` and `task`.
    ///
    /// Panics (via `assert_not_persistent_calling_transient`) when a
    /// persistent parent would connect to a transient child.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_not_persistent_calling_transient(parent_task, task, None);
        ConnectChildOperation::run(parent_task, task, None, self.execute_context(turbo_tasks));
    }
3127
3128    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3129        let task_id = self.transient_task_id_factory.get();
3130        {
3131            let mut task = self.storage.access_mut(task_id);
3132            task.init_transient_task(task_id, task_type, self.should_track_activeness());
3133        }
3134        #[cfg(feature = "verify_aggregation_graph")]
3135        self.root_tasks.lock().insert(task_id);
3136        task_id
3137    }
3138
3139    fn dispose_root_task(
3140        &self,
3141        task_id: TaskId,
3142        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3143    ) {
3144        #[cfg(feature = "verify_aggregation_graph")]
3145        self.root_tasks.lock().remove(&task_id);
3146
3147        let mut ctx = self.execute_context(turbo_tasks);
3148        let mut task = ctx.task(task_id, TaskDataCategory::All);
3149        let is_dirty = task.is_dirty();
3150        let has_dirty_containers = task.has_dirty_containers();
3151        if is_dirty.is_some() || has_dirty_containers {
3152            if let Some(activeness_state) = task.get_activeness_mut() {
3153                // We will finish the task, but it would be removed after the task is done
3154                activeness_state.unset_root_type();
3155                activeness_state.set_active_until_clean();
3156            };
3157        } else if let Some(activeness_state) = task.take_activeness() {
3158            // Technically nobody should be listening to this event, but just in case
3159            // we notify it anyway
3160            activeness_state.all_clean_event.notify(usize::MAX);
3161        }
3162    }
3163
    /// Debug-only consistency check of the aggregation graph: walks every
    /// root task's reachable subgraph and panics on invariant violations
    /// (tasks not reporting to a root, dirty tasks missing from their uppers'
    /// dirty-container lists, aggregated collectibles that were never
    /// emitted).
    #[cfg(feature = "verify_aggregation_graph")]
    fn verify_aggregation_graph(
        &self,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
        idle: bool,
    ) {
        // Escape hatch to disable verification via the environment. Note:
        // `env` is usable here before the `use` below because `use` items are
        // visible throughout the enclosing block.
        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
            return;
        }
        use std::{collections::VecDeque, env, io::stdout};

        use crate::backend::operation::{get_uppers, is_aggregating_node};

        let mut ctx = self.execute_context(turbo_tasks);
        let root_tasks = self.root_tasks.lock().clone();

        // Verify each root task's subgraph independently.
        for task_id in root_tasks.into_iter() {
            let mut queue = VecDeque::new();
            let mut visited = FxHashSet::default();
            let mut aggregated_nodes = FxHashSet::default();
            // collectible -> (seen emitted anywhere, aggregating task ids)
            let mut collectibles = FxHashMap::default();
            let root_task_id = task_id;
            visited.insert(task_id);
            aggregated_nodes.insert(task_id);
            queue.push_back(task_id);
            let mut counter = 0;
            // BFS over child edges starting at the root.
            while let Some(task_id) = queue.pop_front() {
                counter += 1;
                if counter % 100000 == 0 {
                    // Progress output for very large graphs.
                    println!(
                        "queue={}, visited={}, aggregated_nodes={}",
                        queue.len(),
                        visited.len(),
                        aggregated_nodes.len()
                    );
                }
                let task = ctx.task(task_id, TaskDataCategory::All);
                // When invoked from idle, abort as soon as idle ends so real
                // work is not blocked.
                if idle && !self.is_idle.load(Ordering::Relaxed) {
                    return;
                }

                let uppers = get_uppers(&task);
                // Every reachable task (except the root itself) must report
                // to at least one aggregated node already discovered.
                if task_id != root_task_id
                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
                {
                    panic!(
                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
                         {:?})",
                        task_id,
                        task.get_task_description(),
                        uppers
                    );
                }

                // Record which tasks claim each collectible in aggregation...
                for (collectible, _) in task.iter_aggregated_collectibles() {
                    collectibles
                        .entry(*collectible)
                        .or_insert_with(|| (false, Vec::new()))
                        .1
                        .push(task_id);
                }

                // ...and mark collectibles that are actually emitted (positive
                // count) somewhere in the subgraph.
                for (&collectible, &value) in task.iter_collectibles() {
                    if value > 0 {
                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                            *flag = true
                        } else {
                            panic!(
                                "Task {} has a collectible {:?} that is not in any upper task",
                                task_id, collectible
                            );
                        }
                    }
                }

                let is_dirty = task.has_dirty();
                let has_dirty_container = task.has_dirty_containers();
                let should_be_in_upper = is_dirty || has_dirty_container;

                let aggregation_number = get_aggregation_number(&task);
                if is_aggregating_node(aggregation_number) {
                    aggregated_nodes.insert(task_id);
                }
                // println!(
                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
                //     ctx.get_task_description(task_id),
                //     uppers
                // );

                for child_id in task.iter_children() {
                    // println!("{task_id}: child -> {child_id}");
                    if visited.insert(child_id) {
                        queue.push_back(child_id);
                    }
                }
                drop(task);

                // A dirty task must be listed as a dirty container in every
                // one of its uppers.
                if should_be_in_upper {
                    for upper_id in uppers {
                        let upper = ctx.task(upper_id, TaskDataCategory::All);
                        let in_upper = upper
                            .get_aggregated_dirty_containers(&task_id)
                            .is_some_and(|&dirty| dirty > 0);
                        if !in_upper {
                            let containers: Vec<_> = upper
                                .iter_aggregated_dirty_containers()
                                .map(|(&k, &v)| (k, v))
                                .collect();
                            let upper_task_desc = upper.get_task_description();
                            drop(upper);
                            panic!(
                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
                                 ({})\nThese dirty containers are present:\n{:#?}",
                                task_id,
                                ctx.task(task_id, TaskDataCategory::Data)
                                    .get_task_description(),
                                upper_id,
                                upper_task_desc,
                                containers,
                            );
                        }
                    }
                }
            }

            // Any collectible that was aggregated somewhere but never seen as
            // emitted indicates an invalid upper connection: dump diagnostics
            // to stdout, then panic.
            for (collectible, (flag, task_ids)) in collectibles {
                if !flag {
                    use std::io::Write;
                    // Lock stdout once for the whole diagnostic dump.
                    let mut stdout = stdout().lock();
                    writeln!(
                        stdout,
                        "{:?} that is not emitted in any child task but in these aggregated \
                         tasks: {:#?}",
                        collectible,
                        task_ids
                            .iter()
                            .map(|t| format!(
                                "{t} {}",
                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
                            ))
                            .collect::<Vec<_>>()
                    )
                    .unwrap();

                    // Walk the upper chain starting from the task that owns
                    // the collectible's cell, printing every edge to help
                    // locate the broken connection.
                    let task_id = collectible.cell.task;
                    let mut queue = {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        get_uppers(&task)
                    };
                    let mut visited = FxHashSet::default();
                    for &upper_id in queue.iter() {
                        visited.insert(upper_id);
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                    }
                    while let Some(task_id) = queue.pop() {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        let desc = task.get_task_description();
                        let aggregated_collectible = task
                            .get_aggregated_collectibles(&collectible)
                            .copied()
                            .unwrap_or_default();
                        let uppers = get_uppers(&task);
                        drop(task);
                        writeln!(
                            stdout,
                            "upper {task_id} {desc} collectible={aggregated_collectible}"
                        )
                        .unwrap();
                        if task_ids.contains(&task_id) {
                            writeln!(
                                stdout,
                                "Task has an upper connection to an aggregated task that doesn't \
                                 reference it. Upper connection is invalid!"
                            )
                            .unwrap();
                        }
                        for upper_id in uppers {
                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                            // NOTE(review): newly queued uppers are never
                            // inserted into `visited`, so duplicates can be
                            // printed; acceptable for a pre-panic dump.
                            if !visited.contains(&upper_id) {
                                queue.push(upper_id);
                            }
                        }
                    }
                    panic!("See stdout for more details");
                }
            }
        }
    }
3352
3353    fn assert_not_persistent_calling_transient(
3354        &self,
3355        parent_id: Option<TaskId>,
3356        child_id: TaskId,
3357        cell_id: Option<CellId>,
3358    ) {
3359        if let Some(parent_id) = parent_id
3360            && !parent_id.is_transient()
3361            && child_id.is_transient()
3362        {
3363            self.panic_persistent_calling_transient(
3364                self.debug_get_task_description(parent_id),
3365                self.debug_get_cached_task_type(child_id).as_deref(),
3366                cell_id,
3367            );
3368        }
3369    }
3370
3371    fn panic_persistent_calling_transient(
3372        &self,
3373        parent: String,
3374        child: Option<&CachedTaskType>,
3375        cell_id: Option<CellId>,
3376    ) {
3377        let transient_reason = if let Some(child) = child {
3378            Cow::Owned(format!(
3379                " The callee is transient because it depends on:\n{}",
3380                self.debug_trace_transient_task(child, cell_id),
3381            ))
3382        } else {
3383            Cow::Borrowed("")
3384        };
3385        panic!(
3386            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3387            parent,
3388            child.map_or("unknown", |t| t.get_name()),
3389            transient_reason,
3390        );
3391    }
3392
3393    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3394        // these checks occur in a potentially hot codepath, but they're cheap
3395        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3396            // This should never happen: The collectible APIs use ResolvedVc
3397            let task_info = if let Some(col_task_ty) = collectible
3398                .try_get_task_id()
3399                .map(|t| self.debug_get_task_description(t))
3400            {
3401                Cow::Owned(format!(" (return type of {col_task_ty})"))
3402            } else {
3403                Cow::Borrowed("")
3404            };
3405            panic!("Collectible{task_info} must be a ResolvedVc")
3406        };
3407        if col_task_id.is_transient() && !task_id.is_transient() {
3408            let transient_reason =
3409                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3410                    Cow::Owned(format!(
3411                        ". The collectible is transient because it depends on:\n{}",
3412                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3413                    ))
3414                } else {
3415                    Cow::Borrowed("")
3416                };
3417            // this should never happen: How would a persistent function get a transient Vc?
3418            panic!(
3419                "Collectible is transient, transient collectibles cannot be emitted from \
3420                 persistent tasks{transient_reason}",
3421            )
3422        }
3423    }
3424}
3425
3426impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
3427    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3428        self.0.startup(turbo_tasks);
3429    }
3430
3431    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3432        self.0.stopping();
3433    }
3434
3435    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3436        self.0.stop(turbo_tasks);
3437    }
3438
3439    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3440        self.0.idle_start(turbo_tasks);
3441    }
3442
3443    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3444        self.0.idle_end();
3445    }
3446
3447    fn get_or_create_persistent_task(
3448        &self,
3449        task_type: CachedTaskType,
3450        parent_task: Option<TaskId>,
3451        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3452    ) -> TaskId {
3453        self.0
3454            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
3455    }
3456
3457    fn get_or_create_transient_task(
3458        &self,
3459        task_type: CachedTaskType,
3460        parent_task: Option<TaskId>,
3461        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3462    ) -> TaskId {
3463        self.0
3464            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
3465    }
3466
3467    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3468        self.0.invalidate_task(task_id, turbo_tasks);
3469    }
3470
3471    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3472        self.0.invalidate_tasks(tasks, turbo_tasks);
3473    }
3474
3475    fn invalidate_tasks_set(
3476        &self,
3477        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3478        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3479    ) {
3480        self.0.invalidate_tasks_set(tasks, turbo_tasks);
3481    }
3482
3483    fn invalidate_serialization(
3484        &self,
3485        task_id: TaskId,
3486        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3487    ) {
3488        self.0.invalidate_serialization(task_id, turbo_tasks);
3489    }
3490
3491    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3492        self.0.task_execution_canceled(task, turbo_tasks)
3493    }
3494
3495    fn try_start_task_execution(
3496        &self,
3497        task_id: TaskId,
3498        priority: TaskPriority,
3499        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3500    ) -> Option<TaskExecutionSpec<'_>> {
3501        self.0
3502            .try_start_task_execution(task_id, priority, turbo_tasks)
3503    }
3504
3505    fn task_execution_completed(
3506        &self,
3507        task_id: TaskId,
3508        result: Result<RawVc, TurboTasksExecutionError>,
3509        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3510        #[cfg(feature = "verify_determinism")] stateful: bool,
3511        has_invalidator: bool,
3512        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3513    ) -> bool {
3514        self.0.task_execution_completed(
3515            task_id,
3516            result,
3517            cell_counters,
3518            #[cfg(feature = "verify_determinism")]
3519            stateful,
3520            has_invalidator,
3521            turbo_tasks,
3522        )
3523    }
3524
3525    type BackendJob = TurboTasksBackendJob;
3526
3527    fn run_backend_job<'a>(
3528        &'a self,
3529        job: Self::BackendJob,
3530        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3531    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3532        self.0.run_backend_job(job, turbo_tasks)
3533    }
3534
3535    fn try_read_task_output(
3536        &self,
3537        task_id: TaskId,
3538        reader: Option<TaskId>,
3539        options: ReadOutputOptions,
3540        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3541    ) -> Result<Result<RawVc, EventListener>> {
3542        self.0
3543            .try_read_task_output(task_id, reader, options, turbo_tasks)
3544    }
3545
3546    fn try_read_task_cell(
3547        &self,
3548        task_id: TaskId,
3549        cell: CellId,
3550        reader: Option<TaskId>,
3551        options: ReadCellOptions,
3552        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3553    ) -> Result<Result<TypedCellContent, EventListener>> {
3554        self.0
3555            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3556    }
3557
3558    fn try_read_own_task_cell(
3559        &self,
3560        task_id: TaskId,
3561        cell: CellId,
3562        options: ReadCellOptions,
3563        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3564    ) -> Result<TypedCellContent> {
3565        self.0
3566            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
3567    }
3568
3569    fn read_task_collectibles(
3570        &self,
3571        task_id: TaskId,
3572        collectible_type: TraitTypeId,
3573        reader: Option<TaskId>,
3574        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3575    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3576        self.0
3577            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3578    }
3579
3580    fn emit_collectible(
3581        &self,
3582        collectible_type: TraitTypeId,
3583        collectible: RawVc,
3584        task_id: TaskId,
3585        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3586    ) {
3587        self.0
3588            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3589    }
3590
3591    fn unemit_collectible(
3592        &self,
3593        collectible_type: TraitTypeId,
3594        collectible: RawVc,
3595        count: u32,
3596        task_id: TaskId,
3597        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3598    ) {
3599        self.0
3600            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3601    }
3602
3603    fn update_task_cell(
3604        &self,
3605        task_id: TaskId,
3606        cell: CellId,
3607        is_serializable_cell_content: bool,
3608        content: CellContent,
3609        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3610        content_hash: Option<CellHash>,
3611        verification_mode: VerificationMode,
3612        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3613    ) {
3614        self.0.update_task_cell(
3615            task_id,
3616            cell,
3617            is_serializable_cell_content,
3618            content,
3619            updated_key_hashes,
3620            content_hash,
3621            verification_mode,
3622            turbo_tasks,
3623        );
3624    }
3625
3626    fn mark_own_task_as_finished(
3627        &self,
3628        task_id: TaskId,
3629        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3630    ) {
3631        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3632    }
3633
3634    fn mark_own_task_as_session_dependent(
3635        &self,
3636        task: TaskId,
3637        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3638    ) {
3639        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
3640    }
3641
3642    fn connect_task(
3643        &self,
3644        task: TaskId,
3645        parent_task: Option<TaskId>,
3646        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3647    ) {
3648        self.0.connect_task(task, parent_task, turbo_tasks);
3649    }
3650
    /// Creates a new transient task of the given type and returns its id.
    ///
    /// The `_turbo_tasks` handle is unused: the inner backend creates the
    /// task without it.
    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }
3658
3659    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3660        self.0.dispose_root_task(task_id, turbo_tasks);
3661    }
3662
    /// Returns the backend's task statistics collector.
    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }
3666
    /// Whether dependency tracking is enabled, as configured in the backend
    /// options.
    fn is_tracking_dependencies(&self) -> bool {
        self.0.options.dependency_tracking
    }
3670
    /// Returns a human-readable name for `task`. Pure delegation to the inner
    /// backend implementation.
    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
        self.0.get_task_name(task, turbo_tasks)
    }
3674}
3675
/// A node in a debug trace describing a transient task and (recursively) what
/// caused it. Rendered as an indented tree via its `Display` impl.
enum DebugTraceTransientTask {
    /// A cached task, including the causes of its transient-ness: the task's
    /// own cell read (`cause_self`) and/or transient arguments (`cause_args`).
    Cached {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
        cause_self: Option<Box<DebugTraceTransientTask>>,
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// This representation is used when this task is a duplicate of one previously shown
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    /// A task for which no cached information is available, so no name or
    /// cause details can be shown.
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}
3692
3693impl DebugTraceTransientTask {
3694    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3695        let indent = "    ".repeat(level);
3696        f.write_str(&indent)?;
3697
3698        fn fmt_cell_type_id(
3699            f: &mut fmt::Formatter<'_>,
3700            cell_type_id: Option<ValueTypeId>,
3701        ) -> fmt::Result {
3702            if let Some(ty) = cell_type_id {
3703                write!(
3704                    f,
3705                    " (read cell of type {})",
3706                    get_value_type(ty).ty.global_name
3707                )
3708            } else {
3709                Ok(())
3710            }
3711        }
3712
3713        // write the name and type
3714        match self {
3715            Self::Cached {
3716                task_name,
3717                cell_type_id,
3718                ..
3719            }
3720            | Self::Collapsed {
3721                task_name,
3722                cell_type_id,
3723                ..
3724            } => {
3725                f.write_str(task_name)?;
3726                fmt_cell_type_id(f, *cell_type_id)?;
3727                if matches!(self, Self::Collapsed { .. }) {
3728                    f.write_str(" (collapsed)")?;
3729                }
3730            }
3731            Self::Uncached { cell_type_id } => {
3732                f.write_str("unknown transient task")?;
3733                fmt_cell_type_id(f, *cell_type_id)?;
3734            }
3735        }
3736        f.write_char('\n')?;
3737
3738        // write any extra "cause" information we might have
3739        if let Self::Cached {
3740            cause_self,
3741            cause_args,
3742            ..
3743        } = self
3744        {
3745            if let Some(c) = cause_self {
3746                writeln!(f, "{indent}  self:")?;
3747                c.fmt_indented(f, level + 1)?;
3748            }
3749            if !cause_args.is_empty() {
3750                writeln!(f, "{indent}  args:")?;
3751                for c in cause_args {
3752                    c.fmt_indented(f, level + 1)?;
3753                }
3754            }
3755        }
3756        Ok(())
3757    }
3758}
3759
3760impl fmt::Display for DebugTraceTransientTask {
3761    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3762        self.fmt_indented(f, 0)
3763    }
3764}
3765
// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
/// Returns an `Instant` far enough in the future to effectively mean "never".
///
/// The `Instant` API provides neither a maximum value nor a way to convert a
/// specific future date, and very large offsets overflow on some platforms
/// (1000 years on macOS, 100 years on FreeBSD), so roughly 30 years from now
/// is used instead.
fn far_future() -> Instant {
    const ABOUT_THIRTY_YEARS: Duration = Duration::from_secs(30 * 365 * 86400);
    Instant::now() + ABOUT_THIRTY_YEARS
}
3774
3775/// Encodes task data, using the provided buffer as a scratch space.  Returns a new exactly sized
3776/// buffer.
3777/// This allows reusing the buffer across multiple encode calls to optimize allocations and
3778/// resulting buffer sizes.
3779///
3780/// TODO: The `Result` return type is an artifact of the bincode `Encode` trait requiring
3781/// fallible encoding. In practice, encoding to a `SmallVec` is infallible (no I/O), and the only
3782/// real failure mode — a `TypedSharedReference` whose value type has no bincode impl — is a
3783/// programmer error caught by the panic in the caller. Consider making the bincode encoding trait
3784/// infallible (i.e. returning `()` instead of `Result<(), EncodeError>`) to eliminate the
3785/// spurious `Result` threading throughout the encode path.
3786fn encode_task_data(
3787    task: TaskId,
3788    data: &TaskStorage,
3789    category: SpecificTaskDataCategory,
3790    scratch_buffer: &mut TurboBincodeBuffer,
3791) -> Result<TurboBincodeBuffer> {
3792    scratch_buffer.clear();
3793    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3794    data.encode(category, &mut encoder)?;
3795
3796    if cfg!(feature = "verify_serialization") {
3797        TaskStorage::new()
3798            .decode(
3799                category,
3800                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3801            )
3802            .with_context(|| {
3803                format!(
3804                    "expected to be able to decode serialized data for '{category:?}' information \
3805                     for {task}"
3806                )
3807            })?;
3808    }
3809    Ok(SmallVec::from_slice(scratch_buffer))
3810}