turbo_tasks_backend/backend/
mod.rs

1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7    borrow::Cow,
8    fmt::{self, Write},
9    future::Future,
10    hash::BuildHasherDefault,
11    mem::take,
12    pin::Pin,
13    sync::{
14        Arc, LazyLock,
15        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16    },
17};
18
19use anyhow::{Context, Result, bail};
20use auto_hash_map::{AutoMap, AutoSet};
21use indexmap::IndexSet;
22use parking_lot::{Condvar, Mutex};
23use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
24use smallvec::{SmallVec, smallvec};
25use tokio::time::{Duration, Instant};
26use tracing::{Span, trace_span};
27use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
28use turbo_tasks::{
29    CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
30    ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
31    TaskId, TaskPriority, TraitTypeId, TurboTasksBackendApi, ValueTypeId,
32    backend::{
33        Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskType,
34        TurboTasksExecutionError, TypedCellContent, VerificationMode,
35    },
36    event::{Event, EventDescription, EventListener},
37    message_queue::TimingEvent,
38    registry::get_value_type,
39    scope::scope_and_block,
40    task_statistics::TaskStatisticsApi,
41    trace::TraceRawVcs,
42    turbo_tasks,
43    util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
44};
45
46pub use self::{
47    operation::AnyOperation,
48    storage::{SpecificTaskDataCategory, TaskDataCategory},
49};
50#[cfg(feature = "trace_task_dirty")]
51use crate::backend::operation::TaskDirtyCause;
52use crate::{
53    backend::{
54        operation::{
55            AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
56            CleanupOldEdgesOperation, ComputeDirtyAndCleanUpdate, ConnectChildOperation,
57            ExecuteContext, ExecuteContextImpl, LeafDistanceUpdateQueue, Operation, OutdatedEdge,
58            TaskGuard, TaskType, connect_children, get_aggregation_number, get_uppers,
59            is_root_node, make_task_dirty_internal, prepare_new_children,
60        },
61        storage::Storage,
62        storage_schema::{TaskStorage, TaskStorageAccessors},
63    },
64    backing_storage::BackingStorage,
65    data::{
66        ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
67        InProgressState, InProgressStateInner, OutputValue, TransientTask,
68    },
69    utils::{
70        arc_or_owned::ArcOrOwned,
71        chunked_vec::ChunkedVec,
72        dash_map_drop_contents::drop_contents,
73        dash_map_raw_entry::{RawEntry, raw_entry},
74        ptr_eq_arc::PtrEqArc,
75        shard_amount::compute_shard_amount,
76        sharded::Sharded,
77        swap_retain,
78    },
79};
80
/// Threshold for parallelizing making dependent tasks dirty.
/// If the number of dependent tasks exceeds this threshold,
/// the operation will be parallelized.
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;

/// Flag kept in the highest bit of `TurboTasksBackendInner::in_progress_operations`.
/// The remaining bits of that counter hold the number of executing operations; this bit is set
/// while a snapshot is requested.
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
87
/// Idle timeout used for snapshot persistence.
///
/// Read once from the `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS` environment variable
/// (interpreted as milliseconds). Falls back to 2 seconds when the variable is missing or does
/// not parse as a `u64`.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let configured_millis = std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    match configured_millis {
        Some(millis) => Duration::from_millis(millis),
        None => Duration::from_secs(2),
    }
});
97
98struct SnapshotRequest {
99    snapshot_requested: bool,
100    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
101}
102
103impl SnapshotRequest {
104    fn new() -> Self {
105        Self {
106            snapshot_requested: false,
107            suspended_operations: FxHashSet::default(),
108        }
109    }
110}
111
/// How the backend interacts with its backing storage.
///
/// Every mode reads cache entries that are missing locally from the storage; the modes differ
/// in whether and when local changes are written back.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageMode {
    /// Queries the storage for cache entries that don't exist locally.
    ReadOnly,
    /// Queries the storage for cache entries that don't exist locally.
    /// Regularly pushes changes to the backing storage.
    ReadWrite,
    /// Queries the storage for cache entries that don't exist locally.
    /// On shutdown, pushes all changes to the backing storage.
    ReadWriteOnShutdown,
}
122
/// Configuration passed to [`TurboTasksBackend::new`].
pub struct BackendOptions {
    /// Enables dependency tracking.
    ///
    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
    /// forever.
    pub dependency_tracking: bool,

    /// Enables active tracking.
    ///
    /// Automatically disabled when `dependency_tracking` is disabled.
    ///
    /// When disabled: All tasks are considered as active.
    pub active_tracking: bool,

    /// Enables the backing storage.
    ///
    /// With `None`, both `should_restore()` and `should_persist()` of the backend report `false`.
    pub storage_mode: Option<StorageMode>,

    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
    /// datastructures. If `None`, it will use the available parallelism.
    pub num_workers: Option<usize>,

    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
    pub small_preallocation: bool,
}
147
148impl Default for BackendOptions {
149    fn default() -> Self {
150        Self {
151            dependency_tracking: true,
152            active_tracking: true,
153            storage_mode: Some(StorageMode::ReadWrite),
154            num_workers: None,
155            small_preallocation: false,
156        }
157    }
158}
159
/// Kinds of backend job distinguished by this backend.
///
/// NOTE(review): where these jobs are scheduled and executed is not visible in this part of the
/// file; the variant descriptions below are inferred from the names — confirm.
pub enum TurboTasksBackendJob {
    /// Presumably the first snapshot job after startup — confirm.
    InitialSnapshot,
    /// Presumably every snapshot job after the initial one — confirm.
    FollowUpSnapshot,
}
164
/// Public backend handle: a newtype around the [`Arc`]-shared [`TurboTasksBackendInner`].
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);

/// Sharded log of `(task type, task id)` pairs; the element type of
/// `TurboTasksBackendInner::persisted_task_cache_log`.
type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
168
/// Shared state of the backend; [`TurboTasksBackend`] wraps it in an [`Arc`].
struct TurboTasksBackendInner<B: BackingStorage> {
    /// Options the backend was created with. `active_tracking` is forced to `false` in
    /// [`Self::new`] when `dependency_tracking` is disabled.
    options: BackendOptions,

    /// Captured with `Instant::now()` when the backend is constructed.
    start_time: Instant,

    /// Created from the backing storage's next free task id and `TRANSIENT_TASK_BIT - 1`.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Created from `TRANSIENT_TASK_BIT` and `TaskId::MAX`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    /// Only present when the storage mode is `ReadWrite` or `ReadWriteOnShutdown`.
    persisted_task_cache_log: Option<TaskCacheLog>,
    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

    storage: Storage,

    /// When true, the backing_storage has data that is not in the local storage.
    ///
    /// Initialized to `true` when the backing storage's next free task id is not `TaskId::MIN`.
    local_is_partial: AtomicBool,

    /// Number of executing operations + Highest bit is set when snapshot is
    /// requested. When that bit is set, operations should pause until the
    /// snapshot is completed. When the bit is set and in progress counter
    /// reaches zero, [`Self::operations_suspended`] is notified.
    in_progress_operations: AtomicUsize,

    snapshot_request: Mutex<SnapshotRequest>,
    /// Condition Variable that is triggered when `in_progress_operations`
    /// reaches zero while snapshot is requested. All operations are either
    /// completed or suspended.
    operations_suspended: Condvar,
    /// Condition Variable that is triggered when a snapshot is completed and
    /// operations can continue.
    snapshot_completed: Condvar,
    /// The timestamp of the last started snapshot since [`Self::start_time`].
    last_snapshot: AtomicU64,

    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    /// Only compiled with the `verify_aggregation_graph` feature.
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    task_statistics: TaskStatisticsApi,

    backing_storage: B,

    /// Only compiled with the `verify_aggregation_graph` feature.
    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
217
218impl<B: BackingStorage> TurboTasksBackend<B> {
219    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
220        Self(Arc::new(TurboTasksBackendInner::new(
221            options,
222            backing_storage,
223        )))
224    }
225
226    pub fn backing_storage(&self) -> &B {
227        &self.0.backing_storage
228    }
229}
230
231impl<B: BackingStorage> TurboTasksBackendInner<B> {
232    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
233        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
234        let need_log = matches!(
235            options.storage_mode,
236            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
237        );
238        if !options.dependency_tracking {
239            options.active_tracking = false;
240        }
241        let small_preallocation = options.small_preallocation;
242        let next_task_id = backing_storage
243            .next_free_task_id()
244            .expect("Failed to get task id");
245        Self {
246            options,
247            start_time: Instant::now(),
248            persisted_task_id_factory: IdFactoryWithReuse::new(
249                next_task_id,
250                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
251            ),
252            transient_task_id_factory: IdFactoryWithReuse::new(
253                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
254                TaskId::MAX,
255            ),
256            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
257            task_cache: FxDashMap::default(),
258            local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
259            storage: Storage::new(shard_amount, small_preallocation),
260            in_progress_operations: AtomicUsize::new(0),
261            snapshot_request: Mutex::new(SnapshotRequest::new()),
262            operations_suspended: Condvar::new(),
263            snapshot_completed: Condvar::new(),
264            last_snapshot: AtomicU64::new(0),
265            stopping: AtomicBool::new(false),
266            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
267            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
268            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
269            #[cfg(feature = "verify_aggregation_graph")]
270            is_idle: AtomicBool::new(false),
271            task_statistics: TaskStatisticsApi::default(),
272            backing_storage,
273            #[cfg(feature = "verify_aggregation_graph")]
274            root_tasks: Default::default(),
275        }
276    }
277
    /// Creates an [`ExecuteContext`] for this backend and the given `turbo_tasks` API by
    /// delegating to `ExecuteContextImpl::new`.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }
284
    /// Like [`Self::execute_context`], but hands an optional read transaction to
    /// `ExecuteContextImpl::new_with_tx`.
    ///
    /// # Safety
    ///
    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
    unsafe fn execute_context_with_tx<'e, 'tx>(
        &'e self,
        tx: Option<&'e B::ReadTransaction<'tx>>,
        turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
    where
        'tx: 'e,
    {
        // Safety: `tx` is from `self`.
        unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
    }
299
300    fn suspending_requested(&self) -> bool {
301        self.should_persist()
302            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
303    }
304
    /// Suspend point for a running operation.
    ///
    /// Does nothing unless [`Self::suspending_requested`] reports a requested snapshot. In that
    /// case `suspend` is called to capture the operation, the operation is registered in
    /// `suspended_operations`, and the calling thread blocks until `snapshot_requested` is
    /// `false` again.
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        // Slow path, marked `#[cold]` as a hint that it is rarely taken.
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            // Re-check under the lock; the caller only looked at the atomic counter.
            if snapshot_request.snapshot_requested {
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                // While parked, this operation is no longer counted as in progress.
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                // Only the flag bit is left, so the in-progress count dropped to zero.
                if value == SNAPSHOT_REQUESTED_BIT {
                    this.operations_suspended.notify_all();
                }
                // Blocks (releasing the mutex while waiting) until the request flag is cleared.
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                // Count the operation as in progress again and unregister it.
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }
337
    /// Registers the start of an operation and returns a guard that unregisters it on drop.
    ///
    /// When persistence is disabled nothing is counted and the returned guard is inert. When a
    /// snapshot is currently requested, the calling thread blocks until `snapshot_requested` is
    /// `false` again before the operation is counted as in progress.
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        if !self.should_persist() {
            return OperationGuard { backend: None };
        }
        // Optimistically count the operation; `fetch_add` holds the value before the increment.
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            let mut snapshot_request = self.snapshot_request.lock();
            // Re-check under the lock before giving up the just-taken count.
            if snapshot_request.snapshot_requested {
                // Undo the increment while waiting for the snapshot.
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                // Only the flag bit is left, so the in-progress count dropped to zero.
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                // Blocks (releasing the mutex while waiting) until the request flag is cleared.
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }
361
362    fn should_persist(&self) -> bool {
363        matches!(
364            self.options.storage_mode,
365            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
366        )
367    }
368
    /// Returns `true` when any storage mode is configured.
    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }

    /// Returns the `dependency_tracking` option.
    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }

    /// Returns the `active_tracking` option (already forced off in [`Self::new`] when
    /// dependency tracking is disabled).
    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }
380
    /// Records a cache hit for the native function of `task_type`.
    ///
    /// The update goes through `TaskStatisticsApi::map`; presumably the closure only runs when
    /// statistics collection is enabled — confirm against `TaskStatisticsApi`.
    fn track_cache_hit(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
    }

    /// Records a cache miss for the native function of `task_type`.
    ///
    /// Counterpart of [`Self::track_cache_hit`]; the same caveat about `TaskStatisticsApi::map`
    /// applies.
    fn track_cache_miss(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
    }
