turbo_tasks_backend/backend/
mod.rs

1mod dynamic_storage;
2mod operation;
3mod storage;
4
5use std::{
6    borrow::Cow,
7    cmp::min,
8    fmt::{self, Write},
9    future::Future,
10    hash::BuildHasherDefault,
11    mem::take,
12    ops::Range,
13    pin::Pin,
14    sync::{
15        Arc,
16        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
17    },
18};
19
20use anyhow::{Result, bail};
21use auto_hash_map::{AutoMap, AutoSet};
22use indexmap::IndexSet;
23use parking_lot::{Condvar, Mutex};
24use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
25use smallvec::{SmallVec, smallvec};
26use tokio::time::{Duration, Instant};
27use tracing::{Span, info_span, trace_span};
28use turbo_tasks::{
29    CellId, FxDashMap, FxIndexMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency,
30    ReadOutputOptions, ReadTracking, SessionId, TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId,
31    TraitTypeId, TurboTasksBackendApi, ValueTypeId,
32    backend::{
33        Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
34        TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
35    },
36    event::{Event, EventListener},
37    message_queue::TimingEvent,
38    registry::get_value_type,
39    task_statistics::TaskStatisticsApi,
40    trace::TraceRawVcs,
41    turbo_tasks,
42    util::{IdFactoryWithReuse, good_chunk_size},
43};
44
45pub use self::{operation::AnyOperation, storage::TaskDataCategory};
46#[cfg(feature = "trace_task_dirty")]
47use crate::backend::operation::TaskDirtyCause;
48use crate::{
49    backend::{
50        operation::{
51            AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
52            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
53            Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number,
54            get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children,
55        },
56        storage::{
57            InnerStorageSnapshot, Storage, count, get, get_many, get_mut, get_mut_or_insert_with,
58            iter_many, remove,
59        },
60    },
61    backing_storage::BackingStorage,
62    data::{
63        ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
64        CachedDataItemValueRef, CellRef, CollectibleRef, CollectiblesRef, DirtyState,
65        InProgressCellState, InProgressState, InProgressStateInner, OutputValue, RootType,
66    },
67    utils::{
68        bi_map::BiMap, chunked_vec::ChunkedVec, dash_map_drop_contents::drop_contents,
69        ptr_eq_arc::PtrEqArc, shard_amount::compute_shard_amount, sharded::Sharded, swap_retain,
70    },
71};
72
73const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
74
75struct SnapshotRequest {
76    snapshot_requested: bool,
77    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
78}
79
80impl SnapshotRequest {
81    fn new() -> Self {
82        Self {
83            snapshot_requested: false,
84            suspended_operations: FxHashSet::default(),
85        }
86    }
87}
88
89type TransientTaskOnce =
90    Mutex<Option<Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>>>;
91
92pub enum TransientTask {
93    /// A root task that will track dependencies and re-execute when
94    /// dependencies change. Task will eventually settle to the correct
95    /// execution.
96    ///
97    /// Always active. Automatically scheduled.
98    Root(TransientTaskRoot),
99
100    // TODO implement these strongly consistency
101    /// A single root task execution. It won't track dependencies.
102    /// Task will definitely include all invalidations that happened before the
103    /// start of the task. It may or may not include invalidations that
104    /// happened after that. It may see these invalidations partially
105    /// applied.
106    ///
107    /// Active until done. Automatically scheduled.
108    Once(TransientTaskOnce),
109}
110
111pub enum StorageMode {
112    /// Queries the storage for cache entries that don't exist locally.
113    ReadOnly,
114    /// Queries the storage for cache entries that don't exist locally.
115    /// Keeps a log of all changes and regularly push them to the backing storage.
116    ReadWrite,
117}
118
119pub struct BackendOptions {
120    /// Enables dependency tracking.
121    ///
122    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
123    /// forever.
124    pub dependency_tracking: bool,
125
126    /// Enables active tracking.
127    ///
128    /// Automatically disabled when `dependency_tracking` is disabled.
129    ///
130    /// When disabled: All tasks are considered as active.
131    pub active_tracking: bool,
132
133    /// Enables the backing storage.
134    pub storage_mode: Option<StorageMode>,
135
136    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
137    /// datastructures. If `None`, it will use the available parallelism.
138    pub num_workers: Option<usize>,
139
140    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
141    pub small_preallocation: bool,
142}
143
144impl Default for BackendOptions {
145    fn default() -> Self {
146        Self {
147            dependency_tracking: true,
148            active_tracking: true,
149            storage_mode: Some(StorageMode::ReadWrite),
150            num_workers: None,
151            small_preallocation: false,
152        }
153    }
154}
155
156pub enum TurboTasksBackendJob {
157    InitialSnapshot,
158    FollowUpSnapshot,
159    Prefetch {
160        data: Arc<FxIndexMap<TaskId, bool>>,
161        range: Option<Range<usize>>,
162    },
163}
164
165pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
166
167type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
168
169struct TurboTasksBackendInner<B: BackingStorage> {
170    options: BackendOptions,
171
172    start_time: Instant,
173    session_id: SessionId,
174
175    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
176    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
177
178    persisted_task_cache_log: Option<TaskCacheLog>,
179    task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
180    transient_tasks: FxDashMap<TaskId, Arc<TransientTask>>,
181
182    storage: Storage,
183
184    /// When true, the backing_storage has data that is not in the local storage.
185    local_is_partial: AtomicBool,
186
187    /// Number of executing operations + Highest bit is set when snapshot is
188    /// requested. When that bit is set, operations should pause until the
189    /// snapshot is completed. When the bit is set and in progress counter
190    /// reaches zero, `operations_completed_when_snapshot_requested` is
191    /// triggered.
192    in_progress_operations: AtomicUsize,
193
194    snapshot_request: Mutex<SnapshotRequest>,
195    /// Condition Variable that is triggered when `in_progress_operations`
196    /// reaches zero while snapshot is requested. All operations are either
197    /// completed or suspended.
198    operations_suspended: Condvar,
199    /// Condition Variable that is triggered when a snapshot is completed and
200    /// operations can continue.
201    snapshot_completed: Condvar,
202    /// The timestamp of the last started snapshot since [`Self::start_time`].
203    last_snapshot: AtomicU64,
204
205    stopping: AtomicBool,
206    stopping_event: Event,
207    idle_start_event: Event,
208    idle_end_event: Event,
209    #[cfg(feature = "verify_aggregation_graph")]
210    is_idle: AtomicBool,
211
212    task_statistics: TaskStatisticsApi,
213
214    backing_storage: B,
215
216    #[cfg(feature = "verify_aggregation_graph")]
217    root_tasks: Mutex<FxHashSet<TaskId>>,
218}
219
220impl<B: BackingStorage> TurboTasksBackend<B> {
221    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
222        Self(Arc::new(TurboTasksBackendInner::new(
223            options,
224            backing_storage,
225        )))
226    }
227
228    pub fn backing_storage(&self) -> &B {
229        &self.0.backing_storage
230    }
231}
232
233impl<B: BackingStorage> TurboTasksBackendInner<B> {
234    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
235        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
236        let need_log = matches!(options.storage_mode, Some(StorageMode::ReadWrite));
237        if !options.dependency_tracking {
238            options.active_tracking = false;
239        }
240        let small_preallocation = options.small_preallocation;
241        let next_task_id = backing_storage
242            .next_free_task_id()
243            .expect("Failed to get task id");
244        Self {
245            options,
246            start_time: Instant::now(),
247            session_id: backing_storage
248                .next_session_id()
249                .expect("Failed get session id"),
250            persisted_task_id_factory: IdFactoryWithReuse::new(
251                next_task_id,
252                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
253            ),
254            transient_task_id_factory: IdFactoryWithReuse::new(
255                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
256                TaskId::MAX,
257            ),
258            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
259            task_cache: BiMap::new(),
260            transient_tasks: FxDashMap::default(),
261            local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
262            storage: Storage::new(shard_amount, small_preallocation),
263            in_progress_operations: AtomicUsize::new(0),
264            snapshot_request: Mutex::new(SnapshotRequest::new()),
265            operations_suspended: Condvar::new(),
266            snapshot_completed: Condvar::new(),
267            last_snapshot: AtomicU64::new(0),
268            stopping: AtomicBool::new(false),
269            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
270            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
271            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
272            #[cfg(feature = "verify_aggregation_graph")]
273            is_idle: AtomicBool::new(false),
274            task_statistics: TaskStatisticsApi::default(),
275            backing_storage,
276            #[cfg(feature = "verify_aggregation_graph")]
277            root_tasks: Default::default(),
278        }
279    }
280
281    fn execute_context<'a>(
282        &'a self,
283        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
284    ) -> impl ExecuteContext<'a> {
285        ExecuteContextImpl::new(self, turbo_tasks)
286    }
287
288    fn session_id(&self) -> SessionId {
289        self.session_id
290    }
291
292    /// # Safety
293    ///
294    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
295    unsafe fn execute_context_with_tx<'e, 'tx>(
296        &'e self,
297        tx: Option<&'e B::ReadTransaction<'tx>>,
298        turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
299    ) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
300    where
301        'tx: 'e,
302    {
303        // Safety: `tx` is from `self`.
304        unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
305    }
306
307    fn suspending_requested(&self) -> bool {
308        self.should_persist()
309            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
310    }
311
312    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
313        #[cold]
314        fn operation_suspend_point_cold<B: BackingStorage>(
315            this: &TurboTasksBackendInner<B>,
316            suspend: impl FnOnce() -> AnyOperation,
317        ) {
318            let operation = Arc::new(suspend());
319            let mut snapshot_request = this.snapshot_request.lock();
320            if snapshot_request.snapshot_requested {
321                snapshot_request
322                    .suspended_operations
323                    .insert(operation.clone().into());
324                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
325                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
326                if value == SNAPSHOT_REQUESTED_BIT {
327                    this.operations_suspended.notify_all();
328                }
329                this.snapshot_completed
330                    .wait_while(&mut snapshot_request, |snapshot_request| {
331                        snapshot_request.snapshot_requested
332                    });
333                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
334                snapshot_request
335                    .suspended_operations
336                    .remove(&operation.into());
337            }
338        }
339
340        if self.suspending_requested() {
341            operation_suspend_point_cold(self, suspend);
342        }
343    }
344
345    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
346        if !self.should_persist() {
347            return OperationGuard { backend: None };
348        }
349        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
350        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
351            let mut snapshot_request = self.snapshot_request.lock();
352            if snapshot_request.snapshot_requested {
353                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
354                if value == SNAPSHOT_REQUESTED_BIT {
355                    self.operations_suspended.notify_all();
356                }
357                self.snapshot_completed
358                    .wait_while(&mut snapshot_request, |snapshot_request| {
359                        snapshot_request.snapshot_requested
360                    });
361                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
362            }
363        }
364        OperationGuard {
365            backend: Some(self),
366        }
367    }
368
369    fn should_persist(&self) -> bool {
370        matches!(self.options.storage_mode, Some(StorageMode::ReadWrite))
371    }
372
373    fn should_restore(&self) -> bool {
374        self.options.storage_mode.is_some()
375    }
376
377    fn should_track_dependencies(&self) -> bool {
378        self.options.dependency_tracking
379    }
380
381    fn should_track_activeness(&self) -> bool {
382        self.options.active_tracking
383    }
384
385    fn track_cache_hit(&self, task_type: &CachedTaskType) {
386        self.task_statistics
387            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
388    }
389
390    fn track_cache_miss(&self, task_type: &CachedTaskType) {
391        self.task_statistics
392            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
393    }
394}
395
396pub(crate) struct OperationGuard<'a, B: BackingStorage> {
397    backend: Option<&'a TurboTasksBackendInner<B>>,
398}
399
400impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
401    fn drop(&mut self) {
402        if let Some(backend) = self.backend {
403            let fetch_sub = backend
404                .in_progress_operations
405                .fetch_sub(1, Ordering::AcqRel);
406            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
407                backend.operations_suspended.notify_all();
408            }
409        }
410    }
411}
412
413/// Intermediate result of step 1 of task execution completion.
414struct TaskExecutionCompletePrepareResult {
415    pub new_children: FxHashSet<TaskId>,
416    pub removed_data: Vec<CachedDataItem>,
417    pub is_now_immutable: bool,
418    #[cfg(feature = "verify_determinism")]
419    pub no_output_set: bool,
420    pub new_output: Option<OutputValue>,
421    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
422}
423
424// Operations
425impl<B: BackingStorage> TurboTasksBackendInner<B> {
426    /// # Safety
427    ///
428    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
429    unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
430        &'l self,
431        tx: Option<&'l B::ReadTransaction<'tx>>,
432        parent_task: Option<TaskId>,
433        child_task: TaskId,
434        turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
435    ) {
436        operation::ConnectChildOperation::run(parent_task, child_task, unsafe {
437            self.execute_context_with_tx(tx, turbo_tasks)
438        });
439    }
440
441    fn connect_child(
442        &self,
443        parent_task: Option<TaskId>,
444        child_task: TaskId,
445        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
446    ) {
447        operation::ConnectChildOperation::run(
448            parent_task,
449            child_task,
450            self.execute_context(turbo_tasks),
451        );
452    }
453
454    fn try_read_task_output(
455        self: &Arc<Self>,
456        task_id: TaskId,
457        reader: Option<TaskId>,
458        options: ReadOutputOptions,
459        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
460    ) -> Result<Result<RawVc, EventListener>> {
461        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
462
463        let mut ctx = self.execute_context(turbo_tasks);
464        let need_reader_task = if self.should_track_dependencies()
465            && !matches!(options.tracking, ReadTracking::Untracked)
466            && reader.is_some_and(|reader_id| reader_id != task_id)
467            && let Some(reader_id) = reader
468            && reader_id != task_id
469        {
470            Some(reader_id)
471        } else {
472            None
473        };
474        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
475            // Having a task_pair here is not optimal, but otherwise this would lead to a race
476            // condition. See below.
477            // TODO(sokra): solve that in a more performant way.
478            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
479            (task, Some(reader))
480        } else {
481            (ctx.task(task_id, TaskDataCategory::All), None)
482        };
483
484        fn listen_to_done_event<B: BackingStorage>(
485            this: &TurboTasksBackendInner<B>,
486            reader: Option<TaskId>,
487            tracking: ReadTracking,
488            done_event: &Event,
489        ) -> EventListener {
490            done_event.listen_with_note(move || {
491                let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
492                move || {
493                    if let Some(reader_desc) = reader_desc.as_ref() {
494                        format!("try_read_task_output from {} ({})", reader_desc(), tracking)
495                    } else {
496                        format!("try_read_task_output ({})", tracking)
497                    }
498                }
499            })
500        }
501
502        fn check_in_progress<B: BackingStorage>(
503            this: &TurboTasksBackendInner<B>,
504            task: &impl TaskGuard,
505            reader: Option<TaskId>,
506            tracking: ReadTracking,
507            ctx: &impl ExecuteContext<'_>,
508        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
509        {
510            match get!(task, InProgress) {
511                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
512                    listen_to_done_event(this, reader, tracking, done_event),
513                ))),
514                Some(InProgressState::InProgress(box InProgressStateInner {
515                    done_event, ..
516                })) => Some(Ok(Err(listen_to_done_event(
517                    this, reader, tracking, done_event,
518                )))),
519                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
520                    "{} was canceled",
521                    ctx.get_task_description(task.id())
522                ))),
523                None => None,
524            }
525        }
526
527        if matches!(options.consistency, ReadConsistency::Strong) {
528            // Ensure it's an root node
529            loop {
530                let aggregation_number = get_aggregation_number(&task);
531                if is_root_node(aggregation_number) {
532                    break;
533                }
534                drop(task);
535                drop(reader_task);
536                {
537                    let _span = tracing::trace_span!(
538                        "make root node for strongly consistent read",
539                        %task_id
540                    )
541                    .entered();
542                    AggregationUpdateQueue::run(
543                        AggregationUpdateJob::UpdateAggregationNumber {
544                            task_id,
545                            base_aggregation_number: u32::MAX,
546                            distance: None,
547                        },
548                        &mut ctx,
549                    );
550                }
551                (task, reader_task) = if let Some(reader_id) = need_reader_task {
552                    // TODO(sokra): see comment above
553                    let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
554                    (task, Some(reader))
555                } else {
556                    (ctx.task(task_id, TaskDataCategory::All), None)
557                }
558            }
559
560            let is_dirty =
561                get!(task, Dirty).map_or(false, |dirty_state| dirty_state.get(self.session_id));
562
563            // Check the dirty count of the root node
564            let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
565                .cloned()
566                .unwrap_or_default()
567                .get(self.session_id);
568            if dirty_tasks > 0 || is_dirty {
569                let activeness = get_mut!(task, Activeness);
570                let mut task_ids_to_schedule: Vec<_> = Vec::new();
571                // When there are dirty task, subscribe to the all_clean_event
572                let activeness = if let Some(activeness) = activeness {
573                    // This makes sure all tasks stay active and this task won't stale.
574                    // active_until_clean is automatically removed when this
575                    // task is clean.
576                    activeness.set_active_until_clean();
577                    activeness
578                } else {
579                    // If we don't have a root state, add one. This also makes sure all tasks stay
580                    // active and this task won't stale. active_until_clean
581                    // is automatically removed when this task is clean.
582                    get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id))
583                        .set_active_until_clean();
584                    if ctx.should_track_activeness() {
585                        // A newly added Activeness need to make sure to schedule the tasks
586                        task_ids_to_schedule = get_many!(
587                            task,
588                            AggregatedDirtyContainer {
589                                task
590                            } count if count.get(self.session_id) > 0 => {
591                                task
592                            }
593                        );
594                        task_ids_to_schedule.push(task_id);
595                    }
596                    get!(task, Activeness).unwrap()
597                };
598                let listener = activeness.all_clean_event.listen_with_note(move || {
599                    let this = self.clone();
600                    let tt = turbo_tasks.pin();
601                    move || {
602                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
603                        let mut ctx = this.execute_context(tt);
604                        let mut visited = FxHashSet::default();
605                        fn indent(s: &str) -> String {
606                            s.split_inclusive('\n')
607                                .flat_map(|line: &str| ["  ", line].into_iter())
608                                .collect::<String>()
609                        }
610                        fn get_info(
611                            ctx: &mut impl ExecuteContext<'_>,
612                            task_id: TaskId,
613                            parent_and_count: Option<(TaskId, i32)>,
614                            visited: &mut FxHashSet<TaskId>,
615                        ) -> String {
616                            let task = ctx.task(task_id, TaskDataCategory::Data);
617                            let is_dirty = get!(task, Dirty)
618                                .map_or(false, |dirty_state| dirty_state.get(ctx.session_id()));
619                            let in_progress =
620                                get!(task, InProgress).map_or("not in progress", |p| match p {
621                                    InProgressState::InProgress(_) => "in progress",
622                                    InProgressState::Scheduled { .. } => "scheduled",
623                                    InProgressState::Canceled => "canceled",
624                                });
625                            let activeness = get!(task, Activeness).map_or_else(
626                                || "not active".to_string(),
627                                |activeness| format!("{activeness:?}"),
628                            );
629                            let aggregation_number = get_aggregation_number(&task);
630                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
631                            {
632                                let uppers = get_uppers(&task);
633                                !uppers.contains(&parent_task_id)
634                            } else {
635                                false
636                            };
637
638                            // Check the dirty count of the root node
639                            let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
640                                .cloned()
641                                .unwrap_or_default()
642                                .get(ctx.session_id());
643
644                            let task_description = ctx.get_task_description(task_id);
645                            let is_dirty = if is_dirty { ", dirty" } else { "" };
646                            let count = if let Some((_, count)) = parent_and_count {
647                                format!(" {count}")
648                            } else {
649                                String::new()
650                            };
651                            let mut info = format!(
652                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
653                                 {in_progress}, {activeness}{is_dirty})",
654                            );
655                            let children: Vec<_> = iter_many!(
656                                task,
657                                AggregatedDirtyContainer {
658                                    task
659                                } count => {
660                                    (task, count.get(ctx.session_id()))
661                                }
662                            )
663                            .filter(|(_, count)| *count > 0)
664                            .collect();
665                            drop(task);
666
667                            if missing_upper {
668                                info.push_str("\n  ERROR: missing upper connection");
669                            }
670
671                            if dirty_tasks > 0 || !children.is_empty() {
672                                writeln!(info, "\n  {dirty_tasks} dirty tasks:").unwrap();
673
674                                for (child_task_id, count) in children {
675                                    let task_description = ctx.get_task_description(child_task_id);
676                                    if visited.insert(child_task_id) {
677                                        let child_info = get_info(
678                                            ctx,
679                                            child_task_id,
680                                            Some((task_id, count)),
681                                            visited,
682                                        );
683                                        info.push_str(&indent(&child_info));
684                                        if !info.ends_with('\n') {
685                                            info.push('\n');
686                                        }
687                                    } else {
688                                        writeln!(
689                                            info,
690                                            "  {child_task_id} {task_description} {count} \
691                                             (already visited)"
692                                        )
693                                        .unwrap();
694                                    }
695                                }
696                            }
697                            info
698                        }
699                        let info = get_info(&mut ctx, task_id, None, &mut visited);
700                        format!(
701                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
702                        )
703                    }
704                });
705                drop(reader_task);
706                drop(task);
707                if !task_ids_to_schedule.is_empty() {
708                    let mut queue = AggregationUpdateQueue::new();
709                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
710                    queue.execute(&mut ctx);
711                }
712
713                return Ok(Err(listener));
714            }
715        }
716
717        if let Some(value) = check_in_progress(self, &task, reader, options.tracking, &ctx) {
718            return value;
719        }
720
721        if let Some(output) = get!(task, Output) {
722            let result = match output {
723                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
724                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
725                OutputValue::Error(error) => Err(error
726                    .with_task_context(ctx.get_task_description(task_id), Some(task_id))
727                    .into()),
728            };
729            if let Some(mut reader_task) = reader_task
730                && options.tracking.should_track(result.is_err())
731                && (!task.is_immutable() || cfg!(feature = "verify_immutable"))
732            {
733                #[cfg(feature = "trace_task_output_dependencies")]
734                let _span = tracing::trace_span!(
735                    "add output dependency",
736                    task = %task_id,
737                    dependent_task = ?reader
738                )
739                .entered();
740                let _ = task.add(CachedDataItem::OutputDependent {
741                    task: reader.unwrap(),
742                    value: (),
743                });
744                drop(task);
745
746                // Note: We use `task_pair` earlier to lock the task and its reader at the same
747                // time. If we didn't and just locked the reader here, an invalidation could occur
748                // between grabbing the locks. If that happened, and if the task is "outdated" or
749                // doesn't have the dependency edge yet, the invalidation would be lost.
750
751                if reader_task
752                    .remove(&CachedDataItemKey::OutdatedOutputDependency { target: task_id })
753                    .is_none()
754                {
755                    let _ = reader_task.add(CachedDataItem::OutputDependency {
756                        target: task_id,
757                        value: (),
758                    });
759                }
760            }
761
762            return result;
763        }
764        drop(reader_task);
765
766        let note = move || {
767            let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
768            move || {
769                if let Some(reader_desc) = reader_desc.as_ref() {
770                    format!("try_read_task_output (recompute) from {}", (reader_desc)())
771                } else {
772                    "try_read_task_output (recompute, untracked)".to_string()
773                }
774            }
775        };
776
777        // Output doesn't exist. We need to schedule the task to compute it.
778        let (item, listener) = CachedDataItem::new_scheduled_with_listener(
779            TaskExecutionReason::OutputNotAvailable,
780            || self.get_task_desc_fn(task_id),
781            note,
782        );
783        // It's not possible that the task is InProgress at this point. If it is InProgress {
784        // done: true } it must have Output and would early return.
785        task.add_new(item);
786        ctx.schedule_task(task);
787
788        Ok(Err(listener))
789    }
790
791    fn try_read_task_cell(
792        &self,
793        task_id: TaskId,
794        reader: Option<TaskId>,
795        cell: CellId,
796        options: ReadCellOptions,
797        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
798    ) -> Result<Result<TypedCellContent, EventListener>> {
799        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
800
801        fn add_cell_dependency(
802            task_id: TaskId,
803            mut task: impl TaskGuard,
804            reader: Option<TaskId>,
805            reader_task: Option<impl TaskGuard>,
806            cell: CellId,
807        ) {
808            if let Some(mut reader_task) = reader_task
809                && (!task.is_immutable() || cfg!(feature = "verify_immutable"))
810            {
811                let _ = task.add(CachedDataItem::CellDependent {
812                    cell,
813                    task: reader.unwrap(),
814                    value: (),
815                });
816                drop(task);
817
818                // Note: We use `task_pair` earlier to lock the task and its reader at the same
819                // time. If we didn't and just locked the reader here, an invalidation could occur
820                // between grabbing the locks. If that happened, and if the task is "outdated" or
821                // doesn't have the dependency edge yet, the invalidation would be lost.
822
823                let target = CellRef {
824                    task: task_id,
825                    cell,
826                };
827                if reader_task
828                    .remove(&CachedDataItemKey::OutdatedCellDependency { target })
829                    .is_none()
830                {
831                    let _ = reader_task.add(CachedDataItem::CellDependency { target, value: () });
832                }
833            }
834        }
835
836        let mut ctx = self.execute_context(turbo_tasks);
837        let (mut task, reader_task) = if self.should_track_dependencies()
838            && !matches!(options.tracking, ReadTracking::Untracked)
839            && let Some(reader_id) = reader
840            && reader_id != task_id
841        {
842            // Having a task_pair here is not optimal, but otherwise this would lead to a race
843            // condition. See below.
844            // TODO(sokra): solve that in a more performant way.
845            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::Data);
846            (task, Some(reader))
847        } else {
848            (ctx.task(task_id, TaskDataCategory::Data), None)
849        };
850
851        let content = if options.final_read_hint {
852            remove!(task, CellData { cell })
853        } else if let Some(content) = get!(task, CellData { cell }) {
854            let content = content.clone();
855            Some(content)
856        } else {
857            None
858        };
859        if let Some(content) = content {
860            if options.tracking.should_track(false) {
861                add_cell_dependency(task_id, task, reader, reader_task, cell);
862            }
863            return Ok(Ok(TypedCellContent(
864                cell.type_id,
865                CellContent(Some(content.reference)),
866            )));
867        }
868
869        let in_progress = get!(task, InProgress);
870        if matches!(
871            in_progress,
872            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
873        ) {
874            return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0));
875        }
876        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
877
878        // Check cell index range (cell might not exist at all)
879        let max_id = get!(
880            task,
881            CellTypeMaxIndex {
882                cell_type: cell.type_id
883            }
884        )
885        .copied();
886        let Some(max_id) = max_id else {
887            if options.tracking.should_track(true) {
888                add_cell_dependency(task_id, task, reader, reader_task, cell);
889            }
890            bail!(
891                "Cell {cell:?} no longer exists in task {} (no cell of this type exists)",
892                ctx.get_task_description(task_id)
893            );
894        };
895        if cell.index >= max_id {
896            if options.tracking.should_track(true) {
897                add_cell_dependency(task_id, task, reader, reader_task, cell);
898            }
899            bail!(
900                "Cell {cell:?} no longer exists in task {} (index out of bounds)",
901                ctx.get_task_description(task_id)
902            );
903        }
904        drop(reader_task);
905
906        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
907        // task the get the cell content.
908
909        // Listen to the cell and potentially schedule the task
910        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, reader, cell);
911        if !new_listener {
912            return Ok(Err(listener));
913        }
914
915        let _span = tracing::trace_span!(
916            "recomputation",
917            cell_type = get_value_type(cell.type_id).global_name,
918            cell_index = cell.index
919        )
920        .entered();
921
922        // Schedule the task, if not already scheduled
923        if is_cancelled {
924            bail!("{} was canceled", ctx.get_task_description(task_id));
925        }
926        task.add_new(CachedDataItem::new_scheduled(
927            TaskExecutionReason::CellNotAvailable,
928            || self.get_task_desc_fn(task_id),
929        ));
930        ctx.schedule_task(task);
931
932        Ok(Err(listener))
933    }
934
935    fn listen_to_cell(
936        &self,
937        task: &mut impl TaskGuard,
938        task_id: TaskId,
939        reader: Option<TaskId>,
940        cell: CellId,
941    ) -> (EventListener, bool) {
942        let note = move || {
943            let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
944            move || {
945                if let Some(reader_desc) = reader_desc.as_ref() {
946                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
947                } else {
948                    "try_read_task_cell (in progress, untracked)".to_string()
949                }
950            }
951        };
952        if let Some(in_progress) = get!(task, InProgressCell { cell }) {
953            // Someone else is already computing the cell
954            let listener = in_progress.event.listen_with_note(note);
955            return (listener, false);
956        }
957        let in_progress = InProgressCellState::new(task_id, cell);
958        let listener = in_progress.event.listen_with_note(note);
959        task.add_new(CachedDataItem::InProgressCell {
960            cell,
961            value: in_progress,
962        });
963        (listener, true)
964    }
965
966    fn lookup_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
967        if let Some(task_type) = self.task_cache.lookup_reverse(&task_id) {
968            return Some(task_type);
969        }
970        if self.should_restore()
971            && self.local_is_partial.load(Ordering::Acquire)
972            && !task_id.is_transient()
973            && let Some(task_type) = unsafe {
974                self.backing_storage
975                    .reverse_lookup_task_cache(None, task_id)
976                    .expect("Failed to lookup task type")
977            }
978        {
979            let _ = self.task_cache.try_insert(task_type.clone(), task_id);
980            return Some(task_type);
981        }
982        None
983    }
984
985    fn get_task_desc_fn(&self, task_id: TaskId) -> impl Fn() -> String + Send + Sync + 'static {
986        let task_type = self.lookup_task_type(task_id);
987        move || {
988            task_type.as_ref().map_or_else(
989                || format!("{task_id:?} transient"),
990                |task_type| format!("{task_id:?} {task_type}"),
991            )
992        }
993    }
994
995    fn snapshot(&self) -> Option<(Instant, bool)> {
996        let start = Instant::now();
997        debug_assert!(self.should_persist());
998        let mut snapshot_request = self.snapshot_request.lock();
999        snapshot_request.snapshot_requested = true;
1000        let active_operations = self
1001            .in_progress_operations
1002            .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1003        if active_operations != 0 {
1004            self.operations_suspended
1005                .wait_while(&mut snapshot_request, |_| {
1006                    self.in_progress_operations.load(Ordering::Relaxed) != SNAPSHOT_REQUESTED_BIT
1007                });
1008        }
1009        let suspended_operations = snapshot_request
1010            .suspended_operations
1011            .iter()
1012            .map(|op| op.arc().clone())
1013            .collect::<Vec<_>>();
1014        drop(snapshot_request);
1015        self.storage.start_snapshot();
1016        let mut persisted_task_cache_log = self
1017            .persisted_task_cache_log
1018            .as_ref()
1019            .map(|l| l.take(|i| i))
1020            .unwrap_or_default();
1021        let mut snapshot_request = self.snapshot_request.lock();
1022        snapshot_request.snapshot_requested = false;
1023        self.in_progress_operations
1024            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1025        self.snapshot_completed.notify_all();
1026        let snapshot_time = Instant::now();
1027        drop(snapshot_request);
1028
1029        let preprocess = |task_id: TaskId, inner: &storage::InnerStorage| {
1030            if task_id.is_transient() {
1031                return (None, None);
1032            }
1033            let len = inner.len();
1034
1035            let meta_restored = inner.state().meta_restored();
1036            let data_restored = inner.state().data_restored();
1037
1038            let mut meta = meta_restored.then(|| Vec::with_capacity(len));
1039            let mut data = data_restored.then(|| Vec::with_capacity(len));
1040            for (key, value) in inner.iter_all() {
1041                if key.is_persistent() && value.is_persistent() {
1042                    match key.category() {
1043                        TaskDataCategory::Meta => {
1044                            if let Some(meta) = &mut meta {
1045                                meta.push(CachedDataItem::from_key_and_value_ref(key, value))
1046                            }
1047                        }
1048                        TaskDataCategory::Data => {
1049                            if let Some(data) = &mut data {
1050                                data.push(CachedDataItem::from_key_and_value_ref(key, value))
1051                            }
1052                        }
1053                        _ => {}
1054                    }
1055                }
1056            }
1057
1058            (meta, data)
1059        };
1060        let process = |task_id: TaskId, (meta, data): (Option<Vec<_>>, Option<Vec<_>>)| {
1061            (
1062                task_id,
1063                meta.map(|d| self.backing_storage.serialize(task_id, &d)),
1064                data.map(|d| self.backing_storage.serialize(task_id, &d)),
1065            )
1066        };
1067        let process_snapshot = |task_id: TaskId, inner: Box<InnerStorageSnapshot>| {
1068            if task_id.is_transient() {
1069                return (task_id, None, None);
1070            }
1071            let len = inner.len();
1072            let mut meta = inner.meta_modified.then(|| Vec::with_capacity(len));
1073            let mut data = inner.data_modified.then(|| Vec::with_capacity(len));
1074            for (key, value) in inner.iter_all() {
1075                if key.is_persistent() && value.is_persistent() {
1076                    match key.category() {
1077                        TaskDataCategory::Meta => {
1078                            if let Some(meta) = &mut meta {
1079                                meta.push(CachedDataItem::from_key_and_value_ref(key, value));
1080                            }
1081                        }
1082                        TaskDataCategory::Data => {
1083                            if let Some(data) = &mut data {
1084                                data.push(CachedDataItem::from_key_and_value_ref(key, value));
1085                            }
1086                        }
1087                        _ => {}
1088                    }
1089                }
1090            }
1091            (
1092                task_id,
1093                meta.map(|meta| self.backing_storage.serialize(task_id, &meta)),
1094                data.map(|data| self.backing_storage.serialize(task_id, &data)),
1095            )
1096        };
1097
1098        let snapshot = {
1099            let _span = tracing::trace_span!("take snapshot");
1100            self.storage
1101                .take_snapshot(&preprocess, &process, &process_snapshot)
1102        };
1103
1104        #[cfg(feature = "print_cache_item_size")]
1105        #[derive(Default)]
1106        struct TaskCacheStats {
1107            data: usize,
1108            data_count: usize,
1109            meta: usize,
1110            meta_count: usize,
1111        }
1112        #[cfg(feature = "print_cache_item_size")]
1113        impl TaskCacheStats {
1114            fn add_data(&mut self, len: usize) {
1115                self.data += len;
1116                self.data_count += 1;
1117            }
1118
1119            fn add_meta(&mut self, len: usize) {
1120                self.meta += len;
1121                self.meta_count += 1;
1122            }
1123        }
1124        #[cfg(feature = "print_cache_item_size")]
1125        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1126            Mutex::new(FxHashMap::default());
1127
1128        let task_snapshots = snapshot
1129            .into_iter()
1130            .filter_map(|iter| {
1131                let mut iter = iter
1132                    .filter_map(
1133                        |(task_id, meta, data): (
1134                            _,
1135                            Option<Result<SmallVec<_>>>,
1136                            Option<Result<SmallVec<_>>>,
1137                        )| {
1138                            let meta = match meta {
1139                                Some(Ok(meta)) => {
1140                                    #[cfg(feature = "print_cache_item_size")]
1141                                    task_cache_stats
1142                                        .lock()
1143                                        .entry(self.get_task_description(task_id))
1144                                        .or_default()
1145                                        .add_meta(meta.len());
1146                                    Some(meta)
1147                                }
1148                                None => None,
1149                                Some(Err(err)) => {
1150                                    println!(
1151                                        "Serializing task {} failed (meta): {:?}",
1152                                        self.get_task_description(task_id),
1153                                        err
1154                                    );
1155                                    None
1156                                }
1157                            };
1158                            let data = match data {
1159                                Some(Ok(data)) => {
1160                                    #[cfg(feature = "print_cache_item_size")]
1161                                    task_cache_stats
1162                                        .lock()
1163                                        .entry(self.get_task_description(task_id))
1164                                        .or_default()
1165                                        .add_data(data.len());
1166                                    Some(data)
1167                                }
1168                                None => None,
1169                                Some(Err(err)) => {
1170                                    println!(
1171                                        "Serializing task {} failed (data): {:?}",
1172                                        self.get_task_description(task_id),
1173                                        err
1174                                    );
1175                                    None
1176                                }
1177                            };
1178                            (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
1179                        },
1180                    )
1181                    .peekable();
1182                iter.peek().is_some().then_some(iter)
1183            })
1184            .collect::<Vec<_>>();
1185
1186        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
1187
1188        let mut new_items = false;
1189
1190        if !persisted_task_cache_log.is_empty() || !task_snapshots.is_empty() {
1191            new_items = true;
1192            if let Err(err) = self.backing_storage.save_snapshot(
1193                self.session_id,
1194                suspended_operations,
1195                persisted_task_cache_log,
1196                task_snapshots,
1197            ) {
1198                println!("Persisting failed: {err:?}");
1199                return None;
1200            }
1201            #[cfg(feature = "print_cache_item_size")]
1202            {
1203                let mut task_cache_stats = task_cache_stats
1204                    .into_inner()
1205                    .into_iter()
1206                    .collect::<Vec<_>>();
1207                if !task_cache_stats.is_empty() {
1208                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1209                        (stats_b.data + stats_b.meta, key_b)
1210                            .cmp(&(stats_a.data + stats_a.meta, key_a))
1211                    });
1212                    println!("Task cache stats:");
1213                    for (task_desc, stats) in task_cache_stats {
1214                        use std::ops::Div;
1215
1216                        use turbo_tasks::util::FormatBytes;
1217
1218                        println!(
1219                            "  {} {task_desc} = {} meta ({} x {}), {} data ({} x {})",
1220                            FormatBytes(stats.data + stats.meta),
1221                            FormatBytes(stats.meta),
1222                            stats.meta_count,
1223                            FormatBytes(stats.meta.checked_div(stats.meta_count).unwrap_or(0)),
1224                            FormatBytes(stats.data),
1225                            stats.data_count,
1226                            FormatBytes(stats.data.checked_div(stats.data_count).unwrap_or(0)),
1227                        );
1228                    }
1229                }
1230            }
1231        }
1232
1233        if new_items {
1234            let elapsed = start.elapsed();
1235            // avoid spamming the event queue with information about fast operations
1236            if elapsed > Duration::from_secs(10) {
1237                turbo_tasks().send_compilation_event(Arc::new(TimingEvent::new(
1238                    "Finished writing to filesystem cache".to_string(),
1239                    elapsed,
1240                )));
1241            }
1242        }
1243
1244        Some((snapshot_time, new_items))
1245    }
1246
1247    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1248        if self.should_restore() {
1249            // Continue all uncompleted operations
1250            // They can't be interrupted by a snapshot since the snapshotting job has not been
1251            // scheduled yet.
1252            let uncompleted_operations = self
1253                .backing_storage
1254                .uncompleted_operations()
1255                .expect("Failed to get uncompleted operations");
1256            if !uncompleted_operations.is_empty() {
1257                let mut ctx = self.execute_context(turbo_tasks);
1258                for op in uncompleted_operations {
1259                    op.execute(&mut ctx);
1260                }
1261            }
1262        }
1263
1264        if self.should_persist() {
1265            // Schedule the snapshot job
1266            let _span = Span::none().entered();
1267            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1268        }
1269    }
1270
1271    fn stopping(&self) {
1272        self.stopping.store(true, Ordering::Release);
1273        self.stopping_event.notify(usize::MAX);
1274    }
1275
1276    #[allow(unused_variables)]
1277    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1278        #[cfg(feature = "verify_aggregation_graph")]
1279        {
1280            self.is_idle.store(false, Ordering::Release);
1281            self.verify_aggregation_graph(turbo_tasks, false);
1282        }
1283        if self.should_persist() {
1284            let _span = tracing::info_span!("persist on stop").entered();
1285            self.snapshot();
1286        }
1287        self.task_cache.drop_contents();
1288        drop_contents(&self.transient_tasks);
1289        self.storage.drop_contents();
1290        if let Err(err) = self.backing_storage.shutdown() {
1291            println!("Shutting down failed: {err}");
1292        }
1293    }
1294
1295    #[allow(unused_variables)]
1296    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1297        self.idle_start_event.notify(usize::MAX);
1298
1299        #[cfg(feature = "verify_aggregation_graph")]
1300        {
1301            use tokio::select;
1302
1303            self.is_idle.store(true, Ordering::Release);
1304            let this = self.clone();
1305            let turbo_tasks = turbo_tasks.pin();
1306            tokio::task::spawn(async move {
1307                select! {
1308                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1309                        // do nothing
1310                    }
1311                    _ = this.idle_end_event.listen() => {
1312                        return;
1313                    }
1314                }
1315                if !this.is_idle.load(Ordering::Relaxed) {
1316                    return;
1317                }
1318                this.verify_aggregation_graph(&*turbo_tasks, true);
1319            });
1320        }
1321    }
1322
1323    fn idle_end(&self) {
1324        #[cfg(feature = "verify_aggregation_graph")]
1325        self.is_idle.store(false, Ordering::Release);
1326        self.idle_end_event.notify(usize::MAX);
1327    }
1328
1329    fn get_or_create_persistent_task(
1330        &self,
1331        task_type: CachedTaskType,
1332        parent_task: Option<TaskId>,
1333        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1334    ) -> TaskId {
1335        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
1336            self.track_cache_hit(&task_type);
1337            self.connect_child(parent_task, task_id, turbo_tasks);
1338            return task_id;
1339        }
1340
1341        let check_backing_storage =
1342            self.should_restore() && self.local_is_partial.load(Ordering::Acquire);
1343        let tx = check_backing_storage
1344            .then(|| self.backing_storage.start_read_transaction())
1345            .flatten();
1346        let task_id = {
1347            // Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
1348            if let Some(task_id) = unsafe {
1349                check_backing_storage
1350                    .then(|| {
1351                        self.backing_storage
1352                            .forward_lookup_task_cache(tx.as_ref(), &task_type)
1353                            .expect("Failed to lookup task id")
1354                    })
1355                    .flatten()
1356            } {
1357                self.track_cache_hit(&task_type);
1358                let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
1359                task_id
1360            } else {
1361                let task_type = Arc::new(task_type);
1362                let task_id = self.persisted_task_id_factory.get();
1363                let task_id = if let Err(existing_task_id) =
1364                    self.task_cache.try_insert(task_type.clone(), task_id)
1365                {
1366                    self.track_cache_hit(&task_type);
1367                    // Safety: We just created the id and failed to insert it.
1368                    unsafe {
1369                        self.persisted_task_id_factory.reuse(task_id);
1370                    }
1371                    existing_task_id
1372                } else {
1373                    self.track_cache_miss(&task_type);
1374                    task_id
1375                };
1376                if let Some(log) = &self.persisted_task_cache_log {
1377                    log.lock(task_id).push((task_type, task_id));
1378                }
1379                task_id
1380            }
1381        };
1382
1383        // Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
1384        unsafe { self.connect_child_with_tx(tx.as_ref(), parent_task, task_id, turbo_tasks) };
1385
1386        task_id
1387    }
1388
1389    fn get_or_create_transient_task(
1390        &self,
1391        task_type: CachedTaskType,
1392        parent_task: Option<TaskId>,
1393        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1394    ) -> TaskId {
1395        if let Some(parent_task) = parent_task
1396            && !parent_task.is_transient()
1397        {
1398            self.panic_persistent_calling_transient(
1399                self.lookup_task_type(parent_task).as_deref(),
1400                Some(&task_type),
1401                /* cell_id */ None,
1402            );
1403        }
1404        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
1405            self.track_cache_hit(&task_type);
1406            self.connect_child(parent_task, task_id, turbo_tasks);
1407            return task_id;
1408        }
1409
1410        let task_type = Arc::new(task_type);
1411        let task_id = self.transient_task_id_factory.get();
1412        if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) {
1413            self.track_cache_hit(&task_type);
1414            // Safety: We just created the id and failed to insert it.
1415            unsafe {
1416                self.transient_task_id_factory.reuse(task_id);
1417            }
1418            self.connect_child(parent_task, existing_task_id, turbo_tasks);
1419            return existing_task_id;
1420        }
1421
1422        self.track_cache_miss(&task_type);
1423        self.connect_child(parent_task, task_id, turbo_tasks);
1424
1425        task_id
1426    }
1427
1428    /// Generate an object that implements [`fmt::Display`] explaining why the given
1429    /// [`CachedTaskType`] is transient.
1430    fn debug_trace_transient_task(
1431        &self,
1432        task_type: &CachedTaskType,
1433        cell_id: Option<CellId>,
1434    ) -> DebugTraceTransientTask {
1435        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1436        // from tracing the same task many times, so use a visited_set
1437        fn inner_id(
1438            backend: &TurboTasksBackendInner<impl BackingStorage>,
1439            task_id: TaskId,
1440            cell_type_id: Option<ValueTypeId>,
1441            visited_set: &mut FxHashSet<TaskId>,
1442        ) -> DebugTraceTransientTask {
1443            if let Some(task_type) = backend.lookup_task_type(task_id) {
1444                if visited_set.contains(&task_id) {
1445                    let task_name = task_type.get_name();
1446                    DebugTraceTransientTask::Collapsed {
1447                        task_name,
1448                        cell_type_id,
1449                    }
1450                } else {
1451                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1452                }
1453            } else {
1454                DebugTraceTransientTask::Uncached { cell_type_id }
1455            }
1456        }
1457        fn inner_cached(
1458            backend: &TurboTasksBackendInner<impl BackingStorage>,
1459            task_type: &CachedTaskType,
1460            cell_type_id: Option<ValueTypeId>,
1461            visited_set: &mut FxHashSet<TaskId>,
1462        ) -> DebugTraceTransientTask {
1463            let task_name = task_type.get_name();
1464
1465            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1466                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1467                    // `task_id` should never be `None` at this point, as that would imply a
1468                    // non-local task is returning a local `Vc`...
1469                    // Just ignore if it happens, as we're likely already panicking.
1470                    return None;
1471                };
1472                if task_id.is_transient() {
1473                    Some(Box::new(inner_id(
1474                        backend,
1475                        task_id,
1476                        cause_self_raw_vc.try_get_type_id(),
1477                        visited_set,
1478                    )))
1479                } else {
1480                    None
1481                }
1482            });
1483            let cause_args = task_type
1484                .arg
1485                .get_raw_vcs()
1486                .into_iter()
1487                .filter_map(|raw_vc| {
1488                    let Some(task_id) = raw_vc.try_get_task_id() else {
1489                        // `task_id` should never be `None` (see comment above)
1490                        return None;
1491                    };
1492                    if !task_id.is_transient() {
1493                        return None;
1494                    }
1495                    Some((task_id, raw_vc.try_get_type_id()))
1496                })
1497                .collect::<IndexSet<_>>() // dedupe
1498                .into_iter()
1499                .map(|(task_id, cell_type_id)| {
1500                    inner_id(backend, task_id, cell_type_id, visited_set)
1501                })
1502                .collect();
1503
1504            DebugTraceTransientTask::Cached {
1505                task_name,
1506                cell_type_id,
1507                cause_self,
1508                cause_args,
1509            }
1510        }
1511        inner_cached(
1512            self,
1513            task_type,
1514            cell_id.map(|c| c.type_id),
1515            &mut FxHashSet::default(),
1516        )
1517    }
1518
1519    fn invalidate_task(
1520        &self,
1521        task_id: TaskId,
1522        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1523    ) {
1524        if !self.should_track_dependencies() {
1525            panic!("Dependency tracking is disabled so invalidation is not allowed");
1526        }
1527        operation::InvalidateOperation::run(
1528            smallvec![task_id],
1529            #[cfg(feature = "trace_task_dirty")]
1530            TaskDirtyCause::Invalidator,
1531            self.execute_context(turbo_tasks),
1532        );
1533    }
1534
1535    fn invalidate_tasks(
1536        &self,
1537        tasks: &[TaskId],
1538        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1539    ) {
1540        if !self.should_track_dependencies() {
1541            panic!("Dependency tracking is disabled so invalidation is not allowed");
1542        }
1543        operation::InvalidateOperation::run(
1544            tasks.iter().copied().collect(),
1545            #[cfg(feature = "trace_task_dirty")]
1546            TaskDirtyCause::Unknown,
1547            self.execute_context(turbo_tasks),
1548        );
1549    }
1550
1551    fn invalidate_tasks_set(
1552        &self,
1553        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1554        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1555    ) {
1556        if !self.should_track_dependencies() {
1557            panic!("Dependency tracking is disabled so invalidation is not allowed");
1558        }
1559        operation::InvalidateOperation::run(
1560            tasks.iter().copied().collect(),
1561            #[cfg(feature = "trace_task_dirty")]
1562            TaskDirtyCause::Unknown,
1563            self.execute_context(turbo_tasks),
1564        );
1565    }
1566
1567    fn invalidate_serialization(
1568        &self,
1569        task_id: TaskId,
1570        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1571    ) {
1572        if task_id.is_transient() {
1573            return;
1574        }
1575        let mut ctx = self.execute_context(turbo_tasks);
1576        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1577        task.invalidate_serialization();
1578    }
1579
1580    fn get_task_description(&self, task_id: TaskId) -> String {
1581        self.lookup_task_type(task_id).map_or_else(
1582            || format!("{task_id:?} transient"),
1583            |task_type| task_type.to_string(),
1584        )
1585    }
1586
1587    fn task_execution_canceled(
1588        &self,
1589        task_id: TaskId,
1590        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1591    ) {
1592        let mut ctx = self.execute_context(turbo_tasks);
1593        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1594        if let Some(in_progress) = remove!(task, InProgress) {
1595            match in_progress {
1596                InProgressState::Scheduled {
1597                    done_event,
1598                    reason: _,
1599                } => done_event.notify(usize::MAX),
1600                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1601                    done_event.notify(usize::MAX)
1602                }
1603                InProgressState::Canceled => {}
1604            }
1605        }
1606        task.add_new(CachedDataItem::InProgress {
1607            value: InProgressState::Canceled,
1608        });
1609    }
1610
1611    fn try_start_task_execution(
1612        &self,
1613        task_id: TaskId,
1614        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1615    ) -> Option<TaskExecutionSpec<'_>> {
1616        enum TaskType {
1617            Cached(Arc<CachedTaskType>),
1618            Transient(Arc<TransientTask>),
1619        }
1620        let (task_type, once_task) = if let Some(task_type) = self.lookup_task_type(task_id) {
1621            (TaskType::Cached(task_type), false)
1622        } else if let Some(task_type) = self.transient_tasks.get(&task_id) {
1623            (
1624                TaskType::Transient(task_type.clone()),
1625                matches!(**task_type, TransientTask::Once(_)),
1626            )
1627        } else {
1628            return None;
1629        };
1630        let execution_reason;
1631        {
1632            let mut ctx = self.execute_context(turbo_tasks);
1633            let mut task = ctx.task(task_id, TaskDataCategory::All);
1634            let in_progress = remove!(task, InProgress)?;
1635            let InProgressState::Scheduled { done_event, reason } = in_progress else {
1636                task.add_new(CachedDataItem::InProgress { value: in_progress });
1637                return None;
1638            };
1639            execution_reason = reason;
1640            task.add_new(CachedDataItem::InProgress {
1641                value: InProgressState::InProgress(Box::new(InProgressStateInner {
1642                    stale: false,
1643                    once_task,
1644                    done_event,
1645                    session_dependent: false,
1646                    marked_as_completed: false,
1647                    new_children: Default::default(),
1648                })),
1649            });
1650
1651            // Make all current collectibles outdated (remove left-over outdated collectibles)
1652            enum Collectible {
1653                Current(CollectibleRef, i32),
1654                Outdated(CollectibleRef),
1655            }
1656            let collectibles = iter_many!(task, Collectible { collectible } value => Collectible::Current(collectible, *value))
1657                    .chain(iter_many!(task, OutdatedCollectible { collectible } => Collectible::Outdated(collectible)))
1658                    .collect::<Vec<_>>();
1659            for collectible in collectibles {
1660                match collectible {
1661                    Collectible::Current(collectible, value) => {
1662                        let _ =
1663                            task.insert(CachedDataItem::OutdatedCollectible { collectible, value });
1664                    }
1665                    Collectible::Outdated(collectible) => {
1666                        if !task.has_key(&CachedDataItemKey::Collectible { collectible }) {
1667                            task.remove(&CachedDataItemKey::OutdatedCollectible { collectible });
1668                        }
1669                    }
1670                }
1671            }
1672
1673            if self.should_track_dependencies() {
1674                // Make all dependencies outdated
1675                enum Dep {
1676                    CurrentCell(CellRef),
1677                    CurrentOutput(TaskId),
1678                    OutdatedCell(CellRef),
1679                    OutdatedOutput(TaskId),
1680                }
1681                let dependencies = iter_many!(task, CellDependency { target } => Dep::CurrentCell(target))
1682                    .chain(iter_many!(task, OutputDependency { target } => Dep::CurrentOutput(target)))
1683                    .chain(iter_many!(task, OutdatedCellDependency { target } => Dep::OutdatedCell(target)))
1684                    .chain(iter_many!(task, OutdatedOutputDependency { target } => Dep::OutdatedOutput(target)))
1685                    .collect::<Vec<_>>();
1686                for dep in dependencies {
1687                    match dep {
1688                        Dep::CurrentCell(cell) => {
1689                            let _ = task.add(CachedDataItem::OutdatedCellDependency {
1690                                target: cell,
1691                                value: (),
1692                            });
1693                        }
1694                        Dep::CurrentOutput(output) => {
1695                            let _ = task.add(CachedDataItem::OutdatedOutputDependency {
1696                                target: output,
1697                                value: (),
1698                            });
1699                        }
1700                        Dep::OutdatedCell(cell) => {
1701                            if !task.has_key(&CachedDataItemKey::CellDependency { target: cell }) {
1702                                task.remove(&CachedDataItemKey::OutdatedCellDependency {
1703                                    target: cell,
1704                                });
1705                            }
1706                        }
1707                        Dep::OutdatedOutput(output) => {
1708                            if !task
1709                                .has_key(&CachedDataItemKey::OutputDependency { target: output })
1710                            {
1711                                task.remove(&CachedDataItemKey::OutdatedOutputDependency {
1712                                    target: output,
1713                                });
1714                            }
1715                        }
1716                    }
1717                }
1718            }
1719        }
1720
1721        let (span, future) = match task_type {
1722            TaskType::Cached(task_type) => {
1723                let CachedTaskType {
1724                    native_fn,
1725                    this,
1726                    arg,
1727                } = &*task_type;
1728                (
1729                    native_fn.span(task_id.persistence(), execution_reason),
1730                    native_fn.execute(*this, &**arg),
1731                )
1732            }
1733            TaskType::Transient(task_type) => {
1734                let span = tracing::trace_span!("turbo_tasks::root_task");
1735                let future = match &*task_type {
1736                    TransientTask::Root(f) => f(),
1737                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
1738                };
1739                (span, future)
1740            }
1741        };
1742        Some(TaskExecutionSpec { future, span })
1743    }
1744
1745    fn task_execution_completed(
1746        &self,
1747        task_id: TaskId,
1748        result: Result<RawVc, TurboTasksExecutionError>,
1749        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1750        stateful: bool,
1751        has_invalidator: bool,
1752        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1753    ) -> bool {
1754        // Task completion is a 4 step process:
1755        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
1756        //    aggregation number of the task and the new children.
1757        // 2. Connect the new children to the task (and do the relevant aggregation updates).
1758        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
1759        // 4. Shrink the task memory to reduce footprint of the task.
1760
1761        // Due to persistence it is possible that the process is cancelled after any step. This is
1762        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
1763        // in-memory representation.
1764
1765        // The task might be invalidated during this process, so we need to check the stale flag
1766        // at the start of every step.
1767
1768        #[cfg(not(feature = "trace_task_details"))]
1769        let _span = tracing::trace_span!("task execution completed").entered();
1770        #[cfg(feature = "trace_task_details")]
1771        let span = tracing::trace_span!(
1772            "task execution completed",
1773            task_id = display(task_id),
1774            result = match result.as_ref() {
1775                Ok(value) => display(either::Either::Left(value)),
1776                Err(err) => display(either::Either::Right(err)),
1777            },
1778            immutable = tracing::field::Empty,
1779            new_output = tracing::field::Empty,
1780            output_dependents = tracing::field::Empty,
1781            stale = tracing::field::Empty,
1782        )
1783        .entered();
1784        let mut ctx = self.execute_context(turbo_tasks);
1785
1786        let Some(TaskExecutionCompletePrepareResult {
1787            new_children,
1788            mut removed_data,
1789            is_now_immutable,
1790            #[cfg(feature = "verify_determinism")]
1791            no_output_set,
1792            new_output,
1793            output_dependent_tasks,
1794        }) = self.task_execution_completed_prepare(
1795            &mut ctx,
1796            #[cfg(feature = "trace_task_details")]
1797            &span,
1798            task_id,
1799            result,
1800            cell_counters,
1801            stateful,
1802            has_invalidator,
1803        )
1804        else {
1805            // Task was stale and has been rescheduled
1806            #[cfg(feature = "trace_task_details")]
1807            span.record("stale", "true");
1808            return true;
1809        };
1810
1811        #[cfg(feature = "trace_task_details")]
1812        span.record("new_output", new_output.is_some());
1813        #[cfg(feature = "trace_task_details")]
1814        span.record("output_dependents", output_dependent_tasks.len());
1815
1816        // When restoring from filesystem cache the following might not be executed (since we can
1817        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
1818        // would be executed again.
1819
1820        if !output_dependent_tasks.is_empty() {
1821            self.task_execution_completed_invalidate_output_dependent(
1822                &mut ctx,
1823                task_id,
1824                output_dependent_tasks,
1825            );
1826        }
1827
1828        let has_new_children = !new_children.is_empty();
1829
1830        if has_new_children {
1831            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
1832        }
1833
1834        if has_new_children
1835            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
1836        {
1837            // Task was stale and has been rescheduled
1838            #[cfg(feature = "trace_task_details")]
1839            span.record("stale", "true");
1840            return true;
1841        }
1842
1843        if self.task_execution_completed_finish(
1844            &mut ctx,
1845            task_id,
1846            #[cfg(feature = "verify_determinism")]
1847            no_output_set,
1848            new_output,
1849            &mut removed_data,
1850            is_now_immutable,
1851        ) {
1852            // Task was stale and has been rescheduled
1853            #[cfg(feature = "trace_task_details")]
1854            span.record("stale", "true");
1855            return true;
1856        }
1857
1858        drop(removed_data);
1859
1860        self.task_execution_completed_cleanup(&mut ctx, task_id);
1861
1862        false
1863    }
1864
1865    fn task_execution_completed_prepare(
1866        &self,
1867        ctx: &mut impl ExecuteContext<'_>,
1868        #[cfg(feature = "trace_task_details")] span: &Span,
1869        task_id: TaskId,
1870        result: Result<RawVc, TurboTasksExecutionError>,
1871        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1872        stateful: bool,
1873        has_invalidator: bool,
1874    ) -> Option<TaskExecutionCompletePrepareResult> {
1875        let mut task = ctx.task(task_id, TaskDataCategory::All);
1876        let Some(in_progress) = get_mut!(task, InProgress) else {
1877            panic!("Task execution completed, but task is not in progress: {task:#?}");
1878        };
1879        if matches!(in_progress, InProgressState::Canceled) {
1880            return Some(TaskExecutionCompletePrepareResult {
1881                new_children: Default::default(),
1882                removed_data: Default::default(),
1883                is_now_immutable: false,
1884                #[cfg(feature = "verify_determinism")]
1885                no_output_set: false,
1886                new_output: None,
1887                output_dependent_tasks: Default::default(),
1888            });
1889        }
1890        let &mut InProgressState::InProgress(box InProgressStateInner {
1891            stale,
1892            ref mut new_children,
1893            session_dependent,
1894            ..
1895        }) = in_progress
1896        else {
1897            panic!("Task execution completed, but task is not in progress: {task:#?}");
1898        };
1899
1900        // If the task is stale, reschedule it
1901        #[cfg(not(feature = "no_fast_stale"))]
1902        if stale {
1903            let Some(InProgressState::InProgress(box InProgressStateInner {
1904                done_event,
1905                mut new_children,
1906                ..
1907            })) = remove!(task, InProgress)
1908            else {
1909                unreachable!();
1910            };
1911            task.add_new(CachedDataItem::InProgress {
1912                value: InProgressState::Scheduled {
1913                    done_event,
1914                    reason: TaskExecutionReason::Stale,
1915                },
1916            });
1917            // Remove old children from new_children to leave only the children that had their
1918            // active count increased
1919            for task in iter_many!(task, Child { task } => task) {
1920                new_children.remove(&task);
1921            }
1922            drop(task);
1923
1924            // We need to undo the active count increase for the children since we throw away the
1925            // new_children list now.
1926            AggregationUpdateQueue::run(
1927                AggregationUpdateJob::DecreaseActiveCounts {
1928                    task_ids: new_children.into_iter().collect(),
1929                },
1930                ctx,
1931            );
1932            return None;
1933        }
1934
1935        // take the children from the task to process them
1936        let mut new_children = take(new_children);
1937
1938        // handle stateful
1939        if stateful {
1940            let _ = task.add(CachedDataItem::Stateful { value: () });
1941        }
1942
1943        // handle has_invalidator
1944        if has_invalidator {
1945            let _ = task.add(CachedDataItem::HasInvalidator { value: () });
1946        }
1947
1948        // handle cell counters: update max index and remove cells that are no longer used
1949        let old_counters: FxHashMap<_, _> =
1950            get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, *max_index));
1951        let mut counters_to_remove = old_counters.clone();
1952        for (&cell_type, &max_index) in cell_counters.iter() {
1953            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
1954                if old_max_index != max_index {
1955                    task.insert(CachedDataItem::CellTypeMaxIndex {
1956                        cell_type,
1957                        value: max_index,
1958                    });
1959                }
1960            } else {
1961                task.add_new(CachedDataItem::CellTypeMaxIndex {
1962                    cell_type,
1963                    value: max_index,
1964                });
1965            }
1966        }
1967        for (cell_type, _) in counters_to_remove {
1968            task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type });
1969        }
1970
1971        let mut queue = AggregationUpdateQueue::new();
1972
1973        let mut removed_data = Vec::new();
1974        let mut old_edges = Vec::new();
1975
1976        let has_children = !new_children.is_empty();
1977        let is_immutable = task.is_immutable();
1978        let task_dependencies_for_immutable =
1979            // Task was previously marked as immutable
1980            if !is_immutable
1981            // Task is not session dependent (session dependent tasks can change between sessions)
1982            && !session_dependent
1983            // Task has no invalidator
1984            && !task.has_key(&CachedDataItemKey::HasInvalidator {})
1985            // Task has no dependencies on collectibles
1986            && count!(task, CollectiblesDependency) == 0
1987        {
1988            Some(
1989                // Collect all dependencies on tasks to check if all dependencies are immutable
1990                iter_many!(task, OutputDependency { target } => target)
1991                    .chain(iter_many!(task, CellDependency { target } => target.task))
1992                    .collect::<FxHashSet<_>>(),
1993            )
1994        } else {
1995            None
1996        };
1997
1998        if has_children {
1999            // Prepare all new children
2000            prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2001
2002            // Filter actual new children
2003            old_edges.extend(
2004                iter_many!(task, Child { task } => task)
2005                    .filter(|task| !new_children.remove(task))
2006                    .map(OutdatedEdge::Child),
2007            );
2008        } else {
2009            old_edges.extend(iter_many!(task, Child { task } => task).map(OutdatedEdge::Child));
2010        }
2011
2012        // Remove no longer existing cells and
2013        // find all outdated data items (removed cells, outdated edges)
2014        // Note: For persistent tasks we only want to call extract_if when there are actual cells to
2015        // remove to avoid tracking that as modification.
2016        if task_id.is_transient() || iter_many!(task, CellData { cell }
2017            if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index) => cell
2018        ).count() > 0 {
2019            removed_data.extend(task.extract_if(CachedDataItemType::CellData, |key, _| {
2020                matches!(key, CachedDataItemKey::CellData { cell } if cell_counters
2021                            .get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
2022            }));
2023        }
2024
2025        old_edges.extend(
2026            task.iter(CachedDataItemType::OutdatedCollectible)
2027                .filter_map(|(key, value)| match (key, value) {
2028                    (
2029                        CachedDataItemKey::OutdatedCollectible { collectible },
2030                        CachedDataItemValueRef::OutdatedCollectible { value },
2031                    ) => Some(OutdatedEdge::Collectible(collectible, *value)),
2032                    _ => None,
2033                }),
2034        );
2035
2036        if self.should_track_dependencies() {
2037            old_edges.extend(iter_many!(task, OutdatedCellDependency { target } => OutdatedEdge::CellDependency(target)));
2038            old_edges.extend(iter_many!(task, OutdatedOutputDependency { target } => OutdatedEdge::OutputDependency(target)));
2039            old_edges.extend(
2040                iter_many!(task, CellDependent { cell, task } => (cell, task)).filter_map(
2041                    |(cell, task)| {
2042                        if cell_counters
2043                            .get(&cell.type_id)
2044                            .is_none_or(|start_index| cell.index >= *start_index)
2045                            && let Some(old_counter) = old_counters.get(&cell.type_id)
2046                            && cell.index < *old_counter
2047                        {
2048                            return Some(OutdatedEdge::RemovedCellDependent {
2049                                task_id: task,
2050                                #[cfg(feature = "trace_task_dirty")]
2051                                value_type_id: cell.type_id,
2052                            });
2053                        }
2054                        None
2055                    },
2056                ),
2057            );
2058        }
2059
2060        // Check if output need to be updated
2061        let current_output = get!(task, Output);
2062        #[cfg(feature = "verify_determinism")]
2063        let no_output_set = current_output.is_none();
2064        let new_output = match result {
2065            Ok(RawVc::TaskOutput(output_task_id)) => {
2066                if let Some(OutputValue::Output(current_task_id)) = current_output
2067                    && *current_task_id == output_task_id
2068                {
2069                    None
2070                } else {
2071                    Some(OutputValue::Output(output_task_id))
2072                }
2073            }
2074            Ok(RawVc::TaskCell(output_task_id, cell)) => {
2075                if let Some(OutputValue::Cell(CellRef {
2076                    task: current_task_id,
2077                    cell: current_cell,
2078                })) = current_output
2079                    && *current_task_id == output_task_id
2080                    && *current_cell == cell
2081                {
2082                    None
2083                } else {
2084                    Some(OutputValue::Cell(CellRef {
2085                        task: output_task_id,
2086                        cell,
2087                    }))
2088                }
2089            }
2090            Ok(RawVc::LocalOutput(..)) => {
2091                panic!("Non-local tasks must not return a local Vc");
2092            }
2093            Err(err) => {
2094                if let Some(OutputValue::Error(old_error)) = current_output
2095                    && old_error == &err
2096                {
2097                    None
2098                } else {
2099                    Some(OutputValue::Error(err))
2100                }
2101            }
2102        };
2103        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2104        // When output has changed, grab the dependent tasks
2105        if new_output.is_some() && ctx.should_track_dependencies() {
2106            output_dependent_tasks = get_many!(task, OutputDependent { task } => task);
2107        }
2108
2109        drop(task);
2110
2111        // Check if the task can be marked as immutable
2112        let mut is_now_immutable = false;
2113        if let Some(dependencies) = task_dependencies_for_immutable
2114            && dependencies
2115                .iter()
2116                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).is_immutable())
2117        {
2118            is_now_immutable = true;
2119        }
2120        #[cfg(feature = "trace_task_details")]
2121        span.record("immutable", is_immutable || is_now_immutable);
2122
2123        if !queue.is_empty() || !old_edges.is_empty() {
2124            #[cfg(feature = "trace_task_completion")]
2125            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
2126            // Remove outdated edges first, before removing in_progress+dirty flag.
2127            // We need to make sure all outdated edges are removed before the task can potentially
2128            // be scheduled and executed again
2129            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2130        }
2131
2132        Some(TaskExecutionCompletePrepareResult {
2133            new_children,
2134            removed_data,
2135            is_now_immutable,
2136            #[cfg(feature = "verify_determinism")]
2137            no_output_set,
2138            new_output,
2139            output_dependent_tasks,
2140        })
2141    }
2142
2143    fn task_execution_completed_invalidate_output_dependent(
2144        &self,
2145        ctx: &mut impl ExecuteContext<'_>,
2146        task_id: TaskId,
2147        output_dependent_tasks: SmallVec<[TaskId; 4]>,
2148    ) {
2149        debug_assert!(!output_dependent_tasks.is_empty());
2150
2151        let mut queue = AggregationUpdateQueue::new();
2152        for dependent_task_id in output_dependent_tasks {
2153            #[cfg(feature = "trace_task_output_dependencies")]
2154            let span = tracing::trace_span!(
2155                "invalidate output dependency",
2156                task = %task_id,
2157                dependent_task = %dependent_task_id,
2158                result = tracing::field::Empty,
2159            )
2160            .entered();
2161            if ctx.is_once_task(dependent_task_id) {
2162                // once tasks are never invalidated
2163                #[cfg(feature = "trace_task_output_dependencies")]
2164                span.record("result", "once task");
2165                continue;
2166            }
2167            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2168            if dependent.has_key(&CachedDataItemKey::OutdatedOutputDependency { target: task_id }) {
2169                // output dependency is outdated, so it hasn't read the output yet
2170                // and doesn't need to be invalidated
2171                #[cfg(feature = "trace_task_output_dependencies")]
2172                span.record("result", "outdated dependency");
2173                continue;
2174            }
2175            if !dependent.has_key(&CachedDataItemKey::OutputDependency { target: task_id }) {
2176                // output dependency has been removed, so the task doesn't depend on the
2177                // output anymore and doesn't need to be invalidated
2178                #[cfg(feature = "trace_task_output_dependencies")]
2179                span.record("result", "no backward dependency");
2180                continue;
2181            }
2182            make_task_dirty_internal(
2183                dependent,
2184                dependent_task_id,
2185                true,
2186                #[cfg(feature = "trace_task_dirty")]
2187                TaskDirtyCause::OutputChange { task_id },
2188                &mut queue,
2189                ctx,
2190            );
2191            #[cfg(feature = "trace_task_output_dependencies")]
2192            span.record("result", "marked dirty");
2193        }
2194
2195        queue.execute(ctx);
2196    }
2197
2198    fn task_execution_completed_unfinished_children_dirty(
2199        &self,
2200        ctx: &mut impl ExecuteContext<'_>,
2201        new_children: &FxHashSet<TaskId>,
2202    ) {
2203        debug_assert!(!new_children.is_empty());
2204
2205        let mut queue = AggregationUpdateQueue::new();
2206        for &child_id in new_children {
2207            let child_task = ctx.task(child_id, TaskDataCategory::Meta);
2208            if !child_task.has_key(&CachedDataItemKey::Output {}) {
2209                make_task_dirty_internal(
2210                    child_task,
2211                    child_id,
2212                    false,
2213                    #[cfg(feature = "trace_task_dirty")]
2214                    TaskDirtyCause::InitialDirty,
2215                    &mut queue,
2216                    ctx,
2217                );
2218            }
2219        }
2220
2221        queue.execute(ctx);
2222    }
2223
2224    fn task_execution_completed_connect(
2225        &self,
2226        ctx: &mut impl ExecuteContext<'_>,
2227        task_id: TaskId,
2228        new_children: FxHashSet<TaskId>,
2229    ) -> bool {
2230        debug_assert!(!new_children.is_empty());
2231
2232        let mut task = ctx.task(task_id, TaskDataCategory::All);
2233        let Some(in_progress) = get!(task, InProgress) else {
2234            panic!("Task execution completed, but task is not in progress: {task:#?}");
2235        };
2236        if matches!(in_progress, InProgressState::Canceled) {
2237            // Task was canceled in the meantime, so we don't connect the children
2238            return false;
2239        }
2240        let InProgressState::InProgress(box InProgressStateInner {
2241            #[cfg(not(feature = "no_fast_stale"))]
2242            stale,
2243            ..
2244        }) = in_progress
2245        else {
2246            panic!("Task execution completed, but task is not in progress: {task:#?}");
2247        };
2248
2249        // If the task is stale, reschedule it
2250        #[cfg(not(feature = "no_fast_stale"))]
2251        if *stale {
2252            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2253                remove!(task, InProgress)
2254            else {
2255                unreachable!();
2256            };
2257            task.add_new(CachedDataItem::InProgress {
2258                value: InProgressState::Scheduled {
2259                    done_event,
2260                    reason: TaskExecutionReason::Stale,
2261                },
2262            });
2263            drop(task);
2264
2265            // All `new_children` are currently hold active with an active count and we need to undo
2266            // that. (We already filtered out the old children from that list)
2267            AggregationUpdateQueue::run(
2268                AggregationUpdateJob::DecreaseActiveCounts {
2269                    task_ids: new_children.into_iter().collect(),
2270                },
2271                ctx,
2272            );
2273            return true;
2274        }
2275
2276        let has_active_count = ctx.should_track_activeness()
2277            && get!(task, Activeness).map_or(false, |activeness| activeness.active_counter > 0);
2278        connect_children(
2279            ctx,
2280            task_id,
2281            task,
2282            new_children,
2283            has_active_count,
2284            ctx.should_track_activeness(),
2285        );
2286
2287        false
2288    }
2289
2290    fn task_execution_completed_finish(
2291        &self,
2292        ctx: &mut impl ExecuteContext<'_>,
2293        task_id: TaskId,
2294        #[cfg(feature = "verify_determinism")] no_output_set: bool,
2295        new_output: Option<OutputValue>,
2296        removed_data: &mut Vec<CachedDataItem>,
2297        is_now_immutable: bool,
2298    ) -> bool {
2299        let mut task = ctx.task(task_id, TaskDataCategory::All);
2300        let Some(in_progress) = remove!(task, InProgress) else {
2301            panic!("Task execution completed, but task is not in progress: {task:#?}");
2302        };
2303        if matches!(in_progress, InProgressState::Canceled) {
2304            // Task was canceled in the meantime, so we don't finish it
2305            return false;
2306        }
2307        let InProgressState::InProgress(box InProgressStateInner {
2308            done_event,
2309            once_task: _,
2310            stale,
2311            session_dependent,
2312            marked_as_completed: _,
2313            new_children,
2314        }) = in_progress
2315        else {
2316            panic!("Task execution completed, but task is not in progress: {task:#?}");
2317        };
2318        debug_assert!(new_children.is_empty());
2319
2320        // If the task is stale, reschedule it
2321        if stale {
2322            task.add_new(CachedDataItem::InProgress {
2323                value: InProgressState::Scheduled {
2324                    done_event,
2325                    reason: TaskExecutionReason::Stale,
2326                },
2327            });
2328            return true;
2329        }
2330
2331        // Set the output if it has changed
2332        let mut old_content = None;
2333        if let Some(value) = new_output {
2334            old_content = task.insert(CachedDataItem::Output { value });
2335        }
2336
2337        // If the task is not stateful and has no mutable children, it does not have a way to be
2338        // invalidated and we can mark it as immutable.
2339        if is_now_immutable {
2340            let _ = task.add(CachedDataItem::Immutable { value: () });
2341        }
2342
2343        // Notify in progress cells
2344        removed_data.extend(task.extract_if(
2345            CachedDataItemType::InProgressCell,
2346            |key, value| match (key, value) {
2347                (
2348                    CachedDataItemKey::InProgressCell { .. },
2349                    CachedDataItemValueRef::InProgressCell { value },
2350                ) => {
2351                    value.event.notify(usize::MAX);
2352                    true
2353                }
2354                _ => false,
2355            },
2356        ));
2357
2358        // Update the dirty state
2359        let old_dirty_state = get!(task, Dirty).copied();
2360
2361        let new_dirty_state = if session_dependent {
2362            Some(DirtyState {
2363                clean_in_session: Some(self.session_id),
2364            })
2365        } else {
2366            None
2367        };
2368
2369        let dirty_changed = old_dirty_state != new_dirty_state;
2370        let data_update = if dirty_changed {
2371            if let Some(new_dirty_state) = new_dirty_state {
2372                task.insert(CachedDataItem::Dirty {
2373                    value: new_dirty_state,
2374                });
2375            } else {
2376                task.remove(&CachedDataItemKey::Dirty {});
2377            }
2378
2379            if old_dirty_state.is_some() || new_dirty_state.is_some() {
2380                let mut dirty_containers = get!(task, AggregatedDirtyContainerCount)
2381                    .cloned()
2382                    .unwrap_or_default();
2383                if let Some(old_dirty_state) = old_dirty_state {
2384                    dirty_containers.update_with_dirty_state(&old_dirty_state);
2385                }
2386                let aggregated_update = match (old_dirty_state, new_dirty_state) {
2387                    (None, None) => unreachable!(),
2388                    (Some(old), None) => dirty_containers.undo_update_with_dirty_state(&old),
2389                    (None, Some(new)) => dirty_containers.update_with_dirty_state(&new),
2390                    (Some(old), Some(new)) => dirty_containers.replace_dirty_state(&old, &new),
2391                };
2392                if !aggregated_update.is_zero() {
2393                    if aggregated_update.get(self.session_id) < 0
2394                        && let Some(activeness_state) = get_mut!(task, Activeness)
2395                    {
2396                        activeness_state.all_clean_event.notify(usize::MAX);
2397                        activeness_state.unset_active_until_clean();
2398                        if activeness_state.is_empty() {
2399                            task.remove(&CachedDataItemKey::Activeness {});
2400                        }
2401                    }
2402                    AggregationUpdateJob::data_update(
2403                        &mut task,
2404                        AggregatedDataUpdate::new()
2405                            .dirty_container_update(task_id, aggregated_update),
2406                    )
2407                } else {
2408                    None
2409                }
2410            } else {
2411                None
2412            }
2413        } else {
2414            None
2415        };
2416
2417        #[cfg(feature = "verify_determinism")]
2418        let reschedule = (dirty_changed || no_output_set) && !task_id.is_transient();
2419        #[cfg(not(feature = "verify_determinism"))]
2420        let reschedule = false;
2421        if reschedule {
2422            task.add_new(CachedDataItem::InProgress {
2423                value: InProgressState::Scheduled {
2424                    done_event,
2425                    reason: TaskExecutionReason::Stale,
2426                },
2427            });
2428            drop(task);
2429        } else {
2430            drop(task);
2431
2432            // Notify dependent tasks that are waiting for this task to finish
2433            done_event.notify(usize::MAX);
2434        }
2435
2436        drop(old_content);
2437
2438        if let Some(data_update) = data_update {
2439            AggregationUpdateQueue::run(data_update, ctx);
2440        }
2441
2442        reschedule
2443    }
2444
2445    fn task_execution_completed_cleanup(&self, ctx: &mut impl ExecuteContext<'_>, task_id: TaskId) {
2446        let mut task = ctx.task(task_id, TaskDataCategory::All);
2447        task.shrink_to_fit(CachedDataItemType::CellData);
2448        task.shrink_to_fit(CachedDataItemType::CellTypeMaxIndex);
2449        task.shrink_to_fit(CachedDataItemType::CellDependency);
2450        task.shrink_to_fit(CachedDataItemType::OutputDependency);
2451        task.shrink_to_fit(CachedDataItemType::CollectiblesDependency);
2452        drop(task);
2453    }
2454
2455    fn run_backend_job<'a>(
2456        self: &'a Arc<Self>,
2457        job: TurboTasksBackendJob,
2458        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2459    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2460        Box::pin(async move {
2461            match job {
2462                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
2463                    debug_assert!(self.should_persist());
2464
2465                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
2466                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
2467                    loop {
2468                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2469                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2470                        const IDLE_TIMEOUT: Duration = Duration::from_secs(2);
2471
2472                        let (time, mut reason) =
2473                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
2474                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2475                            } else {
2476                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
2477                            };
2478
2479                        let until = last_snapshot + time;
2480                        if until > Instant::now() {
2481                            let mut stop_listener = self.stopping_event.listen();
2482                            if self.stopping.load(Ordering::Acquire) {
2483                                return;
2484                            }
2485                            let mut idle_start_listener = self.idle_start_event.listen();
2486                            let mut idle_end_listener = self.idle_end_event.listen();
2487                            let mut idle_time = if turbo_tasks.is_idle() {
2488                                Instant::now() + IDLE_TIMEOUT
2489                            } else {
2490                                far_future()
2491                            };
2492                            loop {
2493                                tokio::select! {
2494                                    _ = &mut stop_listener => {
2495                                        return;
2496                                    },
2497                                    _ = &mut idle_start_listener => {
2498                                        idle_time = Instant::now() + IDLE_TIMEOUT;
2499                                        idle_start_listener = self.idle_start_event.listen()
2500                                    },
2501                                    _ = &mut idle_end_listener => {
2502                                        idle_time = until + IDLE_TIMEOUT;
2503                                        idle_end_listener = self.idle_end_event.listen()
2504                                    },
2505                                    _ = tokio::time::sleep_until(until) => {
2506                                        break;
2507                                    },
2508                                    _ = tokio::time::sleep_until(idle_time) => {
2509                                        if turbo_tasks.is_idle() {
2510                                            reason = "idle timeout";
2511                                            break;
2512                                        }
2513                                    },
2514                                }
2515                            }
2516                        }
2517
2518                        let _span = info_span!("persist", reason = reason).entered();
2519                        let this = self.clone();
2520                        let snapshot = this.snapshot();
2521                        if let Some((snapshot_start, new_data)) = snapshot {
2522                            last_snapshot = snapshot_start;
2523                            if new_data {
2524                                continue;
2525                            }
2526                            let last_snapshot = last_snapshot.duration_since(self.start_time);
2527                            self.last_snapshot.store(
2528                                last_snapshot.as_millis().try_into().unwrap(),
2529                                Ordering::Relaxed,
2530                            );
2531
2532                            let _span = Span::none().entered();
2533                            turbo_tasks.schedule_backend_background_job(
2534                                TurboTasksBackendJob::FollowUpSnapshot,
2535                            );
2536                            return;
2537                        }
2538                    }
2539                }
2540                TurboTasksBackendJob::Prefetch { data, range } => {
2541                    let range = if let Some(range) = range {
2542                        range
2543                    } else {
2544                        if data.len() > 128 {
2545                            let chunk_size = good_chunk_size(data.len());
2546                            let chunks = data.len().div_ceil(chunk_size);
2547                            for i in 0..chunks {
2548                                turbo_tasks.schedule_backend_background_job(
2549                                    TurboTasksBackendJob::Prefetch {
2550                                        data: data.clone(),
2551                                        range: Some(
2552                                            (i * chunk_size)..min(data.len(), (i + 1) * chunk_size),
2553                                        ),
2554                                    },
2555                                );
2556                            }
2557                            return;
2558                        }
2559                        0..data.len()
2560                    };
2561
2562                    let _span = trace_span!("prefetching").entered();
2563                    let mut ctx = self.execute_context(turbo_tasks);
2564                    for i in range {
2565                        let (&task, &with_data) = data.get_index(i).unwrap();
2566                        let category = if with_data {
2567                            TaskDataCategory::All
2568                        } else {
2569                            TaskDataCategory::Meta
2570                        };
2571                        // Prefetch the task
2572                        drop(ctx.task(task, category));
2573                    }
2574                }
2575            }
2576        })
2577    }
2578
2579    fn try_read_own_task_cell(
2580        &self,
2581        task_id: TaskId,
2582        cell: CellId,
2583        _options: ReadCellOptions,
2584        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2585    ) -> Result<TypedCellContent> {
2586        let mut ctx = self.execute_context(turbo_tasks);
2587        let task = ctx.task(task_id, TaskDataCategory::Data);
2588        if let Some(content) = get!(task, CellData { cell }) {
2589            Ok(CellContent(Some(content.reference.clone())).into_typed(cell.type_id))
2590        } else {
2591            Ok(CellContent(None).into_typed(cell.type_id))
2592        }
2593    }
2594
2595    fn read_task_collectibles(
2596        &self,
2597        task_id: TaskId,
2598        collectible_type: TraitTypeId,
2599        reader_id: Option<TaskId>,
2600        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2601    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2602        let mut ctx = self.execute_context(turbo_tasks);
2603        let mut collectibles = AutoMap::default();
2604        {
2605            let mut task = ctx.task(task_id, TaskDataCategory::All);
2606            // Ensure it's an root node
2607            loop {
2608                let aggregation_number = get_aggregation_number(&task);
2609                if is_root_node(aggregation_number) {
2610                    break;
2611                }
2612                drop(task);
2613                AggregationUpdateQueue::run(
2614                    AggregationUpdateJob::UpdateAggregationNumber {
2615                        task_id,
2616                        base_aggregation_number: u32::MAX,
2617                        distance: None,
2618                    },
2619                    &mut ctx,
2620                );
2621                task = ctx.task(task_id, TaskDataCategory::All);
2622            }
2623            for collectible in iter_many!(
2624                task,
2625                AggregatedCollectible {
2626                    collectible
2627                } count if collectible.collectible_type == collectible_type && *count > 0 => {
2628                    collectible.cell
2629                }
2630            ) {
2631                *collectibles
2632                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
2633                    .or_insert(0) += 1;
2634            }
2635            for (collectible, count) in iter_many!(
2636                task,
2637                Collectible {
2638                    collectible
2639                } count if collectible.collectible_type == collectible_type => {
2640                    (collectible.cell, *count)
2641                }
2642            ) {
2643                *collectibles
2644                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
2645                    .or_insert(0) += count;
2646            }
2647            if let Some(reader_id) = reader_id {
2648                let _ = task.add(CachedDataItem::CollectiblesDependent {
2649                    collectible_type,
2650                    task: reader_id,
2651                    value: (),
2652                });
2653            }
2654        }
2655        if let Some(reader_id) = reader_id {
2656            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
2657            let target = CollectiblesRef {
2658                task: task_id,
2659                collectible_type,
2660            };
2661            if reader
2662                .remove(&CachedDataItemKey::OutdatedCollectiblesDependency { target })
2663                .is_none()
2664            {
2665                let _ = reader.add(CachedDataItem::CollectiblesDependency { target, value: () });
2666            }
2667        }
2668        collectibles
2669    }
2670
2671    fn emit_collectible(
2672        &self,
2673        collectible_type: TraitTypeId,
2674        collectible: RawVc,
2675        task_id: TaskId,
2676        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2677    ) {
2678        self.assert_valid_collectible(task_id, collectible);
2679
2680        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2681            panic!("Collectibles need to be resolved");
2682        };
2683        let cell = CellRef {
2684            task: collectible_task,
2685            cell,
2686        };
2687        operation::UpdateCollectibleOperation::run(
2688            task_id,
2689            CollectibleRef {
2690                collectible_type,
2691                cell,
2692            },
2693            1,
2694            self.execute_context(turbo_tasks),
2695        );
2696    }
2697
2698    fn unemit_collectible(
2699        &self,
2700        collectible_type: TraitTypeId,
2701        collectible: RawVc,
2702        count: u32,
2703        task_id: TaskId,
2704        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2705    ) {
2706        self.assert_valid_collectible(task_id, collectible);
2707
2708        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2709            panic!("Collectibles need to be resolved");
2710        };
2711        let cell = CellRef {
2712            task: collectible_task,
2713            cell,
2714        };
2715        operation::UpdateCollectibleOperation::run(
2716            task_id,
2717            CollectibleRef {
2718                collectible_type,
2719                cell,
2720            },
2721            -(i32::try_from(count).unwrap()),
2722            self.execute_context(turbo_tasks),
2723        );
2724    }
2725
2726    fn update_task_cell(
2727        &self,
2728        task_id: TaskId,
2729        cell: CellId,
2730        content: CellContent,
2731        verification_mode: VerificationMode,
2732        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2733    ) {
2734        operation::UpdateCellOperation::run(
2735            task_id,
2736            cell,
2737            content,
2738            verification_mode,
2739            self.execute_context(turbo_tasks),
2740        );
2741    }
2742
2743    fn mark_own_task_as_session_dependent(
2744        &self,
2745        task_id: TaskId,
2746        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2747    ) {
2748        if !self.should_track_dependencies() {
2749            // Without dependency tracking we don't need session dependent tasks
2750            return;
2751        }
2752        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
2753        let mut ctx = self.execute_context(turbo_tasks);
2754        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
2755        let aggregation_number = get_aggregation_number(&task);
2756        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
2757            drop(task);
2758            // We want to use a high aggregation number to avoid large aggregation chains for
2759            // session dependent tasks (which change on every run)
2760            AggregationUpdateQueue::run(
2761                AggregationUpdateJob::UpdateAggregationNumber {
2762                    task_id,
2763                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
2764                    distance: None,
2765                },
2766                &mut ctx,
2767            );
2768            task = ctx.task(task_id, TaskDataCategory::Meta);
2769        }
2770        if let Some(InProgressState::InProgress(box InProgressStateInner {
2771            session_dependent,
2772            ..
2773        })) = get_mut!(task, InProgress)
2774        {
2775            *session_dependent = true;
2776        }
2777    }
2778
2779    fn mark_own_task_as_finished(
2780        &self,
2781        task: TaskId,
2782        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2783    ) {
2784        let mut ctx = self.execute_context(turbo_tasks);
2785        let mut task = ctx.task(task, TaskDataCategory::Data);
2786        if let Some(InProgressState::InProgress(box InProgressStateInner {
2787            marked_as_completed,
2788            ..
2789        })) = get_mut!(task, InProgress)
2790        {
2791            *marked_as_completed = true;
2792            // TODO this should remove the dirty state (also check session_dependent)
2793            // but this would break some assumptions for strongly consistent reads.
2794            // Client tasks are not connected yet, so we wouldn't wait for them.
2795            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
2796        }
2797    }
2798
2799    fn set_own_task_aggregation_number(
2800        &self,
2801        task: TaskId,
2802        aggregation_number: u32,
2803        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2804    ) {
2805        let mut ctx = self.execute_context(turbo_tasks);
2806        AggregationUpdateQueue::run(
2807            AggregationUpdateJob::UpdateAggregationNumber {
2808                task_id: task,
2809                base_aggregation_number: aggregation_number,
2810                distance: None,
2811            },
2812            &mut ctx,
2813        );
2814    }
2815
2816    fn connect_task(
2817        &self,
2818        task: TaskId,
2819        parent_task: Option<TaskId>,
2820        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2821    ) {
2822        self.assert_not_persistent_calling_transient(parent_task, task, None);
2823        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
2824    }
2825
2826    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
2827        let task_id = self.transient_task_id_factory.get();
2828        let root_type = match task_type {
2829            TransientTaskType::Root(_) => RootType::RootTask,
2830            TransientTaskType::Once(_) => RootType::OnceTask,
2831        };
2832        self.transient_tasks.insert(
2833            task_id,
2834            Arc::new(match task_type {
2835                TransientTaskType::Root(f) => TransientTask::Root(f),
2836                TransientTaskType::Once(f) => TransientTask::Once(Mutex::new(Some(f))),
2837            }),
2838        );
2839        {
2840            let mut task = self.storage.access_mut(task_id);
2841            task.add(CachedDataItem::AggregationNumber {
2842                value: AggregationNumber {
2843                    base: u32::MAX,
2844                    distance: 0,
2845                    effective: u32::MAX,
2846                },
2847            });
2848            if self.should_track_activeness() {
2849                task.add(CachedDataItem::Activeness {
2850                    value: ActivenessState::new_root(root_type, task_id),
2851                });
2852            }
2853            task.add(CachedDataItem::new_scheduled(
2854                TaskExecutionReason::Initial,
2855                move || {
2856                    move || match root_type {
2857                        RootType::RootTask => "Root Task".to_string(),
2858                        RootType::OnceTask => "Once Task".to_string(),
2859                    }
2860                },
2861            ));
2862        }
2863        #[cfg(feature = "verify_aggregation_graph")]
2864        self.root_tasks.lock().insert(task_id);
2865        task_id
2866    }
2867
2868    fn dispose_root_task(
2869        &self,
2870        task_id: TaskId,
2871        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2872    ) {
2873        #[cfg(feature = "verify_aggregation_graph")]
2874        self.root_tasks.lock().remove(&task_id);
2875
2876        let mut ctx = self.execute_context(turbo_tasks);
2877        let mut task = ctx.task(task_id, TaskDataCategory::All);
2878        let is_dirty = get!(task, Dirty).map_or(false, |dirty| dirty.get(self.session_id));
2879        let has_dirty_containers = get!(task, AggregatedDirtyContainerCount)
2880            .map_or(false, |dirty_containers| {
2881                dirty_containers.get(self.session_id) > 0
2882            });
2883        if is_dirty || has_dirty_containers {
2884            if let Some(activeness_state) = get_mut!(task, Activeness) {
2885                // We will finish the task, but it would be removed after the task is done
2886                activeness_state.unset_root_type();
2887                activeness_state.set_active_until_clean();
2888            };
2889        } else if let Some(activeness_state) = remove!(task, Activeness) {
2890            // Technically nobody should be listening to this event, but just in case
2891            // we notify it anyway
2892            activeness_state.all_clean_event.notify(usize::MAX);
2893        }
2894    }
2895
2896    #[cfg(feature = "verify_aggregation_graph")]
2897    fn verify_aggregation_graph(
2898        &self,
2899        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2900        idle: bool,
2901    ) {
2902        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
2903            return;
2904        }
2905        use std::{collections::VecDeque, env, io::stdout};
2906
2907        use crate::backend::operation::{get_uppers, is_aggregating_node};
2908
2909        let mut ctx = self.execute_context(turbo_tasks);
2910        let root_tasks = self.root_tasks.lock().clone();
2911
2912        for task_id in root_tasks.into_iter() {
2913            let mut queue = VecDeque::new();
2914            let mut visited = FxHashSet::default();
2915            let mut aggregated_nodes = FxHashSet::default();
2916            let mut collectibles = FxHashMap::default();
2917            let root_task_id = task_id;
2918            visited.insert(task_id);
2919            aggregated_nodes.insert(task_id);
2920            queue.push_back(task_id);
2921            let mut counter = 0;
2922            while let Some(task_id) = queue.pop_front() {
2923                counter += 1;
2924                if counter % 100000 == 0 {
2925                    println!(
2926                        "queue={}, visited={}, aggregated_nodes={}",
2927                        queue.len(),
2928                        visited.len(),
2929                        aggregated_nodes.len()
2930                    );
2931                }
2932                let task = ctx.task(task_id, TaskDataCategory::All);
2933                if idle && !self.is_idle.load(Ordering::Relaxed) {
2934                    return;
2935                }
2936
2937                let uppers = get_uppers(&task);
2938                if task_id != root_task_id
2939                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
2940                {
2941                    panic!(
2942                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
2943                         {:?})",
2944                        task_id,
2945                        ctx.get_task_description(task_id),
2946                        uppers
2947                    );
2948                }
2949
2950                let aggregated_collectibles: Vec<_> = get_many!(task, AggregatedCollectible { collectible } value if *value > 0 => {collectible});
2951                for collectible in aggregated_collectibles {
2952                    collectibles
2953                        .entry(collectible)
2954                        .or_insert_with(|| (false, Vec::new()))
2955                        .1
2956                        .push(task_id);
2957                }
2958
2959                let own_collectibles: Vec<_> = get_many!(task, Collectible { collectible } value if *value > 0 => {collectible});
2960                for collectible in own_collectibles {
2961                    if let Some((flag, _)) = collectibles.get_mut(&collectible) {
2962                        *flag = true
2963                    } else {
2964                        panic!(
2965                            "Task {} has a collectible {:?} that is not in any upper task",
2966                            task_id, collectible
2967                        );
2968                    }
2969                }
2970
2971                let is_dirty = get!(task, Dirty).is_some_and(|dirty| dirty.get(self.session_id));
2972                let has_dirty_container = get!(task, AggregatedDirtyContainerCount)
2973                    .is_some_and(|count| count.get(self.session_id) > 0);
2974                let should_be_in_upper = is_dirty || has_dirty_container;
2975
2976                let aggregation_number = get_aggregation_number(&task);
2977                if is_aggregating_node(aggregation_number) {
2978                    aggregated_nodes.insert(task_id);
2979                }
2980                // println!(
2981                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
2982                //     ctx.get_task_description(task_id),
2983                //     uppers
2984                // );
2985
2986                for child_id in iter_many!(task, Child { task } => task) {
2987                    // println!("{task_id}: child -> {child_id}");
2988                    if visited.insert(child_id) {
2989                        queue.push_back(child_id);
2990                    }
2991                }
2992                drop(task);
2993
2994                if should_be_in_upper {
2995                    for upper_id in uppers {
2996                        let task = ctx.task(task_id, TaskDataCategory::All);
2997                        let in_upper = get!(task, AggregatedDirtyContainer { task: task_id })
2998                            .is_some_and(|dirty| dirty.get(self.session_id) > 0);
2999                        if !in_upper {
3000                            panic!(
3001                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
3002                                 ({})",
3003                                task_id,
3004                                ctx.get_task_description(task_id),
3005                                upper_id,
3006                                ctx.get_task_description(upper_id)
3007                            );
3008                        }
3009                    }
3010                }
3011            }
3012
3013            for (collectible, (flag, task_ids)) in collectibles {
3014                if !flag {
3015                    use std::io::Write;
3016                    let mut stdout = stdout().lock();
3017                    writeln!(
3018                        stdout,
3019                        "{:?} that is not emitted in any child task but in these aggregated \
3020                         tasks: {:#?}",
3021                        collectible,
3022                        task_ids
3023                            .iter()
3024                            .map(|t| format!("{t} {}", ctx.get_task_description(*t)))
3025                            .collect::<Vec<_>>()
3026                    )
3027                    .unwrap();
3028
3029                    let task_id = collectible.cell.task;
3030                    let mut queue = {
3031                        let task = ctx.task(task_id, TaskDataCategory::All);
3032                        get_uppers(&task)
3033                    };
3034                    let mut visited = FxHashSet::default();
3035                    for &upper_id in queue.iter() {
3036                        visited.insert(upper_id);
3037                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3038                    }
3039                    while let Some(task_id) = queue.pop() {
3040                        let desc = ctx.get_task_description(task_id);
3041                        let task = ctx.task(task_id, TaskDataCategory::All);
3042                        let aggregated_collectible =
3043                            get!(task, AggregatedCollectible { collectible })
3044                                .copied()
3045                                .unwrap_or_default();
3046                        let uppers = get_uppers(&task);
3047                        drop(task);
3048                        writeln!(
3049                            stdout,
3050                            "upper {task_id} {desc} collectible={aggregated_collectible}"
3051                        )
3052                        .unwrap();
3053                        if task_ids.contains(&task_id) {
3054                            writeln!(
3055                                stdout,
3056                                "Task has an upper connection to an aggregated task that doesn't \
3057                                 reference it. Upper connection is invalid!"
3058                            )
3059                            .unwrap();
3060                        }
3061                        for upper_id in uppers {
3062                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3063                            if !visited.contains(&upper_id) {
3064                                queue.push(upper_id);
3065                            }
3066                        }
3067                    }
3068                    panic!("See stdout for more details");
3069                }
3070            }
3071        }
3072    }
3073
3074    fn assert_not_persistent_calling_transient(
3075        &self,
3076        parent_id: Option<TaskId>,
3077        child_id: TaskId,
3078        cell_id: Option<CellId>,
3079    ) {
3080        if !parent_id.is_none_or(|id| id.is_transient()) && child_id.is_transient() {
3081            self.panic_persistent_calling_transient(
3082                parent_id
3083                    .and_then(|id| self.lookup_task_type(id))
3084                    .as_deref(),
3085                self.lookup_task_type(child_id).as_deref(),
3086                cell_id,
3087            );
3088        }
3089    }
3090
3091    fn panic_persistent_calling_transient(
3092        &self,
3093        parent: Option<&CachedTaskType>,
3094        child: Option<&CachedTaskType>,
3095        cell_id: Option<CellId>,
3096    ) {
3097        let transient_reason = if let Some(child) = child {
3098            Cow::Owned(format!(
3099                " The callee is transient because it depends on:\n{}",
3100                self.debug_trace_transient_task(child, cell_id),
3101            ))
3102        } else {
3103            Cow::Borrowed("")
3104        };
3105        panic!(
3106            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3107            parent.map_or("unknown", |t| t.get_name()),
3108            child.map_or("unknown", |t| t.get_name()),
3109            transient_reason,
3110        );
3111    }
3112
3113    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3114        // these checks occur in a potentially hot codepath, but they're cheap
3115        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3116            // This should never happen: The collectible APIs use ResolvedVc
3117            let task_info = if let Some(col_task_ty) = collectible
3118                .try_get_task_id()
3119                .and_then(|t| self.lookup_task_type(t))
3120            {
3121                Cow::Owned(format!(" (return type of {col_task_ty})"))
3122            } else {
3123                Cow::Borrowed("")
3124            };
3125            panic!("Collectible{task_info} must be a ResolvedVc")
3126        };
3127        if col_task_id.is_transient() && !task_id.is_transient() {
3128            let transient_reason = if let Some(col_task_ty) = self.lookup_task_type(col_task_id) {
3129                Cow::Owned(format!(
3130                    ". The collectible is transient because it depends on:\n{}",
3131                    self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3132                ))
3133            } else {
3134                Cow::Borrowed("")
3135            };
3136            // this should never happen: How would a persistent function get a transient Vc?
3137            panic!(
3138                "Collectible is transient, transient collectibles cannot be emitted from \
3139                 persistent tasks{transient_reason}",
3140            )
3141        }
3142    }
3143}
3144
3145impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
3146    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3147        self.0.startup(turbo_tasks);
3148    }
3149
3150    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3151        self.0.stopping();
3152    }
3153
3154    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3155        self.0.stop(turbo_tasks);
3156    }
3157
3158    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3159        self.0.idle_start(turbo_tasks);
3160    }
3161
3162    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3163        self.0.idle_end();
3164    }
3165
3166    fn get_or_create_persistent_task(
3167        &self,
3168        task_type: CachedTaskType,
3169        parent_task: Option<TaskId>,
3170        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3171    ) -> TaskId {
3172        self.0
3173            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
3174    }
3175
3176    fn get_or_create_transient_task(
3177        &self,
3178        task_type: CachedTaskType,
3179        parent_task: Option<TaskId>,
3180        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3181    ) -> TaskId {
3182        self.0
3183            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
3184    }
3185
3186    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3187        self.0.invalidate_task(task_id, turbo_tasks);
3188    }
3189
3190    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3191        self.0.invalidate_tasks(tasks, turbo_tasks);
3192    }
3193
3194    fn invalidate_tasks_set(
3195        &self,
3196        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3197        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3198    ) {
3199        self.0.invalidate_tasks_set(tasks, turbo_tasks);
3200    }
3201
3202    fn invalidate_serialization(
3203        &self,
3204        task_id: TaskId,
3205        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3206    ) {
3207        self.0.invalidate_serialization(task_id, turbo_tasks);
3208    }
3209
3210    fn get_task_description(&self, task: TaskId) -> String {
3211        self.0.get_task_description(task)
3212    }
3213
3214    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3215        self.0.task_execution_canceled(task, turbo_tasks)
3216    }
3217
3218    fn try_start_task_execution(
3219        &self,
3220        task_id: TaskId,
3221        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3222    ) -> Option<TaskExecutionSpec<'_>> {
3223        self.0.try_start_task_execution(task_id, turbo_tasks)
3224    }
3225
3226    fn task_execution_completed(
3227        &self,
3228        task_id: TaskId,
3229        result: Result<RawVc, TurboTasksExecutionError>,
3230        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3231        stateful: bool,
3232        has_invalidator: bool,
3233        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3234    ) -> bool {
3235        self.0.task_execution_completed(
3236            task_id,
3237            result,
3238            cell_counters,
3239            stateful,
3240            has_invalidator,
3241            turbo_tasks,
3242        )
3243    }
3244
3245    type BackendJob = TurboTasksBackendJob;
3246
3247    fn run_backend_job<'a>(
3248        &'a self,
3249        job: Self::BackendJob,
3250        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3251    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3252        self.0.run_backend_job(job, turbo_tasks)
3253    }
3254
3255    fn try_read_task_output(
3256        &self,
3257        task_id: TaskId,
3258        reader: Option<TaskId>,
3259        options: ReadOutputOptions,
3260        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3261    ) -> Result<Result<RawVc, EventListener>> {
3262        self.0
3263            .try_read_task_output(task_id, reader, options, turbo_tasks)
3264    }
3265
3266    fn try_read_task_cell(
3267        &self,
3268        task_id: TaskId,
3269        cell: CellId,
3270        reader: Option<TaskId>,
3271        options: ReadCellOptions,
3272        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3273    ) -> Result<Result<TypedCellContent, EventListener>> {
3274        self.0
3275            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3276    }
3277
3278    fn try_read_own_task_cell(
3279        &self,
3280        task_id: TaskId,
3281        cell: CellId,
3282        options: ReadCellOptions,
3283        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3284    ) -> Result<TypedCellContent> {
3285        self.0
3286            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
3287    }
3288
3289    fn read_task_collectibles(
3290        &self,
3291        task_id: TaskId,
3292        collectible_type: TraitTypeId,
3293        reader: Option<TaskId>,
3294        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3295    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3296        self.0
3297            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3298    }
3299
3300    fn emit_collectible(
3301        &self,
3302        collectible_type: TraitTypeId,
3303        collectible: RawVc,
3304        task_id: TaskId,
3305        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3306    ) {
3307        self.0
3308            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3309    }
3310
3311    fn unemit_collectible(
3312        &self,
3313        collectible_type: TraitTypeId,
3314        collectible: RawVc,
3315        count: u32,
3316        task_id: TaskId,
3317        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3318    ) {
3319        self.0
3320            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3321    }
3322
3323    fn update_task_cell(
3324        &self,
3325        task_id: TaskId,
3326        cell: CellId,
3327        content: CellContent,
3328        verification_mode: VerificationMode,
3329        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3330    ) {
3331        self.0
3332            .update_task_cell(task_id, cell, content, verification_mode, turbo_tasks);
3333    }
3334
3335    fn mark_own_task_as_finished(
3336        &self,
3337        task_id: TaskId,
3338        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3339    ) {
3340        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3341    }
3342
3343    fn set_own_task_aggregation_number(
3344        &self,
3345        task: TaskId,
3346        aggregation_number: u32,
3347        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3348    ) {
3349        self.0
3350            .set_own_task_aggregation_number(task, aggregation_number, turbo_tasks);
3351    }
3352
3353    fn mark_own_task_as_session_dependent(
3354        &self,
3355        task: TaskId,
3356        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3357    ) {
3358        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
3359    }
3360
3361    fn connect_task(
3362        &self,
3363        task: TaskId,
3364        parent_task: Option<TaskId>,
3365        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3366    ) {
3367        self.0.connect_task(task, parent_task, turbo_tasks);
3368    }
3369
3370    fn create_transient_task(
3371        &self,
3372        task_type: TransientTaskType,
3373        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3374    ) -> TaskId {
3375        self.0.create_transient_task(task_type)
3376    }
3377
3378    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3379        self.0.dispose_root_task(task_id, turbo_tasks);
3380    }
3381
3382    fn task_statistics(&self) -> &TaskStatisticsApi {
3383        &self.0.task_statistics
3384    }
3385
3386    fn is_tracking_dependencies(&self) -> bool {
3387        self.0.options.dependency_tracking
3388    }
3389}
3390
3391enum DebugTraceTransientTask {
3392    Cached {
3393        task_name: &'static str,
3394        cell_type_id: Option<ValueTypeId>,
3395        cause_self: Option<Box<DebugTraceTransientTask>>,
3396        cause_args: Vec<DebugTraceTransientTask>,
3397    },
3398    /// This representation is used when this task is a duplicate of one previously shown
3399    Collapsed {
3400        task_name: &'static str,
3401        cell_type_id: Option<ValueTypeId>,
3402    },
3403    Uncached {
3404        cell_type_id: Option<ValueTypeId>,
3405    },
3406}
3407
3408impl DebugTraceTransientTask {
3409    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3410        let indent = "    ".repeat(level);
3411        f.write_str(&indent)?;
3412
3413        fn fmt_cell_type_id(
3414            f: &mut fmt::Formatter<'_>,
3415            cell_type_id: Option<ValueTypeId>,
3416        ) -> fmt::Result {
3417            if let Some(ty) = cell_type_id {
3418                write!(f, " (read cell of type {})", get_value_type(ty).global_name)
3419            } else {
3420                Ok(())
3421            }
3422        }
3423
3424        // write the name and type
3425        match self {
3426            Self::Cached {
3427                task_name,
3428                cell_type_id,
3429                ..
3430            }
3431            | Self::Collapsed {
3432                task_name,
3433                cell_type_id,
3434                ..
3435            } => {
3436                f.write_str(task_name)?;
3437                fmt_cell_type_id(f, *cell_type_id)?;
3438                if matches!(self, Self::Collapsed { .. }) {
3439                    f.write_str(" (collapsed)")?;
3440                }
3441            }
3442            Self::Uncached { cell_type_id } => {
3443                f.write_str("unknown transient task")?;
3444                fmt_cell_type_id(f, *cell_type_id)?;
3445            }
3446        }
3447        f.write_char('\n')?;
3448
3449        // write any extra "cause" information we might have
3450        if let Self::Cached {
3451            cause_self,
3452            cause_args,
3453            ..
3454        } = self
3455        {
3456            if let Some(c) = cause_self {
3457                writeln!(f, "{indent}  self:")?;
3458                c.fmt_indented(f, level + 1)?;
3459            }
3460            if !cause_args.is_empty() {
3461                writeln!(f, "{indent}  args:")?;
3462                for c in cause_args {
3463                    c.fmt_indented(f, level + 1)?;
3464                }
3465            }
3466        }
3467        Ok(())
3468    }
3469}
3470
3471impl fmt::Display for DebugTraceTransientTask {
3472    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3473        self.fmt_indented(f, 0)
3474    }
3475}
3476
3477// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
3478fn far_future() -> Instant {
3479    // Roughly 30 years from now.
3480    // API does not provide a way to obtain max `Instant`
3481    // or convert specific date in the future to instant.
3482    // 1000 years overflows on macOS, 100 years overflows on FreeBSD.
3483    Instant::now() + Duration::from_secs(86400 * 365 * 30)
3484}