390}
391
392pub(crate) struct OperationGuard<'a, B: BackingStorage> {
393    backend: Option<&'a TurboTasksBackendInner<B>>,
394}
395
396impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
397    fn drop(&mut self) {
398        if let Some(backend) = self.backend {
399            let fetch_sub = backend
400                .in_progress_operations
401                .fetch_sub(1, Ordering::AcqRel);
402            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
403                backend.operations_suspended.notify_all();
404            }
405        }
406    }
407}
408
/// Intermediate result of step 1 of task execution completion.
///
/// NOTE(review): the producer and consumer of this struct are outside the visible part of the
/// file; confirm field meanings there before relying on the field names alone.
struct TaskExecutionCompletePrepareResult {
    pub new_children: FxHashSet<TaskId>,
    pub is_now_immutable: bool,
    /// Only compiled with the `verify_determinism` feature.
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    pub new_output: Option<OutputValue>,
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
}
418
419// Operations
420impl<B: BackingStorage> TurboTasksBackendInner<B> {
421    /// # Safety
422    ///
423    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
424    unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
425        &'l self,
426        tx: Option<&'l B::ReadTransaction<'tx>>,
427        parent_task: Option<TaskId>,
428        child_task: TaskId,
429        task_type: Option<ArcOrOwned<CachedTaskType>>,
430        turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
431    ) {
432        operation::ConnectChildOperation::run(parent_task, child_task, task_type, unsafe {
433            self.execute_context_with_tx(tx, turbo_tasks)
434        });
435    }
436
437    fn connect_child(
438        &self,
439        parent_task: Option<TaskId>,
440        child_task: TaskId,
441        task_type: Option<ArcOrOwned<CachedTaskType>>,
442        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
443    ) {
444        operation::ConnectChildOperation::run(
445            parent_task,
446            child_task,
447            task_type,
448            self.execute_context(turbo_tasks),
449        );
450    }
451
452    fn try_read_task_output(
453        self: &Arc<Self>,
454        task_id: TaskId,
455        reader: Option<TaskId>,
456        options: ReadOutputOptions,
457        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
458    ) -> Result<Result<RawVc, EventListener>> {
459        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
460
461        let mut ctx = self.execute_context(turbo_tasks);
462        let need_reader_task = if self.should_track_dependencies()
463            && !matches!(options.tracking, ReadTracking::Untracked)
464            && reader.is_some_and(|reader_id| reader_id != task_id)
465            && let Some(reader_id) = reader
466            && reader_id != task_id
467        {
468            Some(reader_id)
469        } else {
470            None
471        };
472        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
473            // Having a task_pair here is not optimal, but otherwise this would lead to a race
474            // condition. See below.
475            // TODO(sokra): solve that in a more performant way.
476            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
477            (task, Some(reader))
478        } else {
479            (ctx.task(task_id, TaskDataCategory::All), None)
480        };
481
482        fn listen_to_done_event(
483            reader_description: Option<EventDescription>,
484            tracking: ReadTracking,
485            done_event: &Event,
486        ) -> EventListener {
487            done_event.listen_with_note(move || {
488                move || {
489                    if let Some(reader_description) = reader_description.as_ref() {
490                        format!(
491                            "try_read_task_output from {} ({})",
492                            reader_description, tracking
493                        )
494                    } else {
495                        format!("try_read_task_output ({})", tracking)
496                    }
497                }
498            })
499        }
500
501        fn check_in_progress(
502            task: &impl TaskGuard,
503            reader_description: Option<EventDescription>,
504            tracking: ReadTracking,
505        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
506        {
507            match task.get_in_progress() {
508                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
509                    listen_to_done_event(reader_description, tracking, done_event),
510                ))),
511                Some(InProgressState::InProgress(box InProgressStateInner {
512                    done_event, ..
513                })) => Some(Ok(Err(listen_to_done_event(
514                    reader_description,
515                    tracking,
516                    done_event,
517                )))),
518                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
519                    "{} was canceled",
520                    task.get_task_description()
521                ))),
522                None => None,
523            }
524        }
525
526        if matches!(options.consistency, ReadConsistency::Strong) {
527            // Ensure it's an root node
528            loop {
529                let aggregation_number = get_aggregation_number(&task);
530                if is_root_node(aggregation_number) {
531                    break;
532                }
533                drop(task);
534                drop(reader_task);
535                {
536                    let _span = tracing::trace_span!(
537                        "make root node for strongly consistent read",
538                        %task_id
539                    )
540                    .entered();
541                    AggregationUpdateQueue::run(
542                        AggregationUpdateJob::UpdateAggregationNumber {
543                            task_id,
544                            base_aggregation_number: u32::MAX,
545                            distance: None,
546                        },
547                        &mut ctx,
548                    );
549                }
550                (task, reader_task) = if let Some(reader_id) = need_reader_task {
551                    // TODO(sokra): see comment above
552                    let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
553                    (task, Some(reader))
554                } else {
555                    (ctx.task(task_id, TaskDataCategory::All), None)
556                }
557            }
558
559            let is_dirty = task.is_dirty();
560
561            // Check the dirty count of the root node
562            let has_dirty_containers = task.has_dirty_containers();
563            if has_dirty_containers || is_dirty.is_some() {
564                let activeness = task.get_activeness_mut();
565                let mut task_ids_to_schedule: Vec<_> = Vec::new();
566                // When there are dirty task, subscribe to the all_clean_event
567                let activeness = if let Some(activeness) = activeness {
568                    // This makes sure all tasks stay active and this task won't stale.
569                    // active_until_clean is automatically removed when this
570                    // task is clean.
571                    activeness.set_active_until_clean();
572                    activeness
573                } else {
574                    // If we don't have a root state, add one. This also makes sure all tasks stay
575                    // active and this task won't stale. active_until_clean
576                    // is automatically removed when this task is clean.
577                    if ctx.should_track_activeness() {
578                        // A newly added Activeness need to make sure to schedule the tasks
579                        task_ids_to_schedule = task.dirty_containers().collect();
580                        task_ids_to_schedule.push(task_id);
581                    }
582                    let activeness =
583                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
584                    activeness.set_active_until_clean();
585                    activeness
586                };
587                let listener = activeness.all_clean_event.listen_with_note(move || {
588                    let this = self.clone();
589                    let tt = turbo_tasks.pin();
590                    move || {
591                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
592                        let mut ctx = this.execute_context(tt);
593                        let mut visited = FxHashSet::default();
594                        fn indent(s: &str) -> String {
595                            s.split_inclusive('\n')
596                                .flat_map(|line: &str| ["  ", line].into_iter())
597                                .collect::<String>()
598                        }
599                        fn get_info(
600                            ctx: &mut impl ExecuteContext<'_>,
601                            task_id: TaskId,
602                            parent_and_count: Option<(TaskId, i32)>,
603                            visited: &mut FxHashSet<TaskId>,
604                        ) -> String {
605                            let task = ctx.task(task_id, TaskDataCategory::All);
606                            let is_dirty = task.is_dirty();
607                            let in_progress =
608                                task.get_in_progress()
609                                    .map_or("not in progress", |p| match p {
610                                        InProgressState::InProgress(_) => "in progress",
611                                        InProgressState::Scheduled { .. } => "scheduled",
612                                        InProgressState::Canceled => "canceled",
613                                    });
614                            let activeness = task.get_activeness().map_or_else(
615                                || "not active".to_string(),
616                                |activeness| format!("{activeness:?}"),
617                            );
618                            let aggregation_number = get_aggregation_number(&task);
619                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
620                            {
621                                let uppers = get_uppers(&task);
622                                !uppers.contains(&parent_task_id)
623                            } else {
624                                false
625                            };
626
627                            // Check the dirty count of the root node
628                            let has_dirty_containers = task.has_dirty_containers();
629
630                            let task_description = task.get_task_description();
631                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
632                                format!(", dirty({parent_priority})")
633                            } else {
634                                String::new()
635                            };
636                            let has_dirty_containers_label = if has_dirty_containers {
637                                ", dirty containers"
638                            } else {
639                                ""
640                            };
641                            let count = if let Some((_, count)) = parent_and_count {
642                                format!(" {count}")
643                            } else {
644                                String::new()
645                            };
646                            let mut info = format!(
647                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
648                                 {in_progress}, \
649                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
650                            );
651                            let children: Vec<_> = task.dirty_containers_with_count().collect();
652                            drop(task);
653
654                            if missing_upper {
655                                info.push_str("\n  ERROR: missing upper connection");
656                            }
657
658                            if has_dirty_containers || !children.is_empty() {
659                                writeln!(info, "\n  dirty tasks:").unwrap();
660
661                                for (child_task_id, count) in children {
662                                    let task_description = ctx
663                                        .task(child_task_id, TaskDataCategory::Data)
664                                        .get_task_description();
665                                    if visited.insert(child_task_id) {
666                                        let child_info = get_info(
667                                            ctx,
668                                            child_task_id,
669                                            Some((task_id, count)),
670                                            visited,
671                                        );
672                                        info.push_str(&indent(&child_info));
673                                        if !info.ends_with('\n') {
674                                            info.push('\n');
675                                        }
676                                    } else {
677                                        writeln!(
678                                            info,
679                                            "  {child_task_id} {task_description} {count} \
680                                             (already visited)"
681                                        )
682                                        .unwrap();
683                                    }
684                                }
685                            }
686                            info
687                        }
688                        let info = get_info(&mut ctx, task_id, None, &mut visited);
689                        format!(
690                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
691                        )
692                    }
693                });
694                drop(reader_task);
695                drop(task);
696                if !task_ids_to_schedule.is_empty() {
697                    let mut queue = AggregationUpdateQueue::new();
698                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
699                    queue.execute(&mut ctx);
700                }
701
702                return Ok(Err(listener));
703            }
704        }
705
706        let reader_description = reader_task
707            .as_ref()
708            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
709        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
710        {
711            return value;
712        }
713
714        if let Some(output) = task.get_output() {
715            let result = match output {
716                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
717                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
718                OutputValue::Error(error) => Err(error
719                    .with_task_context(task.get_task_type(), Some(task_id))
720                    .into()),
721            };
722            if let Some(mut reader_task) = reader_task
723                && options.tracking.should_track(result.is_err())
724                && (!task.immutable() || cfg!(feature = "verify_immutable"))
725            {
726                #[cfg(feature = "trace_task_output_dependencies")]
727                let _span = tracing::trace_span!(
728                    "add output dependency",
729                    task = %task_id,
730                    dependent_task = ?reader
731                )
732                .entered();
733                let mut queue = LeafDistanceUpdateQueue::new();
734                let reader = reader.unwrap();
735                if task.add_output_dependent(reader) {
736                    // Ensure that dependent leaf distance is strictly monotonic increasing
737                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
738                    let reader_leaf_distance =
739                        reader_task.get_leaf_distance().copied().unwrap_or_default();
740                    if reader_leaf_distance.distance <= leaf_distance.distance {
741                        queue.push(
742                            reader,
743                            leaf_distance.distance,
744                            leaf_distance.max_distance_in_buffer,
745                        );
746                    }
747                }
748
749                drop(task);
750
751                // Note: We use `task_pair` earlier to lock the task and its reader at the same
752                // time. If we didn't and just locked the reader here, an invalidation could occur
753                // between grabbing the locks. If that happened, and if the task is "outdated" or
754                // doesn't have the dependency edge yet, the invalidation would be lost.
755
756                if !reader_task.remove_outdated_output_dependencies(&task_id) {
757                    let _ = reader_task.add_output_dependencies(task_id);
758                }
759                drop(reader_task);
760
761                queue.execute(&mut ctx);
762            }
763
764            return result;
765        }
766        drop(reader_task);
767
768        let note = EventDescription::new(|| {
769            move || {
770                if let Some(reader) = reader_description.as_ref() {
771                    format!("try_read_task_output (recompute) from {reader}",)
772                } else {
773                    "try_read_task_output (recompute, untracked)".to_string()
774                }
775            }
776        });
777
778        // Output doesn't exist. We need to schedule the task to compute it.
779        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
780            TaskExecutionReason::OutputNotAvailable,
781            EventDescription::new(|| task.get_task_desc_fn()),
782            note,
783        );
784
785        // It's not possible that the task is InProgress at this point. If it is InProgress {
786        // done: true } it must have Output and would early return.
787        let old = task.set_in_progress(in_progress_state);
788        debug_assert!(old.is_none(), "InProgress already exists");
789        ctx.schedule_task(task, TaskPriority::Initial);
790
791        Ok(Err(listener))
792    }
793
794    fn try_read_task_cell(
795        &self,
796        task_id: TaskId,
797        reader: Option<TaskId>,
798        cell: CellId,
799        options: ReadCellOptions,
800        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
801    ) -> Result<Result<TypedCellContent, EventListener>> {
802        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
803
804        fn add_cell_dependency(
805            task_id: TaskId,
806            mut task: impl TaskGuard,
807            reader: Option<TaskId>,
808            reader_task: Option<impl TaskGuard>,
809            cell: CellId,
810            key: Option<u64>,
811        ) {
812            if let Some(mut reader_task) = reader_task
813                && (!task.immutable() || cfg!(feature = "verify_immutable"))
814            {
815                let reader = reader.unwrap();
816                let _ = task.add_cell_dependents((cell, key, reader));
817                drop(task);
818
819                // Note: We use `task_pair` earlier to lock the task and its reader at the same
820                // time. If we didn't and just locked the reader here, an invalidation could occur
821                // between grabbing the locks. If that happened, and if the task is "outdated" or
822                // doesn't have the dependency edge yet, the invalidation would be lost.
823
824                let target = CellRef {
825                    task: task_id,
826                    cell,
827                };
828                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
829                    let _ = reader_task.add_cell_dependencies((target, key));
830                }
831                drop(reader_task);
832            }
833        }
834
835        let ReadCellOptions {
836            is_serializable_cell_content,
837            tracking,
838            final_read_hint,
839        } = options;
840
841        let mut ctx = self.execute_context(turbo_tasks);
842        let (mut task, reader_task) = if self.should_track_dependencies()
843            && !matches!(tracking, ReadCellTracking::Untracked)
844            && let Some(reader_id) = reader
845            && reader_id != task_id
846        {
847            // Having a task_pair here is not optimal, but otherwise this would lead to a race
848            // condition. See below.
849            // TODO(sokra): solve that in a more performant way.
850            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
851            (task, Some(reader))
852        } else {
853            (ctx.task(task_id, TaskDataCategory::All), None)
854        };
855
856        let content = if final_read_hint {
857            task.remove_cell_data(is_serializable_cell_content, cell)
858        } else {
859            task.get_cell_data(is_serializable_cell_content, cell)
860        };
861        if let Some(content) = content {
862            if tracking.should_track(false) {
863                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
864            }
865            return Ok(Ok(TypedCellContent(
866                cell.type_id,
867                CellContent(Some(content.reference)),
868            )));
869        }
870
871        let in_progress = task.get_in_progress();
872        if matches!(
873            in_progress,
874            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
875        ) {
876            return Ok(Err(self
877                .listen_to_cell(&mut task, task_id, &reader_task, cell)
878                .0));
879        }
880        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
881
882        // Check cell index range (cell might not exist at all)
883        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
884        let Some(max_id) = max_id else {
885            let task_desc = task.get_task_description();
886            if tracking.should_track(true) {
887                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
888            }
889            bail!(
890                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
891            );
892        };
893        if cell.index >= max_id {
894            let task_desc = task.get_task_description();
895            if tracking.should_track(true) {
896                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
897            }
898            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
899        }
900
901        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
902        // task the get the cell content.
903
904        // Listen to the cell and potentially schedule the task
905        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
906        drop(reader_task);
907        if !new_listener {
908            return Ok(Err(listener));
909        }
910
911        let _span = tracing::trace_span!(
912            "recomputation",
913            cell_type = get_value_type(cell.type_id).global_name,
914            cell_index = cell.index
915        )
916        .entered();
917
918        // Schedule the task, if not already scheduled
919        if is_cancelled {
920            bail!("{} was canceled", task.get_task_description());
921        }
922
923        let _ = task.add_scheduled(
924            TaskExecutionReason::CellNotAvailable,
925            EventDescription::new(|| task.get_task_desc_fn()),
926        );
927        ctx.schedule_task(task, TaskPriority::Initial);
928
929        Ok(Err(listener))
930    }
931
932    fn listen_to_cell(
933        &self,
934        task: &mut impl TaskGuard,
935        task_id: TaskId,
936        reader_task: &Option<impl TaskGuard>,
937        cell: CellId,
938    ) -> (EventListener, bool) {
939        let note = || {
940            let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
941            move || {
942                if let Some(reader_desc) = reader_desc.as_ref() {
943                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
944                } else {
945                    "try_read_task_cell (in progress, untracked)".to_string()
946                }
947            }
948        };
949        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
950            // Someone else is already computing the cell
951            let listener = in_progress.event.listen_with_note(note);
952            return (listener, false);
953        }
954        let in_progress = InProgressCellState::new(task_id, cell);
955        let listener = in_progress.event.listen_with_note(note);
956        let old = task.insert_in_progress_cells(cell, in_progress);
957        debug_assert!(old.is_none(), "InProgressCell already exists");
958        (listener, true)
959    }
960
    /// Takes a snapshot of the task storage and saves it to the backing storage.
    ///
    /// `parent_span` is used as parent of the "snapshot" and "persist" tracing spans and
    /// `reason` is recorded on both of them.
    ///
    /// Returns `None` when saving the snapshot failed (the error is printed). Otherwise returns
    /// the instant captured right after the snapshot request was cleared, together with a flag
    /// that is `false` when there was nothing to save (no task cache log entries and no task
    /// snapshots) and `true` when the snapshot was saved.
    fn snapshot_and_persist(
        &self,
        parent_span: Option<tracing::Id>,
        reason: &str,
    ) -> Option<(Instant, bool)> {
        let snapshot_span =
            tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
                .entered();
        let start = Instant::now();
        debug_assert!(self.should_persist());

        let suspended_operations;
        {
            let _span = tracing::info_span!("blocking").entered();
            // Request the snapshot: set the flag under the lock and set the
            // SNAPSHOT_REQUESTED_BIT on the operations counter.
            let mut snapshot_request = self.snapshot_request.lock();
            snapshot_request.snapshot_requested = true;
            let active_operations = self
                .in_progress_operations
                .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
            // If the counter was non-zero before, wait until only the SNAPSHOT_REQUESTED_BIT is
            // left in it.
            if active_operations != 0 {
                self.operations_suspended
                    .wait_while(&mut snapshot_request, |_| {
                        self.in_progress_operations.load(Ordering::Relaxed)
                            != SNAPSHOT_REQUESTED_BIT
                    });
            }
            // Keep the currently suspended operations so they can be saved with the snapshot.
            suspended_operations = snapshot_request
                .suspended_operations
                .iter()
                .map(|op| op.arc().clone())
                .collect::<Vec<_>>();
        }
        self.storage.start_snapshot();
        let mut persisted_task_cache_log = self
            .persisted_task_cache_log
            .as_ref()
            .map(|l| l.take(|i| i))
            .unwrap_or_default();
        // Clear the snapshot request again and notify everyone waiting on `snapshot_completed`.
        let mut snapshot_request = self.snapshot_request.lock();
        snapshot_request.snapshot_requested = false;
        self.in_progress_operations
            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        self.snapshot_completed.notify_all();
        let snapshot_time = Instant::now();
        drop(snapshot_request);

        // Per-task-name size statistics, only collected with the `print_cache_item_size`
        // feature.
        #[cfg(feature = "print_cache_item_size")]
        #[derive(Default)]
        struct TaskCacheStats {
            data: usize,
            data_compressed: usize,
            data_count: usize,
            meta: usize,
            meta_compressed: usize,
            meta_count: usize,
            upper_count: usize,
            collectibles_count: usize,
            aggregated_collectibles_count: usize,
            children_count: usize,
            followers_count: usize,
            collectibles_dependents_count: usize,
            aggregated_dirty_containers_count: usize,
            output_size: usize,
        }
        #[cfg(feature = "print_cache_item_size")]
        impl TaskCacheStats {
            /// Size of `data` after lz4 compression with the default acceleration level.
            fn compressed_size(data: &[u8]) -> Result<usize> {
                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
                    data,
                    &mut Vec::new(),
                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
                )?)
            }

            /// Accounts for one encoded data item; a failed compression counts as 0 bytes.
            fn add_data(&mut self, data: &[u8]) {
                self.data += data.len();
                self.data_compressed += Self::compressed_size(data).unwrap_or(0);
                self.data_count += 1;
            }

            /// Accounts for one encoded meta item; a failed compression counts as 0 bytes.
            fn add_meta(&mut self, data: &[u8]) {
                self.meta += data.len();
                self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
                self.meta_count += 1;
            }

            /// Adds the meta counts of `storage` and the encoded size of its output, if any.
            fn add_counts(&mut self, storage: &TaskStorage) {
                let counts = storage.meta_counts();
                self.upper_count += counts.upper;
                self.collectibles_count += counts.collectibles;
                self.aggregated_collectibles_count += counts.aggregated_collectibles;
                self.children_count += counts.children;
                self.followers_count += counts.followers;
                self.collectibles_dependents_count += counts.collectibles_dependents;
                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
                if let Some(output) = storage.get_output() {
                    use turbo_bincode::turbo_bincode_encode;

                    self.output_size += turbo_bincode_encode(&output)
                        .map(|data| data.len())
                        .unwrap_or(0);
                }
            }
        }
        #[cfg(feature = "print_cache_item_size")]
        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
            Mutex::new(FxHashMap::default());

        // The three closures below are handed to `Storage::take_snapshot`. All of them skip
        // transient tasks.
        let preprocess = |task_id: TaskId, inner: &TaskStorage| {
            if task_id.is_transient() {
                return (None, None);
            }

            let meta_restored = inner.flags.meta_restored();
            let data_restored = inner.flags.data_restored();

            // Encode meta/data directly from TaskStorage
            let meta = meta_restored.then(|| inner.clone_meta_snapshot());
            let data = data_restored.then(|| inner.clone_data_snapshot());

            (meta, data)
        };
        let process = |task_id: TaskId,
                       (meta, data): (Option<TaskStorage>, Option<TaskStorage>),
                       buffer: &mut TurboBincodeBuffer| {
            #[cfg(feature = "print_cache_item_size")]
            if let Some(ref m) = meta {
                task_cache_stats
                    .lock()
                    .entry(self.debug_get_task_name(task_id))
                    .or_default()
                    .add_counts(m);
            }
            (
                task_id,
                meta.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Meta, buffer)),
                data.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Data, buffer)),
            )
        };
        let process_snapshot =
            |task_id: TaskId, inner: Box<TaskStorage>, buffer: &mut TurboBincodeBuffer| {
                if task_id.is_transient() {
                    return (task_id, None, None);
                }

                #[cfg(feature = "print_cache_item_size")]
                if inner.flags.meta_modified() {
                    task_cache_stats
                        .lock()
                        .entry(self.debug_get_task_name(task_id))
                        .or_default()
                        .add_counts(&inner);
                }

                // Encode meta/data directly from TaskStorage snapshot
                (
                    task_id,
                    inner.flags.meta_modified().then(|| {
                        encode_task_data(task_id, &inner, SpecificTaskDataCategory::Meta, buffer)
                    }),
                    inner.flags.data_modified().then(|| {
                        encode_task_data(task_id, &inner, SpecificTaskDataCategory::Data, buffer)
                    }),
                )
            };

        let snapshot = self
            .storage
            .take_snapshot(&preprocess, &process, &process_snapshot);

        // Unwrap the encoding results: a failed encoding is printed and that part is dropped.
        // Tasks without any remaining part and iterators without any remaining task are
        // filtered out.
        let task_snapshots = snapshot
            .into_iter()
            .filter_map(|iter| {
                let mut iter = iter
                    .filter_map(
                        |(task_id, meta, data): (
                            _,
                            Option<Result<SmallVec<_>>>,
                            Option<Result<SmallVec<_>>>,
                        )| {
                            let meta = match meta {
                                Some(Ok(meta)) => {
                                    #[cfg(feature = "print_cache_item_size")]
                                    task_cache_stats
                                        .lock()
                                        .entry(self.debug_get_task_name(task_id))
                                        .or_default()
                                        .add_meta(&meta);
                                    Some(meta)
                                }
                                None => None,
                                Some(Err(err)) => {
                                    println!(
                                        "Serializing task {} failed (meta): {:?}",
                                        self.debug_get_task_description(task_id),
                                        err
                                    );
                                    None
                                }
                            };
                            let data = match data {
                                Some(Ok(data)) => {
                                    #[cfg(feature = "print_cache_item_size")]
                                    task_cache_stats
                                        .lock()
                                        .entry(self.debug_get_task_name(task_id))
                                        .or_default()
                                        .add_data(&data);
                                    Some(data)
                                }
                                None => None,
                                Some(Err(err)) => {
                                    println!(
                                        "Serializing task {} failed (data): {:?}",
                                        self.debug_get_task_description(task_id),
                                        err
                                    );
                                    None
                                }
                            };
                            (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
                        },
                    )
                    .peekable();
                iter.peek().is_some().then_some(iter)
            })
            .collect::<Vec<_>>();

        // Only keep the non-empty shards of the task cache log.
        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());

        drop(snapshot_span);

        // Nothing to save: report the snapshot time but flag that nothing was written.
        if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
            return Some((snapshot_time, false));
        }

        let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
        {
            if let Err(err) = self.backing_storage.save_snapshot(
                suspended_operations,
                persisted_task_cache_log,
                task_snapshots,
            ) {
                println!("Persisting failed: {err:?}");
                return None;
            }
            #[cfg(feature = "print_cache_item_size")]
            {
                let mut task_cache_stats = task_cache_stats
                    .into_inner()
                    .into_iter()
                    .collect::<Vec<_>>();
                if !task_cache_stats.is_empty() {
                    use turbo_tasks::util::FormatBytes;

                    use crate::utils::markdown_table::print_markdown_table;

                    // Largest compressed size first, ties are ordered by key (descending).
                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
                        (stats_b.data_compressed + stats_b.meta_compressed, key_b)
                            .cmp(&(stats_a.data_compressed + stats_a.meta_compressed, key_a))
                    });
                    println!(
                        "Task cache stats: {} ({})",
                        FormatBytes(
                            task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data_compressed + s.meta_compressed)
                                .sum::<usize>()
                        ),
                        FormatBytes(
                            task_cache_stats
                                .iter()
                                .map(|(_, s)| s.data + s.meta)
                                .sum::<usize>()
                        )
                    );

                    // Sizes are printed as "compressed (uncompressed)". The "Count x Avg"
                    // headers are repeated because count and average are separate columns.
                    print_markdown_table(
                        [
                            "Task",
                            " Total Size",
                            " Data Size",
                            " Data Count x Avg",
                            " Data Count x Avg",
                            " Meta Size",
                            " Meta Count x Avg",
                            " Meta Count x Avg",
                            " Uppers",
                            " Coll",
                            " Agg Coll",
                            " Children",
                            " Followers",
                            " Coll Deps",
                            " Agg Dirty",
                            " Output Size",
                        ],
                        task_cache_stats.iter(),
                        |(task_desc, stats)| {
                            [
                                task_desc.to_string(),
                                format!(
                                    " {} ({})",
                                    FormatBytes(stats.data_compressed + stats.meta_compressed),
                                    FormatBytes(stats.data + stats.meta)
                                ),
                                format!(
                                    " {} ({})",
                                    FormatBytes(stats.data_compressed),
                                    FormatBytes(stats.data)
                                ),
                                format!(" {} x", stats.data_count,),
                                format!(
                                    "{} ({})",
                                    FormatBytes(
                                        stats
                                            .data_compressed
                                            .checked_div(stats.data_count)
                                            .unwrap_or(0)
                                    ),
                                    FormatBytes(
                                        stats.data.checked_div(stats.data_count).unwrap_or(0)
                                    ),
                                ),
                                format!(
                                    " {} ({})",
                                    FormatBytes(stats.meta_compressed),
                                    FormatBytes(stats.meta)
                                ),
                                format!(" {} x", stats.meta_count,),
                                format!(
                                    "{} ({})",
                                    FormatBytes(
                                        stats
                                            .meta_compressed
                                            .checked_div(stats.meta_count)
                                            .unwrap_or(0)
                                    ),
                                    FormatBytes(
                                        stats.meta.checked_div(stats.meta_count).unwrap_or(0)
                                    ),
                                ),
                                format!(" {}", stats.upper_count),
                                format!(" {}", stats.collectibles_count),
                                format!(" {}", stats.aggregated_collectibles_count),
                                format!(" {}", stats.children_count),
                                format!(" {}", stats.followers_count),
                                format!(" {}", stats.collectibles_dependents_count),
                                format!(" {}", stats.aggregated_dirty_containers_count),
                                format!(" {}", FormatBytes(stats.output_size)),
                            ]
                        },
                    );
                }
            }
        }

        // `start` was taken at the beginning, so this covers both snapshotting and persisting.
        let elapsed = start.elapsed();
        // avoid spamming the event queue with information about fast operations
        if elapsed > Duration::from_secs(10) {
            turbo_tasks().send_compilation_event(Arc::new(TimingEvent::new(
                "Finished writing to filesystem cache".to_string(),
                elapsed,
            )));
        }

        Some((snapshot_time, true))
    }
1328
1329    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1330        if self.should_restore() {
1331            // Continue all uncompleted operations
1332            // They can't be interrupted by a snapshot since the snapshotting job has not been
1333            // scheduled yet.
1334            let uncompleted_operations = self
1335                .backing_storage
1336                .uncompleted_operations()
1337                .expect("Failed to get uncompleted operations");
1338            if !uncompleted_operations.is_empty() {
1339                let mut ctx = self.execute_context(turbo_tasks);
1340                for op in uncompleted_operations {
1341                    op.execute(&mut ctx);
1342                }
1343            }
1344        }
1345
1346        // Only when it should write regularly to the storage, we schedule the initial snapshot
1347        // job.
1348        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1349            // Schedule the snapshot job
1350            let _span = trace_span!("persisting background job").entered();
1351            let _span = tracing::info_span!("thread").entered();
1352            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1353        }
1354    }
1355
1356    fn stopping(&self) {
1357        self.stopping.store(true, Ordering::Release);
1358        self.stopping_event.notify(usize::MAX);
1359    }
1360
1361    #[allow(unused_variables)]
1362    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1363        #[cfg(feature = "verify_aggregation_graph")]
1364        {
1365            self.is_idle.store(false, Ordering::Release);
1366            self.verify_aggregation_graph(turbo_tasks, false);
1367        }
1368        if self.should_persist() {
1369            self.snapshot_and_persist(Span::current().into(), "stop");
1370        }
1371        drop_contents(&self.task_cache);
1372        self.storage.drop_contents();
1373        if let Err(err) = self.backing_storage.shutdown() {
1374            println!("Shutting down failed: {err}");
1375        }
1376    }
1377
1378    #[allow(unused_variables)]
1379    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1380        self.idle_start_event.notify(usize::MAX);
1381
1382        #[cfg(feature = "verify_aggregation_graph")]
1383        {
1384            use tokio::select;
1385
1386            self.is_idle.store(true, Ordering::Release);
1387            let this = self.clone();
1388            let turbo_tasks = turbo_tasks.pin();
1389            tokio::task::spawn(async move {
1390                select! {
1391                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1392                        // do nothing
1393                    }
1394                    _ = this.idle_end_event.listen() => {
1395                        return;
1396                    }
1397                }
1398                if !this.is_idle.load(Ordering::Relaxed) {
1399                    return;
1400                }
1401                this.verify_aggregation_graph(&*turbo_tasks, true);
1402            });
1403        }
1404    }
1405
1406    fn idle_end(&self) {
1407        #[cfg(feature = "verify_aggregation_graph")]
1408        self.is_idle.store(false, Ordering::Release);
1409        self.idle_end_event.notify(usize::MAX);
1410    }
1411
1412    fn get_or_create_persistent_task(
1413        &self,
1414        task_type: CachedTaskType,
1415        parent_task: Option<TaskId>,
1416        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1417    ) -> TaskId {
1418        // First check if the task exists in the cache which only uses a read lock
1419        if let Some(task_id) = self.task_cache.get(&task_type) {
1420            let task_id = *task_id;
1421            self.track_cache_hit(&task_type);
1422            self.connect_child(
1423                parent_task,
1424                task_id,
1425                Some(ArcOrOwned::Owned(task_type)),
1426                turbo_tasks,
1427            );
1428            return task_id;
1429        }
1430
1431        let check_backing_storage =
1432            self.should_restore() && self.local_is_partial.load(Ordering::Acquire);
1433        let tx = check_backing_storage
1434            .then(|| self.backing_storage.start_read_transaction())
1435            .flatten();
1436        let (task_id, task_type) = {
1437            // Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
1438            if let Some(task_id) = unsafe {
1439                check_backing_storage
1440                    .then(|| {
1441                        self.backing_storage
1442                            .forward_lookup_task_cache(tx.as_ref(), &task_type)
1443                            .expect("Failed to lookup task id")
1444                    })
1445                    .flatten()
1446            } {
1447                // Task exists in backing storage
1448                // So we only need to insert it into the in-memory cache
1449                self.track_cache_hit(&task_type);
1450                let task_type = match raw_entry(&self.task_cache, &task_type) {
1451                    RawEntry::Occupied(_) => ArcOrOwned::Owned(task_type),
1452                    RawEntry::Vacant(e) => {
1453                        let task_type = Arc::new(task_type);
1454                        e.insert(task_type.clone(), task_id);
1455                        ArcOrOwned::Arc(task_type)
1456                    }
1457                };
1458                (task_id, task_type)
1459            } else {
1460                // Task doesn't exist in memory cache or backing storage
1461                // So we might need to create a new task
1462                let (task_id, mut task_type) = match raw_entry(&self.task_cache, &task_type) {
1463                    RawEntry::Occupied(e) => {
1464                        let task_id = *e.get();
1465                        drop(e);
1466                        self.track_cache_hit(&task_type);
1467                        (task_id, ArcOrOwned::Owned(task_type))
1468                    }
1469                    RawEntry::Vacant(e) => {
1470                        let task_type = Arc::new(task_type);
1471                        let task_id = self.persisted_task_id_factory.get();
1472                        e.insert(task_type.clone(), task_id);
1473                        self.track_cache_miss(&task_type);
1474                        (task_id, ArcOrOwned::Arc(task_type))
1475                    }
1476                };
1477                if let Some(log) = &self.persisted_task_cache_log {
1478                    let task_type_arc: Arc<CachedTaskType> = Arc::from(task_type);
1479                    log.lock(task_id).push((task_type_arc.clone(), task_id));
1480                    task_type = ArcOrOwned::Arc(task_type_arc);
1481                }
1482                (task_id, task_type)
1483            }
1484        };
1485
1486        // Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
1487        unsafe {
1488            self.connect_child_with_tx(
1489                tx.as_ref(),
1490                parent_task,
1491                task_id,
1492                Some(task_type),
1493                turbo_tasks,
1494            )
1495        };
1496
1497        task_id
1498    }
1499
1500    fn get_or_create_transient_task(
1501        &self,
1502        task_type: CachedTaskType,
1503        parent_task: Option<TaskId>,
1504        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1505    ) -> TaskId {
1506        if let Some(parent_task) = parent_task
1507            && !parent_task.is_transient()
1508        {
1509            self.panic_persistent_calling_transient(
1510                self.debug_get_task_description(parent_task),
1511                Some(&task_type),
1512                /* cell_id */ None,
1513            );
1514        }
1515        // First check if the task exists in the cache which only uses a read lock
1516        if let Some(task_id) = self.task_cache.get(&task_type) {
1517            let task_id = *task_id;
1518            self.track_cache_hit(&task_type);
1519            self.connect_child(
1520                parent_task,
1521                task_id,
1522                Some(ArcOrOwned::Owned(task_type)),
1523                turbo_tasks,
1524            );
1525            return task_id;
1526        }
1527        // If not, acquire a write lock and double check / insert
1528        match raw_entry(&self.task_cache, &task_type) {
1529            RawEntry::Occupied(e) => {
1530                let task_id = *e.get();
1531                drop(e);
1532                self.track_cache_hit(&task_type);
1533                self.connect_child(
1534                    parent_task,
1535                    task_id,
1536                    Some(ArcOrOwned::Owned(task_type)),
1537                    turbo_tasks,
1538                );
1539                task_id
1540            }
1541            RawEntry::Vacant(e) => {
1542                let task_type = Arc::new(task_type);
1543                let task_id = self.transient_task_id_factory.get();
1544                e.insert(task_type.clone(), task_id);
1545                self.track_cache_miss(&task_type);
1546                self.connect_child(
1547                    parent_task,
1548                    task_id,
1549                    Some(ArcOrOwned::Arc(task_type)),
1550                    turbo_tasks,
1551                );
1552
1553                task_id
1554            }
1555        }
1556    }
1557
1558    /// Generate an object that implements [`fmt::Display`] explaining why the given
1559    /// [`CachedTaskType`] is transient.
1560    fn debug_trace_transient_task(
1561        &self,
1562        task_type: &CachedTaskType,
1563        cell_id: Option<CellId>,
1564    ) -> DebugTraceTransientTask {
1565        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1566        // from tracing the same task many times, so use a visited_set
1567        fn inner_id(
1568            backend: &TurboTasksBackendInner<impl BackingStorage>,
1569            task_id: TaskId,
1570            cell_type_id: Option<ValueTypeId>,
1571            visited_set: &mut FxHashSet<TaskId>,
1572        ) -> DebugTraceTransientTask {
1573            if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1574                if visited_set.contains(&task_id) {
1575                    let task_name = task_type.get_name();
1576                    DebugTraceTransientTask::Collapsed {
1577                        task_name,
1578                        cell_type_id,
1579                    }
1580                } else {
1581                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1582                }
1583            } else {
1584                DebugTraceTransientTask::Uncached { cell_type_id }
1585            }
1586        }
1587        fn inner_cached(
1588            backend: &TurboTasksBackendInner<impl BackingStorage>,
1589            task_type: &CachedTaskType,
1590            cell_type_id: Option<ValueTypeId>,
1591            visited_set: &mut FxHashSet<TaskId>,
1592        ) -> DebugTraceTransientTask {
1593            let task_name = task_type.get_name();
1594
1595            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1596                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1597                    // `task_id` should never be `None` at this point, as that would imply a
1598                    // non-local task is returning a local `Vc`...
1599                    // Just ignore if it happens, as we're likely already panicking.
1600                    return None;
1601                };
1602                if task_id.is_transient() {
1603                    Some(Box::new(inner_id(
1604                        backend,
1605                        task_id,
1606                        cause_self_raw_vc.try_get_type_id(),
1607                        visited_set,
1608                    )))
1609                } else {
1610                    None
1611                }
1612            });
1613            let cause_args = task_type
1614                .arg
1615                .get_raw_vcs()
1616                .into_iter()
1617                .filter_map(|raw_vc| {
1618                    let Some(task_id) = raw_vc.try_get_task_id() else {
1619                        // `task_id` should never be `None` (see comment above)
1620                        return None;
1621                    };
1622                    if !task_id.is_transient() {
1623                        return None;
1624                    }
1625                    Some((task_id, raw_vc.try_get_type_id()))
1626                })
1627                .collect::<IndexSet<_>>() // dedupe
1628                .into_iter()
1629                .map(|(task_id, cell_type_id)| {
1630                    inner_id(backend, task_id, cell_type_id, visited_set)
1631                })
1632                .collect();
1633
1634            DebugTraceTransientTask::Cached {
1635                task_name,
1636                cell_type_id,
1637                cause_self,
1638                cause_args,
1639            }
1640        }
1641        inner_cached(
1642            self,
1643            task_type,
1644            cell_id.map(|c| c.type_id),
1645            &mut FxHashSet::default(),
1646        )
1647    }
1648
1649    fn invalidate_task(
1650        &self,
1651        task_id: TaskId,
1652        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1653    ) {
1654        if !self.should_track_dependencies() {
1655            panic!("Dependency tracking is disabled so invalidation is not allowed");
1656        }
1657        operation::InvalidateOperation::run(
1658            smallvec![task_id],
1659            #[cfg(feature = "trace_task_dirty")]
1660            TaskDirtyCause::Invalidator,
1661            self.execute_context(turbo_tasks),
1662        );
1663    }
1664
1665    fn invalidate_tasks(
1666        &self,
1667        tasks: &[TaskId],
1668        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1669    ) {
1670        if !self.should_track_dependencies() {
1671            panic!("Dependency tracking is disabled so invalidation is not allowed");
1672        }
1673        operation::InvalidateOperation::run(
1674            tasks.iter().copied().collect(),
1675            #[cfg(feature = "trace_task_dirty")]
1676            TaskDirtyCause::Unknown,
1677            self.execute_context(turbo_tasks),
1678        );
1679    }
1680
1681    fn invalidate_tasks_set(
1682        &self,
1683        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1684        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1685    ) {
1686        if !self.should_track_dependencies() {
1687            panic!("Dependency tracking is disabled so invalidation is not allowed");
1688        }
1689        operation::InvalidateOperation::run(
1690            tasks.iter().copied().collect(),
1691            #[cfg(feature = "trace_task_dirty")]
1692            TaskDirtyCause::Unknown,
1693            self.execute_context(turbo_tasks),
1694        );
1695    }
1696
1697    fn invalidate_serialization(
1698        &self,
1699        task_id: TaskId,
1700        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1701    ) {
1702        if task_id.is_transient() {
1703            return;
1704        }
1705        let mut ctx = self.execute_context(turbo_tasks);
1706        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1707        task.invalidate_serialization();
1708    }
1709
1710    fn debug_get_task_description(&self, task_id: TaskId) -> String {
1711        let task = self.storage.access_mut(task_id);
1712        if let Some(value) = task.get_persistent_task_type() {
1713            format!("{task_id:?} {}", value)
1714        } else if let Some(value) = task.get_transient_task_type() {
1715            format!("{task_id:?} {}", value)
1716        } else {
1717            format!("{task_id:?} unknown")
1718        }
1719    }
1720
1721    #[allow(dead_code)]
1722    fn debug_get_task_name(&self, task_id: TaskId) -> String {
1723        let task = self.storage.access_mut(task_id);
1724        if let Some(value) = task.get_persistent_task_type() {
1725            value.to_string()
1726        } else if let Some(value) = task.get_transient_task_type() {
1727            value.to_string()
1728        } else {
1729            "unknown".to_string()
1730        }
1731    }
1732
1733    fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1734        let task = self.storage.access_mut(task_id);
1735        task.get_persistent_task_type().cloned()
1736    }
1737
1738    fn task_execution_canceled(
1739        &self,
1740        task_id: TaskId,
1741        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1742    ) {
1743        let mut ctx = self.execute_context(turbo_tasks);
1744        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1745        if let Some(in_progress) = task.take_in_progress() {
1746            match in_progress {
1747                InProgressState::Scheduled {
1748                    done_event,
1749                    reason: _,
1750                } => done_event.notify(usize::MAX),
1751                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1752                    done_event.notify(usize::MAX)
1753                }
1754                InProgressState::Canceled => {}
1755            }
1756        }
1757        let old = task.set_in_progress(InProgressState::Canceled);
1758        debug_assert!(old.is_none(), "InProgress already exists");
1759    }
1760
    /// Tries to move the task `task_id` from the `Scheduled` state into the `InProgress`
    /// state and returns the future and tracing span needed to execute it.
    ///
    /// Returns `None` when
    /// - the task has no in-progress state,
    /// - the in-progress state is anything other than `Scheduled` (the state is put back
    ///   unchanged), or
    /// - the task is a `TransientTask::Once` whose future has already been taken.
    ///
    /// On success the task's current collectibles are recorded as outdated and, when
    /// dependency tracking is enabled, its current cell and output dependencies are copied
    /// into the outdated sets.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        // Both are assigned inside the block below and consumed after it
        let execution_reason;
        let task_type;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            // `true` only for transient tasks of the `Once` kind
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            if let Some(tasks) = task.prefetch() {
                // The task guard is released while the prefetched tasks are prepared and
                // re-acquired afterwards
                drop(task);
                ctx.prepare_tasks(tasks);
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            // No in-progress state at all: nothing to start
            let in_progress = task.take_in_progress()?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                // Not scheduled: restore the state that was just taken and bail out
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            // Switch to `InProgress`, carrying over the `done_event` of the scheduled state
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Make all current collectibles outdated (remove left-over outdated collectibles)
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            // Collected into a `Vec` up front so that the iterators over `task` are finished
            // before the loop below mutates it
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        // Remove a left-over outdated collectible unless it is also a current
                        // collectible
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Make all dependencies outdated
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        // Build the tracing span and the future to execute, depending on the kind of task
        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason, priority),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // The future of a once task is taken out of its mutex; if it is already
                    // gone, `?` makes this function return `None`
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1860
1861    fn task_execution_completed(
1862        &self,
1863        task_id: TaskId,
1864        result: Result<RawVc, TurboTasksExecutionError>,
1865        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1866        has_invalidator: bool,
1867        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1868    ) -> bool {
1869        // Task completion is a 4 step process:
1870        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
1871        //    aggregation number of the task and the new children.
1872        // 2. Connect the new children to the task (and do the relevant aggregation updates).
1873        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
1874        // 4. Shrink the task memory to reduce footprint of the task.
1875
1876        // Due to persistence it is possible that the process is cancelled after any step. This is
1877        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
1878        // in-memory representation.
1879
1880        // The task might be invalidated during this process, so we need to check the stale flag
1881        // at the start of every step.
1882
1883        #[cfg(not(feature = "trace_task_details"))]
1884        let span = tracing::trace_span!(
1885            "task execution completed",
1886            new_children = tracing::field::Empty
1887        )
1888        .entered();
1889        #[cfg(feature = "trace_task_details")]
1890        let span = tracing::trace_span!(
1891            "task execution completed",
1892            task_id = display(task_id),
1893            result = match result.as_ref() {
1894                Ok(value) => display(either::Either::Left(value)),
1895                Err(err) => display(either::Either::Right(err)),
1896            },
1897            new_children = tracing::field::Empty,
1898            immutable = tracing::field::Empty,
1899            new_output = tracing::field::Empty,
1900            output_dependents = tracing::field::Empty,
1901            stale = tracing::field::Empty,
1902        )
1903        .entered();
1904
1905        let is_error = result.is_err();
1906
1907        let mut ctx = self.execute_context(turbo_tasks);
1908
1909        let Some(TaskExecutionCompletePrepareResult {
1910            new_children,
1911            is_now_immutable,
1912            #[cfg(feature = "verify_determinism")]
1913            no_output_set,
1914            new_output,
1915            output_dependent_tasks,
1916        }) = self.task_execution_completed_prepare(
1917            &mut ctx,
1918            #[cfg(feature = "trace_task_details")]
1919            &span,
1920            task_id,
1921            result,
1922            cell_counters,
1923            has_invalidator,
1924        )
1925        else {
1926            // Task was stale and has been rescheduled
1927            #[cfg(feature = "trace_task_details")]
1928            span.record("stale", "prepare");
1929            return true;
1930        };
1931
1932        #[cfg(feature = "trace_task_details")]
1933        span.record("new_output", new_output.is_some());
1934        #[cfg(feature = "trace_task_details")]
1935        span.record("output_dependents", output_dependent_tasks.len());
1936
1937        // When restoring from filesystem cache the following might not be executed (since we can
1938        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
1939        // would be executed again.
1940
1941        if !output_dependent_tasks.is_empty() {
1942            self.task_execution_completed_invalidate_output_dependent(
1943                &mut ctx,
1944                task_id,
1945                output_dependent_tasks,
1946            );
1947        }
1948
1949        let has_new_children = !new_children.is_empty();
1950        span.record("new_children", new_children.len());
1951
1952        if has_new_children {
1953            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
1954        }
1955
1956        if has_new_children
1957            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
1958        {
1959            // Task was stale and has been rescheduled
1960            #[cfg(feature = "trace_task_details")]
1961            span.record("stale", "connect");
1962            return true;
1963        }
1964
1965        let (stale, in_progress_cells) = self.task_execution_completed_finish(
1966            &mut ctx,
1967            task_id,
1968            #[cfg(feature = "verify_determinism")]
1969            no_output_set,
1970            new_output,
1971            is_now_immutable,
1972        );
1973        if stale {
1974            // Task was stale and has been rescheduled
1975            #[cfg(feature = "trace_task_details")]
1976            span.record("stale", "finish");
1977            return true;
1978        }
1979
1980        let removed_data =
1981            self.task_execution_completed_cleanup(&mut ctx, task_id, cell_counters, is_error);
1982
1983        // Drop data outside of critical sections
1984        drop(removed_data);
1985        drop(in_progress_cells);
1986
1987        false
1988    }
1989
    /// First step of completing a task execution: collects everything the later steps need
    /// and removes the edges that became outdated during this execution.
    ///
    /// In detail this
    /// - reschedules the task and returns `None` when it is stale and not a once task
    ///   (unless the `no_fast_stale` feature is enabled),
    /// - takes the `new_children` recorded in the in-progress state,
    /// - applies `has_invalidator` and `cell_counters` to the task,
    /// - gathers the outdated edges (children, collectibles and, with dependency tracking,
    ///   cell/output dependencies) and runs `CleanupOldEdgesOperation` for them,
    /// - determines whether `result` differs from the current output and, if so, which tasks
    ///   depend on the output,
    /// - determines whether the task can now be considered immutable.
    ///
    /// If the in-progress state is `Canceled`, an empty result is returned right away.
    ///
    /// # Panics
    ///
    /// Panics when the task has no in-progress state, when that state is neither `Canceled`
    /// nor `InProgress`, or when `result` is a `RawVc::LocalOutput`.
    fn task_execution_completed_prepare(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        #[cfg(feature = "trace_task_details")] span: &Span,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        has_invalidator: bool,
    ) -> Option<TaskExecutionCompletePrepareResult> {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress_mut() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // Canceled task: report "nothing changed" so the caller has nothing to connect or
            // invalidate
            return Some(TaskExecutionCompletePrepareResult {
                new_children: Default::default(),
                is_now_immutable: false,
                #[cfg(feature = "verify_determinism")]
                no_output_set: false,
                new_output: None,
                output_dependent_tasks: Default::default(),
            });
        }
        let &mut InProgressState::InProgress(box InProgressStateInner {
            stale,
            ref mut new_children,
            session_dependent,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // If the task is stale, reschedule it
        #[cfg(not(feature = "no_fast_stale"))]
        if stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner {
                done_event,
                mut new_children,
                ..
            })) = task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            // Remove old children from new_children to leave only the children that had their
            // active count increased
            for task in task.iter_children() {
                new_children.remove(&task);
            }
            drop(task);

            // We need to undo the active count increase for the children since we throw away the
            // new_children list now.
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return None;
        }

        // take the children from the task to process them
        let mut new_children = take(new_children);

        // handle has_invalidator
        if has_invalidator {
            task.set_invalidator(true);
        }

        // handle cell counters: update max index and remove cells that are no longer used
        // NOTE(review): within this function `old_counters` is only used as the source of the
        // clone below.
        let old_counters: FxHashMap<_, _> = task
            .iter_cell_type_max_index()
            .map(|(&k, &v)| (k, v))
            .collect();
        let mut counters_to_remove = old_counters.clone();

        for (&cell_type, &max_index) in cell_counters.iter() {
            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                // Known cell type: only write when the max index actually changed
                if old_max_index != max_index {
                    task.insert_cell_type_max_index(cell_type, max_index);
                }
            } else {
                // Cell type that was not present before
                task.insert_cell_type_max_index(cell_type, max_index);
            }
        }
        // Whatever is left was not reported by this execution
        for (cell_type, _) in counters_to_remove {
            task.remove_cell_type_max_index(&cell_type);
        }

        let mut queue = AggregationUpdateQueue::new();

        let mut old_edges = Vec::new();

        let has_children = !new_children.is_empty();
        let is_immutable = task.immutable();
        // `Some(dependencies)` when the task is a candidate for becoming immutable; the
        // dependencies themselves are checked further below, after the task guard is dropped
        let task_dependencies_for_immutable =
            // Task is not already marked as immutable
            if !is_immutable
            // Task is not session dependent (session dependent tasks can change between sessions)
            && !session_dependent
            // Task has no invalidator
            && !task.invalidator()
            // Task has no dependencies on collectibles
            && task.is_collectibles_dependencies_empty()
        {
            Some(
                // Collect all dependencies on tasks to check if all dependencies are immutable
                task.iter_output_dependencies()
                    .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
                    .collect::<FxHashSet<_>>(),
            )
        } else {
            None
        };

        if has_children {
            // Prepare all new children
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);

            // Filter actual new children
            // (existing children are removed from `new_children`; existing children that are
            // not in `new_children` become outdated edges)
            old_edges.extend(
                task.iter_children()
                    .filter(|task| !new_children.remove(task))
                    .map(OutdatedEdge::Child),
            );
        } else {
            // No children were recorded by this execution, so all existing children are
            // outdated
            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
        }

        old_edges.extend(
            task.iter_outdated_collectibles()
                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
        );

        if self.should_track_dependencies() {
            // IMPORTANT: Use iter_outdated_* here, NOT iter_* (active dependencies).
            // At execution start, active deps are copied to outdated as a "before" snapshot.
            // During execution, new deps are added to active.
            // Here at completion, we clean up only the OUTDATED deps (the "before" snapshot).
            // Using iter_* (active) instead would incorrectly clean up deps that are still valid,
            // breaking dependency tracking.
            old_edges.extend(
                task.iter_outdated_cell_dependencies()
                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
            );
            old_edges.extend(
                task.iter_outdated_output_dependencies()
                    .map(OutdatedEdge::OutputDependency),
            );
        }

        // Check if output need to be updated
        // (`new_output` stays `None` when the result equals the current output)
        let current_output = task.get_output();
        #[cfg(feature = "verify_determinism")]
        let no_output_set = current_output.is_none();
        let new_output = match result {
            Ok(RawVc::TaskOutput(output_task_id)) => {
                if let Some(OutputValue::Output(current_task_id)) = current_output
                    && *current_task_id == output_task_id
                {
                    None
                } else {
                    Some(OutputValue::Output(output_task_id))
                }
            }
            Ok(RawVc::TaskCell(output_task_id, cell)) => {
                if let Some(OutputValue::Cell(CellRef {
                    task: current_task_id,
                    cell: current_cell,
                })) = current_output
                    && *current_task_id == output_task_id
                    && *current_cell == cell
                {
                    None
                } else {
                    Some(OutputValue::Cell(CellRef {
                        task: output_task_id,
                        cell,
                    }))
                }
            }
            Ok(RawVc::LocalOutput(..)) => {
                panic!("Non-local tasks must not return a local Vc");
            }
            Err(err) => {
                if let Some(OutputValue::Error(old_error)) = current_output
                    && old_error == &err
                {
                    None
                } else {
                    Some(OutputValue::Error(err))
                }
            }
        };
        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
        // When output has changed, grab the dependent tasks
        if new_output.is_some() && ctx.should_track_dependencies() {
            output_dependent_tasks = task.iter_output_dependent().collect();
        }

        // Release the task before other tasks are accessed through `ctx` below
        drop(task);

        // Check if the task can be marked as immutable
        // (only when it is a candidate and every task it depends on is immutable)
        let mut is_now_immutable = false;
        if let Some(dependencies) = task_dependencies_for_immutable
            && dependencies
                .iter()
                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
        {
            is_now_immutable = true;
        }
        #[cfg(feature = "trace_task_details")]
        span.record("immutable", is_immutable || is_now_immutable);

        if !queue.is_empty() || !old_edges.is_empty() {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
            // Remove outdated edges first, before removing in_progress+dirty flag.
            // We need to make sure all outdated edges are removed before the task can potentially
            // be scheduled and executed again
            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
        }

        Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        })
    }
2229
2230    fn task_execution_completed_invalidate_output_dependent(
2231        &self,
2232        ctx: &mut impl ExecuteContext<'_>,
2233        task_id: TaskId,
2234        output_dependent_tasks: SmallVec<[TaskId; 4]>,
2235    ) {
2236        debug_assert!(!output_dependent_tasks.is_empty());
2237
2238        if output_dependent_tasks.len() > 1 {
2239            ctx.prepare_tasks(
2240                output_dependent_tasks
2241                    .iter()
2242                    .map(|&id| (id, TaskDataCategory::All)),
2243            );
2244        }
2245
2246        fn process_output_dependents(
2247            ctx: &mut impl ExecuteContext<'_>,
2248            task_id: TaskId,
2249            dependent_task_id: TaskId,
2250            queue: &mut AggregationUpdateQueue,
2251        ) {
2252            #[cfg(feature = "trace_task_output_dependencies")]
2253            let span = tracing::trace_span!(
2254                "invalidate output dependency",
2255                task = %task_id,
2256                dependent_task = %dependent_task_id,
2257                result = tracing::field::Empty,
2258            )
2259            .entered();
2260            let mut make_stale = true;
2261            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2262            let transient_task_type = dependent.get_transient_task_type();
2263            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
2264                // once tasks are never invalidated
2265                #[cfg(feature = "trace_task_output_dependencies")]
2266                span.record("result", "once task");
2267                return;
2268            }
2269            if dependent.outdated_output_dependencies_contains(&task_id) {
2270                #[cfg(feature = "trace_task_output_dependencies")]
2271                span.record("result", "outdated dependency");
2272                // output dependency is outdated, so it hasn't read the output yet
2273                // and doesn't need to be invalidated
2274                // But importantly we still need to make the task dirty as it should no longer
2275                // be considered as "recomputation".
2276                make_stale = false;
2277            } else if !dependent.output_dependencies_contains(&task_id) {
2278                // output dependency has been removed, so the task doesn't depend on the
2279                // output anymore and doesn't need to be invalidated
2280                #[cfg(feature = "trace_task_output_dependencies")]
2281                span.record("result", "no backward dependency");
2282                return;
2283            }
2284            make_task_dirty_internal(
2285                dependent,
2286                dependent_task_id,
2287                make_stale,
2288                #[cfg(feature = "trace_task_dirty")]
2289                TaskDirtyCause::OutputChange { task_id },
2290                queue,
2291                ctx,
2292            );
2293            #[cfg(feature = "trace_task_output_dependencies")]
2294            span.record("result", "marked dirty");
2295        }
2296
2297        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
2298            let chunk_size = good_chunk_size(output_dependent_tasks.len());
2299            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2300            let _ = scope_and_block(chunks.len(), |scope| {
2301                for chunk in chunks {
2302                    let child_ctx = ctx.child_context();
2303                    scope.spawn(move || {
2304                        let mut ctx = child_ctx.create();
2305                        let mut queue = AggregationUpdateQueue::new();
2306                        for dependent_task_id in chunk {
2307                            process_output_dependents(
2308                                &mut ctx,
2309                                task_id,
2310                                dependent_task_id,
2311                                &mut queue,
2312                            )
2313                        }
2314                        queue.execute(&mut ctx);
2315                    });
2316                }
2317            });
2318        } else {
2319            let mut queue = AggregationUpdateQueue::new();
2320            for dependent_task_id in output_dependent_tasks {
2321                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
2322            }
2323            queue.execute(ctx);
2324        }
2325    }
2326
2327    fn task_execution_completed_unfinished_children_dirty(
2328        &self,
2329        ctx: &mut impl ExecuteContext<'_>,
2330        new_children: &FxHashSet<TaskId>,
2331    ) {
2332        debug_assert!(!new_children.is_empty());
2333
2334        let mut queue = AggregationUpdateQueue::new();
2335        ctx.for_each_task_all(new_children.iter().copied(), |child_task, ctx| {
2336            if !child_task.has_output() {
2337                let child_id = child_task.id();
2338                make_task_dirty_internal(
2339                    child_task,
2340                    child_id,
2341                    false,
2342                    #[cfg(feature = "trace_task_dirty")]
2343                    TaskDirtyCause::InitialDirty,
2344                    &mut queue,
2345                    ctx,
2346                );
2347            }
2348        });
2349
2350        queue.execute(ctx);
2351    }
2352
    /// Connects the children in `new_children` to the task `task_id` after its execution
    /// completed.
    ///
    /// Returns `true` when the task was found stale and has been moved back to the
    /// `Scheduled` state instead (only without the `no_fast_stale` feature); in that case the
    /// active counts of `new_children` are decreased again. Returns `false` when the task was
    /// canceled (nothing is connected) or after the children were connected.
    ///
    /// `new_children` must not be empty (checked by a `debug_assert!`).
    ///
    /// # Panics
    ///
    /// Panics when the task has no in-progress state, or when that state is neither
    /// `Canceled` nor `InProgress`.
    fn task_execution_completed_connect(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        new_children: FxHashSet<TaskId>,
    ) -> bool {
        debug_assert!(!new_children.is_empty());

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        // Only peek at the in-progress state here; it is taken below only on the stale path.
        let Some(in_progress) = task.get_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // Task was canceled in the meantime, so we don't connect the children
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            #[cfg(not(feature = "no_fast_stale"))]
            stale,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // If the task is stale, reschedule it
        #[cfg(not(feature = "no_fast_stale"))]
        if *stale && !is_once_task {
            // Swap the `InProgress` state for `Scheduled`, keeping the same `done_event`.
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);

            // All `new_children` are currently held active with an active count and we need to
            // undo that. (We already filtered out the old children from that list)
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return true;
        }

        // The task counts as active only when activeness is tracked and its active counter is
        // positive.
        let has_active_count = ctx.should_track_activeness()
            && task
                .get_activeness()
                .is_some_and(|activeness| activeness.active_counter > 0);
        connect_children(
            ctx,
            task_id,
            task,
            new_children,
            has_active_count,
            ctx.should_track_activeness(),
        );

        false
    }
2420
2421    fn task_execution_completed_finish(
2422        &self,
2423        ctx: &mut impl ExecuteContext<'_>,
2424        task_id: TaskId,
2425        #[cfg(feature = "verify_determinism")] no_output_set: bool,
2426        new_output: Option<OutputValue>,
2427        is_now_immutable: bool,
2428    ) -> (
2429        bool,
2430        Option<
2431            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2432        >,
2433    ) {
2434        let mut task = ctx.task(task_id, TaskDataCategory::All);
2435        let Some(in_progress) = task.take_in_progress() else {
2436            panic!("Task execution completed, but task is not in progress: {task:#?}");
2437        };
2438        if matches!(in_progress, InProgressState::Canceled) {
2439            // Task was canceled in the meantime, so we don't finish it
2440            return (false, None);
2441        }
2442        let InProgressState::InProgress(box InProgressStateInner {
2443            done_event,
2444            once_task: is_once_task,
2445            stale,
2446            session_dependent,
2447            marked_as_completed: _,
2448            new_children,
2449        }) = in_progress
2450        else {
2451            panic!("Task execution completed, but task is not in progress: {task:#?}");
2452        };
2453        debug_assert!(new_children.is_empty());
2454
2455        // If the task is stale, reschedule it
2456        if stale && !is_once_task {
2457            let old = task.set_in_progress(InProgressState::Scheduled {
2458                done_event,
2459                reason: TaskExecutionReason::Stale,
2460            });
2461            debug_assert!(old.is_none(), "InProgress already exists");
2462            return (true, None);
2463        }
2464
2465        // Set the output if it has changed
2466        let mut old_content = None;
2467        if let Some(value) = new_output {
2468            old_content = task.set_output(value);
2469        }
2470
2471        // If the task has no invalidator and has no mutable dependencies, it does not have a way
2472        // to be invalidated and we can mark it as immutable.
2473        if is_now_immutable {
2474            task.set_immutable(true);
2475        }
2476
2477        // Notify in progress cells and remove all of them
2478        let in_progress_cells = task.take_in_progress_cells();
2479        if let Some(ref cells) = in_progress_cells {
2480            for state in cells.values() {
2481                state.event.notify(usize::MAX);
2482            }
2483        }
2484
2485        // Grab the old dirty state
2486        let old_dirtyness = task.get_dirty().cloned();
2487        let (old_self_dirty, old_current_session_self_clean) = match old_dirtyness {
2488            None => (false, false),
2489            Some(Dirtyness::Dirty(_)) => (true, false),
2490            Some(Dirtyness::SessionDependent) => {
2491                let clean_in_current_session = task.current_session_clean();
2492                (true, clean_in_current_session)
2493            }
2494        };
2495
2496        // Compute the new dirty state
2497        let (new_dirtyness, new_self_dirty, new_current_session_self_clean) = if session_dependent {
2498            (Some(Dirtyness::SessionDependent), true, true)
2499        } else {
2500            (None, false, false)
2501        };
2502
2503        // Update the dirty state
2504        if old_dirtyness != new_dirtyness {
2505            if let Some(value) = new_dirtyness {
2506                task.set_dirty(value);
2507            } else if old_dirtyness.is_some() {
2508                task.take_dirty();
2509            }
2510        }
2511        if old_current_session_self_clean != new_current_session_self_clean {
2512            if new_current_session_self_clean {
2513                task.set_current_session_clean(true);
2514            } else if old_current_session_self_clean {
2515                task.set_current_session_clean(false);
2516            }
2517        }
2518
2519        // Propagate dirtyness changes
2520        let data_update = if old_self_dirty != new_self_dirty
2521            || old_current_session_self_clean != new_current_session_self_clean
2522        {
2523            let dirty_container_count = task
2524                .get_aggregated_dirty_container_count()
2525                .cloned()
2526                .unwrap_or_default();
2527            let current_session_clean_container_count = task
2528                .get_aggregated_current_session_clean_container_count()
2529                .copied()
2530                .unwrap_or_default();
2531            let result = ComputeDirtyAndCleanUpdate {
2532                old_dirty_container_count: dirty_container_count,
2533                new_dirty_container_count: dirty_container_count,
2534                old_current_session_clean_container_count: current_session_clean_container_count,
2535                new_current_session_clean_container_count: current_session_clean_container_count,
2536                old_self_dirty,
2537                new_self_dirty,
2538                old_current_session_self_clean,
2539                new_current_session_self_clean,
2540            }
2541            .compute();
2542            if result.dirty_count_update - result.current_session_clean_update < 0 {
2543                // The task is clean now
2544                if let Some(activeness_state) = task.get_activeness_mut() {
2545                    activeness_state.all_clean_event.notify(usize::MAX);
2546                    activeness_state.unset_active_until_clean();
2547                    if activeness_state.is_empty() {
2548                        task.take_activeness();
2549                    }
2550                }
2551            }
2552            result
2553                .aggregated_update(task_id)
2554                .and_then(|aggregated_update| {
2555                    AggregationUpdateJob::data_update(&mut task, aggregated_update)
2556                })
2557        } else {
2558            None
2559        };
2560
2561        #[cfg(feature = "verify_determinism")]
2562        let reschedule =
2563            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
2564        #[cfg(not(feature = "verify_determinism"))]
2565        let reschedule = false;
2566        if reschedule {
2567            let old = task.set_in_progress(InProgressState::Scheduled {
2568                done_event,
2569                reason: TaskExecutionReason::Stale,
2570            });
2571            debug_assert!(old.is_none(), "InProgress already exists");
2572            drop(task);
2573        } else {
2574            drop(task);
2575
2576            // Notify dependent tasks that are waiting for this task to finish
2577            done_event.notify(usize::MAX);
2578        }
2579
2580        drop(old_content);
2581
2582        if let Some(data_update) = data_update {
2583            AggregationUpdateQueue::run(data_update, ctx);
2584        }
2585
2586        // We return so the data can be dropped outside of critical sections
2587        (reschedule, in_progress_cells)
2588    }
2589
2590    fn task_execution_completed_cleanup(
2591        &self,
2592        ctx: &mut impl ExecuteContext<'_>,
2593        task_id: TaskId,
2594        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2595        is_error: bool,
2596    ) -> Vec<SharedReference> {
2597        let mut task = ctx.task(task_id, TaskDataCategory::All);
2598        let mut removed_cell_data = Vec::new();
2599        // An error is potentially caused by a eventual consistency, so we avoid updating cells
2600        // after an error as it is likely transient and we want to keep the dependent tasks
2601        // clean to avoid re-executions.
2602        if !is_error {
2603            // Remove no longer existing cells and
2604            // find all outdated data items (removed cells, outdated edges)
2605            // Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
2606            // anyway and we want to avoid needless re-executions. When the cells become
2607            // used again, they are invalidated from the update cell operation.
2608            // Remove cell data for cells that no longer exist
2609            let to_remove_persistent: Vec<_> = task
2610                .iter_persistent_cell_data()
2611                .filter_map(|(cell, _)| {
2612                    cell_counters
2613                        .get(&cell.type_id)
2614                        .is_none_or(|start_index| cell.index >= *start_index)
2615                        .then_some(*cell)
2616                })
2617                .collect();
2618
2619            // Remove transient cell data for cells that no longer exist
2620            let to_remove_transient: Vec<_> = task
2621                .iter_transient_cell_data()
2622                .filter_map(|(cell, _)| {
2623                    cell_counters
2624                        .get(&cell.type_id)
2625                        .is_none_or(|start_index| cell.index >= *start_index)
2626                        .then_some(*cell)
2627                })
2628                .collect();
2629            removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
2630            for cell in to_remove_persistent {
2631                if let Some(data) = task.remove_persistent_cell_data(&cell) {
2632                    removed_cell_data.push(data.into_untyped());
2633                }
2634            }
2635            for cell in to_remove_transient {
2636                if let Some(data) = task.remove_transient_cell_data(&cell) {
2637                    removed_cell_data.push(data);
2638                }
2639            }
2640        }
2641
2642        // Clean up task storage after execution:
2643        // - Shrink collections marked with shrink_on_completion
2644        // - Drop dependency fields for immutable tasks (they'll never re-execute)
2645        task.cleanup_after_execution();
2646
2647        drop(task);
2648
2649        // Return so we can drop outside of critical sections
2650        removed_cell_data
2651    }
2652
    /// Runs a backend background job and returns it as a boxed future.
    ///
    /// Both job kinds handled here (`InitialSnapshot` and `FollowUpSnapshot`) wait for the next
    /// snapshot point and then call `snapshot_and_persist`. They differ only in the wait
    /// duration and the reason string. The wait ends when
    /// - the deadline `last_snapshot + time` is reached,
    /// - the idle deadline is reached while `turbo_tasks.is_idle()` holds, or
    /// - the stopping event fires (the future then returns without taking a snapshot).
    ///
    /// After a snapshot that produced new data, the snapshot time is stored in
    /// `self.last_snapshot` (milliseconds since `self.start_time`), a `FollowUpSnapshot` job is
    /// scheduled and the future completes. Otherwise the loop continues.
    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    // `self.last_snapshot` holds milliseconds relative to `self.start_time`.
                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
                    let mut idle_start_listener = self.idle_start_event.listen();
                    let mut idle_end_listener = self.idle_end_event.listen();
                    // Reset to `false` after a snapshot without new data; set again by the next
                    // idle-start event.
                    let mut fresh_idle = true;
                    loop {
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        let idle_timeout = *IDLE_TIMEOUT;
                        let (time, mut reason) =
                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                            } else {
                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
                            };

                        let until = last_snapshot + time;
                        // Only wait when the deadline is still in the future.
                        if until > Instant::now() {
                            // The listener is registered before the flag is read — presumably so
                            // a stop signalled in between is not missed.
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            // The idle deadline is only armed when currently idle and the idle
                            // period is fresh.
                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
                                Instant::now() + idle_timeout
                            } else {
                                far_future()
                            };
                            loop {
                                tokio::select! {
                                    _ = &mut stop_listener => {
                                        return;
                                    },
                                    _ = &mut idle_start_listener => {
                                        // A new idle period started: re-arm the idle deadline
                                        // and listen for the next idle start.
                                        fresh_idle = true;
                                        idle_time = Instant::now() + idle_timeout;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    _ = &mut idle_end_listener => {
                                        // The idle period ended: move the idle deadline to
                                        // `until + idle_timeout`.
                                        idle_time = until + idle_timeout;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        // Only snapshot on the idle deadline when still idle.
                                        if turbo_tasks.is_idle() {
                                            reason = "idle timeout";
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let this = self.clone();
                        let snapshot = this.snapshot_and_persist(None, reason);
                        // When `snapshot_and_persist` returns `None` the loop simply runs again.
                        if let Some((snapshot_start, new_data)) = snapshot {
                            last_snapshot = snapshot_start;
                            if !new_data {
                                // Nothing new was found: the current idle period is no longer
                                // treated as fresh.
                                fresh_idle = false;
                                continue;
                            }
                            let last_snapshot = last_snapshot.duration_since(self.start_time);
                            self.last_snapshot.store(
                                last_snapshot.as_millis().try_into().unwrap(),
                                Ordering::Relaxed,
                            );

                            // This job ends here; a follow-up job takes over.
                            turbo_tasks.schedule_backend_background_job(
                                TurboTasksBackendJob::FollowUpSnapshot,
                            );
                            return;
                        }
                    }
                }
            }
        })
    }
2741
2742    fn try_read_own_task_cell(
2743        &self,
2744        task_id: TaskId,
2745        cell: CellId,
2746        options: ReadCellOptions,
2747        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2748    ) -> Result<TypedCellContent> {
2749        let mut ctx = self.execute_context(turbo_tasks);
2750        let task = ctx.task(task_id, TaskDataCategory::Data);
2751        if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2752            debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2753            Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2754        } else {
2755            Ok(CellContent(None).into_typed(cell.type_id))
2756        }
2757    }
2758
2759    fn read_task_collectibles(
2760        &self,
2761        task_id: TaskId,
2762        collectible_type: TraitTypeId,
2763        reader_id: Option<TaskId>,
2764        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2765    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2766        let mut ctx = self.execute_context(turbo_tasks);
2767        let mut collectibles = AutoMap::default();
2768        {
2769            let mut task = ctx.task(task_id, TaskDataCategory::All);
2770            // Ensure it's an root node
2771            loop {
2772                let aggregation_number = get_aggregation_number(&task);
2773                if is_root_node(aggregation_number) {
2774                    break;
2775                }
2776                drop(task);
2777                AggregationUpdateQueue::run(
2778                    AggregationUpdateJob::UpdateAggregationNumber {
2779                        task_id,
2780                        base_aggregation_number: u32::MAX,
2781                        distance: None,
2782                    },
2783                    &mut ctx,
2784                );
2785                task = ctx.task(task_id, TaskDataCategory::All);
2786            }
2787            for (collectible, count) in task.iter_aggregated_collectibles() {
2788                if *count > 0 && collectible.collectible_type == collectible_type {
2789                    *collectibles
2790                        .entry(RawVc::TaskCell(
2791                            collectible.cell.task,
2792                            collectible.cell.cell,
2793                        ))
2794                        .or_insert(0) += 1;
2795                }
2796            }
2797            for (&collectible, &count) in task.iter_collectibles() {
2798                if collectible.collectible_type == collectible_type {
2799                    *collectibles
2800                        .entry(RawVc::TaskCell(
2801                            collectible.cell.task,
2802                            collectible.cell.cell,
2803                        ))
2804                        .or_insert(0) += count;
2805                }
2806            }
2807            if let Some(reader_id) = reader_id {
2808                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
2809            }
2810        }
2811        if let Some(reader_id) = reader_id {
2812            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
2813            let target = CollectiblesRef {
2814                task: task_id,
2815                collectible_type,
2816            };
2817            if !reader.remove_outdated_collectibles_dependencies(&target) {
2818                let _ = reader.add_collectibles_dependencies(target);
2819            }
2820        }
2821        collectibles
2822    }
2823
2824    fn emit_collectible(
2825        &self,
2826        collectible_type: TraitTypeId,
2827        collectible: RawVc,
2828        task_id: TaskId,
2829        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2830    ) {
2831        self.assert_valid_collectible(task_id, collectible);
2832
2833        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2834            panic!("Collectibles need to be resolved");
2835        };
2836        let cell = CellRef {
2837            task: collectible_task,
2838            cell,
2839        };
2840        operation::UpdateCollectibleOperation::run(
2841            task_id,
2842            CollectibleRef {
2843                collectible_type,
2844                cell,
2845            },
2846            1,
2847            self.execute_context(turbo_tasks),
2848        );
2849    }
2850
2851    fn unemit_collectible(
2852        &self,
2853        collectible_type: TraitTypeId,
2854        collectible: RawVc,
2855        count: u32,
2856        task_id: TaskId,
2857        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2858    ) {
2859        self.assert_valid_collectible(task_id, collectible);
2860
2861        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2862            panic!("Collectibles need to be resolved");
2863        };
2864        let cell = CellRef {
2865            task: collectible_task,
2866            cell,
2867        };
2868        operation::UpdateCollectibleOperation::run(
2869            task_id,
2870            CollectibleRef {
2871                collectible_type,
2872                cell,
2873            },
2874            -(i32::try_from(count).unwrap()),
2875            self.execute_context(turbo_tasks),
2876        );
2877    }
2878
2879    fn update_task_cell(
2880        &self,
2881        task_id: TaskId,
2882        cell: CellId,
2883        is_serializable_cell_content: bool,
2884        content: CellContent,
2885        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
2886        verification_mode: VerificationMode,
2887        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2888    ) {
2889        operation::UpdateCellOperation::run(
2890            task_id,
2891            cell,
2892            content,
2893            is_serializable_cell_content,
2894            updated_key_hashes,
2895            verification_mode,
2896            self.execute_context(turbo_tasks),
2897        );
2898    }
2899
2900    fn mark_own_task_as_session_dependent(
2901        &self,
2902        task_id: TaskId,
2903        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2904    ) {
2905        if !self.should_track_dependencies() {
2906            // Without dependency tracking we don't need session dependent tasks
2907            return;
2908        }
2909        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
2910        let mut ctx = self.execute_context(turbo_tasks);
2911        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
2912        let aggregation_number = get_aggregation_number(&task);
2913        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
2914            drop(task);
2915            // We want to use a high aggregation number to avoid large aggregation chains for
2916            // session dependent tasks (which change on every run)
2917            AggregationUpdateQueue::run(
2918                AggregationUpdateJob::UpdateAggregationNumber {
2919                    task_id,
2920                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
2921                    distance: None,
2922                },
2923                &mut ctx,
2924            );
2925            task = ctx.task(task_id, TaskDataCategory::Meta);
2926        }
2927        if let Some(InProgressState::InProgress(box InProgressStateInner {
2928            session_dependent,
2929            ..
2930        })) = task.get_in_progress_mut()
2931        {
2932            *session_dependent = true;
2933        }
2934    }
2935
2936    fn mark_own_task_as_finished(
2937        &self,
2938        task: TaskId,
2939        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2940    ) {
2941        let mut ctx = self.execute_context(turbo_tasks);
2942        let mut task = ctx.task(task, TaskDataCategory::Data);
2943        if let Some(InProgressState::InProgress(box InProgressStateInner {
2944            marked_as_completed,
2945            ..
2946        })) = task.get_in_progress_mut()
2947        {
2948            *marked_as_completed = true;
2949            // TODO this should remove the dirty state (also check session_dependent)
2950            // but this would break some assumptions for strongly consistent reads.
2951            // Client tasks are not connected yet, so we wouldn't wait for them.
2952            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
2953        }
2954    }
2955
2956    fn set_own_task_aggregation_number(
2957        &self,
2958        task: TaskId,
2959        aggregation_number: u32,
2960        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2961    ) {
2962        let mut ctx = self.execute_context(turbo_tasks);
2963        AggregationUpdateQueue::run(
2964            AggregationUpdateJob::UpdateAggregationNumber {
2965                task_id: task,
2966                base_aggregation_number: aggregation_number,
2967                distance: None,
2968            },
2969            &mut ctx,
2970        );
2971    }
2972
2973    fn connect_task(
2974        &self,
2975        task: TaskId,
2976        parent_task: Option<TaskId>,
2977        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2978    ) {
2979        self.assert_not_persistent_calling_transient(parent_task, task, None);
2980        ConnectChildOperation::run(parent_task, task, None, self.execute_context(turbo_tasks));
2981    }
2982
2983    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
2984        let task_id = self.transient_task_id_factory.get();
2985        {
2986            let mut task = self.storage.access_mut(task_id);
2987            task.init_transient_task(task_id, task_type, self.should_track_activeness());
2988        }
2989        #[cfg(feature = "verify_aggregation_graph")]
2990        self.root_tasks.lock().insert(task_id);
2991        task_id
2992    }
2993
2994    fn dispose_root_task(
2995        &self,
2996        task_id: TaskId,
2997        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2998    ) {
2999        #[cfg(feature = "verify_aggregation_graph")]
3000        self.root_tasks.lock().remove(&task_id);
3001
3002        let mut ctx = self.execute_context(turbo_tasks);
3003        let mut task = ctx.task(task_id, TaskDataCategory::All);
3004        let is_dirty = task.is_dirty();
3005        let has_dirty_containers = task.has_dirty_containers();
3006        if is_dirty.is_some() || has_dirty_containers {
3007            if let Some(activeness_state) = task.get_activeness_mut() {
3008                // We will finish the task, but it would be removed after the task is done
3009                activeness_state.unset_root_type();
3010                activeness_state.set_active_until_clean();
3011            };
3012        } else if let Some(activeness_state) = task.take_activeness() {
3013            // Technically nobody should be listening to this event, but just in case
3014            // we notify it anyway
3015            activeness_state.all_clean_event.notify(usize::MAX);
3016        }
3017    }
3018
3019    #[cfg(feature = "verify_aggregation_graph")]
3020    fn verify_aggregation_graph(
3021        &self,
3022        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3023        idle: bool,
3024    ) {
3025        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3026            return;
3027        }
3028        use std::{collections::VecDeque, env, io::stdout};
3029
3030        use crate::backend::operation::{get_uppers, is_aggregating_node};
3031
3032        let mut ctx = self.execute_context(turbo_tasks);
3033        let root_tasks = self.root_tasks.lock().clone();
3034
3035        for task_id in root_tasks.into_iter() {
3036            let mut queue = VecDeque::new();
3037            let mut visited = FxHashSet::default();
3038            let mut aggregated_nodes = FxHashSet::default();
3039            let mut collectibles = FxHashMap::default();
3040            let root_task_id = task_id;
3041            visited.insert(task_id);
3042            aggregated_nodes.insert(task_id);
3043            queue.push_back(task_id);
3044            let mut counter = 0;
3045            while let Some(task_id) = queue.pop_front() {
3046                counter += 1;
3047                if counter % 100000 == 0 {
3048                    println!(
3049                        "queue={}, visited={}, aggregated_nodes={}",
3050                        queue.len(),
3051                        visited.len(),
3052                        aggregated_nodes.len()
3053                    );
3054                }
3055                let task = ctx.task(task_id, TaskDataCategory::All);
3056                if idle && !self.is_idle.load(Ordering::Relaxed) {
3057                    return;
3058                }
3059
3060                let uppers = get_uppers(&task);
3061                if task_id != root_task_id
3062                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3063                {
3064                    panic!(
3065                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3066                         {:?})",
3067                        task_id,
3068                        ctx.get_task_description(task_id),
3069                        uppers
3070                    );
3071                }
3072
3073                let aggregated_collectibles: Vec<_> = task
3074                    .iter_aggregated_collectibles()
3075                    .map(|(collectible, _)| collectible)
3076                    .collect();
3077                for collectible in aggregated_collectibles {
3078                    collectibles
3079                        .entry(collectible)
3080                        .or_insert_with(|| (false, Vec::new()))
3081                        .1
3082                        .push(task_id);
3083                }
3084
3085                let own_collectibles: Vec<_> = task
3086                    .iter_collectibles_entries()
3087                    .filter_map(|(&collectible, &value)| (value > 0).then_some(collectible))
3088                    .collect::<Vec<_>>();
3089                for collectible in own_collectibles {
3090                    if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3091                        *flag = true
3092                    } else {
3093                        panic!(
3094                            "Task {} has a collectible {:?} that is not in any upper task",
3095                            task_id, collectible
3096                        );
3097                    }
3098                }
3099
3100                let is_dirty = task.has_dirty();
3101                let has_dirty_container = task.has_dirty_containers();
3102                let should_be_in_upper = is_dirty || has_dirty_container;
3103
3104                let aggregation_number = get_aggregation_number(&task);
3105                if is_aggregating_node(aggregation_number) {
3106                    aggregated_nodes.insert(task_id);
3107                }
3108                // println!(
3109                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
3110                //     ctx.get_task_description(task_id),
3111                //     uppers
3112                // );
3113
3114                for child_id in task.iter_children() {
3115                    // println!("{task_id}: child -> {child_id}");
3116                    if visited.insert(child_id) {
3117                        queue.push_back(child_id);
3118                    }
3119                }
3120                drop(task);
3121
3122                if should_be_in_upper {
3123                    for upper_id in uppers {
3124                        let task = ctx.task(upper_id, TaskDataCategory::All);
3125                        let in_upper = task
3126                            .get_aggregated_dirty_containers(&task_id)
3127                            .is_some_and(|&dirty| dirty > 0);
3128                        if !in_upper {
3129                            let containers: Vec<_> = task
3130                                .iter_aggregated_dirty_containers_entries()
3131                                .map(|(&k, &v)| (k, v))
3132                                .collect();
3133                            panic!(
3134                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
3135                                 ({})\nThese dirty containers are present:\n{:#?}",
3136                                task_id,
3137                                ctx.get_task_description(task_id),
3138                                upper_id,
3139                                ctx.get_task_description(upper_id),
3140                                containers,
3141                            );
3142                        }
3143                    }
3144                }
3145            }
3146
3147            for (collectible, (flag, task_ids)) in collectibles {
3148                if !flag {
3149                    use std::io::Write;
3150                    let mut stdout = stdout().lock();
3151                    writeln!(
3152                        stdout,
3153                        "{:?} that is not emitted in any child task but in these aggregated \
3154                         tasks: {:#?}",
3155                        collectible,
3156                        task_ids
3157                            .iter()
3158                            .map(|t| format!("{t} {}", ctx.get_task_description(*t)))
3159                            .collect::<Vec<_>>()
3160                    )
3161                    .unwrap();
3162
3163                    let task_id = collectible.cell.task;
3164                    let mut queue = {
3165                        let task = ctx.task(task_id, TaskDataCategory::All);
3166                        get_uppers(&task)
3167                    };
3168                    let mut visited = FxHashSet::default();
3169                    for &upper_id in queue.iter() {
3170                        visited.insert(upper_id);
3171                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3172                    }
3173                    while let Some(task_id) = queue.pop() {
3174                        let desc = ctx.get_task_description(task_id);
3175                        let task = ctx.task(task_id, TaskDataCategory::All);
3176                        let aggregated_collectible = task
3177                            .get_aggregated_collectibles(&collectible)
3178                            .copied()
3179                            .unwrap_or_default();
3180                        let uppers = get_uppers(&task);
3181                        drop(task);
3182                        writeln!(
3183                            stdout,
3184                            "upper {task_id} {desc} collectible={aggregated_collectible}"
3185                        )
3186                        .unwrap();
3187                        if task_ids.contains(&task_id) {
3188                            writeln!(
3189                                stdout,
3190                                "Task has an upper connection to an aggregated task that doesn't \
3191                                 reference it. Upper connection is invalid!"
3192                            )
3193                            .unwrap();
3194                        }
3195                        for upper_id in uppers {
3196                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3197                            if !visited.contains(&upper_id) {
3198                                queue.push(upper_id);
3199                            }
3200                        }
3201                    }
3202                    panic!("See stdout for more details");
3203                }
3204            }
3205        }
3206    }
3207
3208    fn assert_not_persistent_calling_transient(
3209        &self,
3210        parent_id: Option<TaskId>,
3211        child_id: TaskId,
3212        cell_id: Option<CellId>,
3213    ) {
3214        if let Some(parent_id) = parent_id
3215            && !parent_id.is_transient()
3216            && child_id.is_transient()
3217        {
3218            self.panic_persistent_calling_transient(
3219                self.debug_get_task_description(parent_id),
3220                self.debug_get_cached_task_type(child_id).as_deref(),
3221                cell_id,
3222            );
3223        }
3224    }
3225
3226    fn panic_persistent_calling_transient(
3227        &self,
3228        parent: String,
3229        child: Option<&CachedTaskType>,
3230        cell_id: Option<CellId>,
3231    ) {
3232        let transient_reason = if let Some(child) = child {
3233            Cow::Owned(format!(
3234                " The callee is transient because it depends on:\n{}",
3235                self.debug_trace_transient_task(child, cell_id),
3236            ))
3237        } else {
3238            Cow::Borrowed("")
3239        };
3240        panic!(
3241            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3242            parent,
3243            child.map_or("unknown", |t| t.get_name()),
3244            transient_reason,
3245        );
3246    }
3247
3248    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3249        // these checks occur in a potentially hot codepath, but they're cheap
3250        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3251            // This should never happen: The collectible APIs use ResolvedVc
3252            let task_info = if let Some(col_task_ty) = collectible
3253                .try_get_task_id()
3254                .map(|t| self.debug_get_task_description(t))
3255            {
3256                Cow::Owned(format!(" (return type of {col_task_ty})"))
3257            } else {
3258                Cow::Borrowed("")
3259            };
3260            panic!("Collectible{task_info} must be a ResolvedVc")
3261        };
3262        if col_task_id.is_transient() && !task_id.is_transient() {
3263            let transient_reason =
3264                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3265                    Cow::Owned(format!(
3266                        ". The collectible is transient because it depends on:\n{}",
3267                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3268                    ))
3269                } else {
3270                    Cow::Borrowed("")
3271                };
3272            // this should never happen: How would a persistent function get a transient Vc?
3273            panic!(
3274                "Collectible is transient, transient collectibles cannot be emitted from \
3275                 persistent tasks{transient_reason}",
3276            )
3277        }
3278    }
3279}
3280
/// `Backend` implementation for `TurboTasksBackend`.
///
/// Every method is a thin wrapper that forwards to the inner backend stored in `self.0`; the
/// per-method comments only point out where the forwarded call differs from the trait method
/// (dropped or reordered arguments).
impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    /// Forwards to the inner `startup`.
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }

    /// Forwards to the inner `stopping`; the API handle is not needed there.
    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }

    /// Forwards to the inner `stop`.
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }

    /// Forwards to the inner `idle_start`.
    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }

    /// Forwards to the inner `idle_end`; the API handle is not needed there.
    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }

    /// Forwards to the inner `get_or_create_persistent_task`.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
    }

    /// Forwards to the inner `get_or_create_transient_task`.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
    }

    /// Forwards to the inner `invalidate_task`.
    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }

    /// Forwards to the inner `invalidate_tasks`.
    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }

    /// Forwards to the inner `invalidate_tasks_set`.
    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }

    /// Forwards to the inner `invalidate_serialization`.
    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }

    /// Forwards to the inner `task_execution_canceled`.
    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }

    /// Forwards to the inner `try_start_task_execution` and returns its result unchanged.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0
            .try_start_task_execution(task_id, priority, turbo_tasks)
    }

    /// Forwards to the inner `task_execution_completed` and returns its result unchanged.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            result,
            cell_counters,
            has_invalidator,
            turbo_tasks,
        )
    }

    /// Job type handed to `run_backend_job`.
    type BackendJob = TurboTasksBackendJob;

    /// Forwards to the inner `run_backend_job`; the returned future borrows both `self` and
    /// `turbo_tasks` for `'a`.
    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(job, turbo_tasks)
    }

    /// Forwards to the inner `try_read_task_output`.
    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, reader, options, turbo_tasks)
    }

    /// Forwards to the inner `try_read_task_cell`.
    ///
    /// Note that the inner method is called with `reader` before `cell`, i.e. in a different
    /// argument order than this trait method declares.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }

    /// Forwards to the inner `try_read_own_task_cell`.
    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0
            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
    }

    /// Forwards to the inner `read_task_collectibles`.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }

    /// Forwards to the inner `emit_collectible`.
    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }

    /// Forwards to the inner `unemit_collectible`.
    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }

    /// Forwards to the inner `update_task_cell` with all arguments in the same order.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(
            task_id,
            cell,
            is_serializable_cell_content,
            content,
            updated_key_hashes,
            verification_mode,
            turbo_tasks,
        );
    }

    /// Forwards to the inner `mark_own_task_as_finished`.
    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }

    /// Forwards to the inner `set_own_task_aggregation_number`.
    fn set_own_task_aggregation_number(
        &self,
        task: TaskId,
        aggregation_number: u32,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .set_own_task_aggregation_number(task, aggregation_number, turbo_tasks);
    }

    /// Forwards to the inner `mark_own_task_as_session_dependent`.
    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }

    /// Forwards to the inner `connect_task`.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }

    /// Forwards to the inner `create_transient_task`; the API handle is not needed there.
    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }

    /// Forwards to the inner `dispose_root_task`.
    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }

    /// Returns a reference to the inner backend's `task_statistics` field.
    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }

    /// Returns the `dependency_tracking` flag of the inner backend's options.
    fn is_tracking_dependencies(&self) -> bool {
        self.0.options.dependency_tracking
    }
}
3531
3532enum DebugTraceTransientTask {
3533    Cached {
3534        task_name: &'static str,
3535        cell_type_id: Option<ValueTypeId>,
3536        cause_self: Option<Box<DebugTraceTransientTask>>,
3537        cause_args: Vec<DebugTraceTransientTask>,
3538    },
3539    /// This representation is used when this task is a duplicate of one previously shown
3540    Collapsed {
3541        task_name: &'static str,
3542        cell_type_id: Option<ValueTypeId>,
3543    },
3544    Uncached {
3545        cell_type_id: Option<ValueTypeId>,
3546    },
3547}
3548
3549impl DebugTraceTransientTask {
3550    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3551        let indent = "    ".repeat(level);
3552        f.write_str(&indent)?;
3553
3554        fn fmt_cell_type_id(
3555            f: &mut fmt::Formatter<'_>,
3556            cell_type_id: Option<ValueTypeId>,
3557        ) -> fmt::Result {
3558            if let Some(ty) = cell_type_id {
3559                write!(f, " (read cell of type {})", get_value_type(ty).global_name)
3560            } else {
3561                Ok(())
3562            }
3563        }
3564
3565        // write the name and type
3566        match self {
3567            Self::Cached {
3568                task_name,
3569                cell_type_id,
3570                ..
3571            }
3572            | Self::Collapsed {
3573                task_name,
3574                cell_type_id,
3575                ..
3576            } => {
3577                f.write_str(task_name)?;
3578                fmt_cell_type_id(f, *cell_type_id)?;
3579                if matches!(self, Self::Collapsed { .. }) {
3580                    f.write_str(" (collapsed)")?;
3581                }
3582            }
3583            Self::Uncached { cell_type_id } => {
3584                f.write_str("unknown transient task")?;
3585                fmt_cell_type_id(f, *cell_type_id)?;
3586            }
3587        }
3588        f.write_char('\n')?;
3589
3590        // write any extra "cause" information we might have
3591        if let Self::Cached {
3592            cause_self,
3593            cause_args,
3594            ..
3595        } = self
3596        {
3597            if let Some(c) = cause_self {
3598                writeln!(f, "{indent}  self:")?;
3599                c.fmt_indented(f, level + 1)?;
3600            }
3601            if !cause_args.is_empty() {
3602                writeln!(f, "{indent}  args:")?;
3603                for c in cause_args {
3604                    c.fmt_indented(f, level + 1)?;
3605                }
3606            }
3607        }
3608        Ok(())
3609    }
3610}
3611
3612impl fmt::Display for DebugTraceTransientTask {
3613    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3614        self.fmt_indented(f, 0)
3615    }
3616}
3617
// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
/// Returns an `Instant` roughly 30 years after the current time, for use where a
/// "practically never" deadline is needed.
fn far_future() -> Instant {
    // API does not provide a way to obtain max `Instant`
    // or convert specific date in the future to instant.
    // 1000 years overflows on macOS, 100 years overflows on FreeBSD.
    let roughly_thirty_years = Duration::from_secs(86400 * 365 * 30);
    Instant::now() + roughly_thirty_years
}
3626
3627/// Encodes task data, using the provided buffer as a scratch space.  Returns a new exactly sized
3628/// buffer.
3629/// This allows reusing the buffer across multiple encode calls to optimize allocations and
3630/// resulting buffer sizes.
3631fn encode_task_data(
3632    task: TaskId,
3633    data: &TaskStorage,
3634    category: SpecificTaskDataCategory,
3635    scratch_buffer: &mut TurboBincodeBuffer,
3636) -> Result<TurboBincodeBuffer> {
3637    scratch_buffer.clear();
3638    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3639    data.encode(category, &mut encoder)?;
3640
3641    if cfg!(feature = "verify_serialization") {
3642        TaskStorage::new()
3643            .decode(
3644                category,
3645                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3646            )
3647            .with_context(|| {
3648                format!(
3649                    "expected to be able to decode serialized data for '{category:?}' information \
3650                     for {task}"
3651                )
3652            })?;
3653    }
3654    Ok(SmallVec::from_slice(scratch_buffer))
3655}