turbo_tasks_backend/backend/
mod.rs

1mod dynamic_storage;
2mod operation;
3mod storage;
4
5use std::{
6    borrow::Cow,
7    cmp::min,
8    fmt::{self, Write},
9    future::Future,
10    hash::BuildHasherDefault,
11    mem::take,
12    ops::Range,
13    pin::Pin,
14    sync::{
15        Arc,
16        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
17    },
18};
19
20use anyhow::{Result, bail};
21use auto_hash_map::{AutoMap, AutoSet};
22use indexmap::IndexSet;
23use parking_lot::{Condvar, Mutex};
24use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
25use smallvec::{SmallVec, smallvec};
26use tokio::time::{Duration, Instant};
27use tracing::{Span, field::Empty, info_span, trace_span};
28use turbo_tasks::{
29    CellId, FxDashMap, FxIndexMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency,
30    ReadOutputOptions, ReadTracking, SessionId, TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId,
31    TraitTypeId, TurboTasksBackendApi, ValueTypeId,
32    backend::{
33        Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
34        TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
35    },
36    event::{Event, EventListener},
37    message_queue::TimingEvent,
38    registry::get_value_type,
39    task_statistics::TaskStatisticsApi,
40    trace::TraceRawVcs,
41    turbo_tasks,
42    util::{IdFactoryWithReuse, good_chunk_size},
43};
44
45pub use self::{operation::AnyOperation, storage::TaskDataCategory};
46#[cfg(feature = "trace_task_dirty")]
47use crate::backend::operation::TaskDirtyCause;
48use crate::{
49    backend::{
50        operation::{
51            AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
52            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
53            Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number,
54            get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children,
55        },
56        storage::{
57            InnerStorageSnapshot, Storage, count, get, get_many, get_mut, get_mut_or_insert_with,
58            iter_many, remove,
59        },
60    },
61    backing_storage::BackingStorage,
62    data::{
63        ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
64        CachedDataItemValueRef, CellRef, CollectibleRef, CollectiblesRef, DirtyState,
65        InProgressCellState, InProgressState, InProgressStateInner, OutputValue, RootType,
66    },
67    utils::{
68        bi_map::BiMap, chunked_vec::ChunkedVec, dash_map_drop_contents::drop_contents,
69        ptr_eq_arc::PtrEqArc, shard_amount::compute_shard_amount, sharded::Sharded, swap_retain,
70    },
71};
72
73const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
74
75struct SnapshotRequest {
76    snapshot_requested: bool,
77    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
78}
79
80impl SnapshotRequest {
81    fn new() -> Self {
82        Self {
83            snapshot_requested: false,
84            suspended_operations: FxHashSet::default(),
85        }
86    }
87}
88
89type TransientTaskOnce =
90    Mutex<Option<Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>>>;
91
92pub enum TransientTask {
93    /// A root task that will track dependencies and re-execute when
94    /// dependencies change. Task will eventually settle to the correct
95    /// execution.
96    ///
97    /// Always active. Automatically scheduled.
98    Root(TransientTaskRoot),
99
100    // TODO implement these strongly consistency
101    /// A single root task execution. It won't track dependencies.
102    /// Task will definitely include all invalidations that happened before the
103    /// start of the task. It may or may not include invalidations that
104    /// happened after that. It may see these invalidations partially
105    /// applied.
106    ///
107    /// Active until done. Automatically scheduled.
108    Once(TransientTaskOnce),
109}
110
/// How the backend uses the backing storage.
pub enum StorageMode {
    /// Cache entries that are missing locally are looked up in the storage.
    /// Nothing is written back.
    ReadOnly,
    /// Cache entries that are missing locally are looked up in the storage.
    /// In addition, all changes are logged and regularly pushed to the backing storage.
    ReadWrite,
}
118
119pub struct BackendOptions {
120    /// Enables dependency tracking.
121    ///
122    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
123    /// forever.
124    pub dependency_tracking: bool,
125
126    /// Enables active tracking.
127    ///
128    /// Automatically disabled when `dependency_tracking` is disabled.
129    ///
130    /// When disabled: All tasks are considered as active.
131    pub active_tracking: bool,
132
133    /// Enables the backing storage.
134    pub storage_mode: Option<StorageMode>,
135
136    /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
137    /// datastructures. If `None`, it will use the available parallelism.
138    pub num_workers: Option<usize>,
139
140    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
141    pub small_preallocation: bool,
142}
143
144impl Default for BackendOptions {
145    fn default() -> Self {
146        Self {
147            dependency_tracking: true,
148            active_tracking: true,
149            storage_mode: Some(StorageMode::ReadWrite),
150            num_workers: None,
151            small_preallocation: false,
152        }
153    }
154}
155
156pub enum TurboTasksBackendJob {
157    InitialSnapshot,
158    FollowUpSnapshot,
159    Prefetch {
160        data: Arc<FxIndexMap<TaskId, bool>>,
161        range: Option<Range<usize>>,
162    },
163}
164
165pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
166
167type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
168
169struct TurboTasksBackendInner<B: BackingStorage> {
170    options: BackendOptions,
171
172    start_time: Instant,
173    session_id: SessionId,
174
175    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
176    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
177
178    persisted_task_cache_log: Option<TaskCacheLog>,
179    task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
180    transient_tasks: FxDashMap<TaskId, Arc<TransientTask>>,
181
182    storage: Storage,
183
184    /// When true, the backing_storage has data that is not in the local storage.
185    local_is_partial: AtomicBool,
186
187    /// Number of executing operations + Highest bit is set when snapshot is
188    /// requested. When that bit is set, operations should pause until the
189    /// snapshot is completed. When the bit is set and in progress counter
190    /// reaches zero, `operations_completed_when_snapshot_requested` is
191    /// triggered.
192    in_progress_operations: AtomicUsize,
193
194    snapshot_request: Mutex<SnapshotRequest>,
195    /// Condition Variable that is triggered when `in_progress_operations`
196    /// reaches zero while snapshot is requested. All operations are either
197    /// completed or suspended.
198    operations_suspended: Condvar,
199    /// Condition Variable that is triggered when a snapshot is completed and
200    /// operations can continue.
201    snapshot_completed: Condvar,
202    /// The timestamp of the last started snapshot since [`Self::start_time`].
203    last_snapshot: AtomicU64,
204
205    stopping: AtomicBool,
206    stopping_event: Event,
207    idle_start_event: Event,
208    idle_end_event: Event,
209    #[cfg(feature = "verify_aggregation_graph")]
210    is_idle: AtomicBool,
211
212    task_statistics: TaskStatisticsApi,
213
214    backing_storage: B,
215
216    #[cfg(feature = "verify_aggregation_graph")]
217    root_tasks: Mutex<FxHashSet<TaskId>>,
218}
219
220impl<B: BackingStorage> TurboTasksBackend<B> {
221    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
222        Self(Arc::new(TurboTasksBackendInner::new(
223            options,
224            backing_storage,
225        )))
226    }
227
228    pub fn backing_storage(&self) -> &B {
229        &self.0.backing_storage
230    }
231}
232
233impl<B: BackingStorage> TurboTasksBackendInner<B> {
234    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
235        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
236        let need_log = matches!(options.storage_mode, Some(StorageMode::ReadWrite));
237        if !options.dependency_tracking {
238            options.active_tracking = false;
239        }
240        let small_preallocation = options.small_preallocation;
241        let next_task_id = backing_storage
242            .next_free_task_id()
243            .expect("Failed to get task id");
244        Self {
245            options,
246            start_time: Instant::now(),
247            session_id: backing_storage
248                .next_session_id()
249                .expect("Failed get session id"),
250            persisted_task_id_factory: IdFactoryWithReuse::new(
251                next_task_id,
252                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
253            ),
254            transient_task_id_factory: IdFactoryWithReuse::new(
255                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
256                TaskId::MAX,
257            ),
258            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
259            task_cache: BiMap::new(),
260            transient_tasks: FxDashMap::default(),
261            local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
262            storage: Storage::new(shard_amount, small_preallocation),
263            in_progress_operations: AtomicUsize::new(0),
264            snapshot_request: Mutex::new(SnapshotRequest::new()),
265            operations_suspended: Condvar::new(),
266            snapshot_completed: Condvar::new(),
267            last_snapshot: AtomicU64::new(0),
268            stopping: AtomicBool::new(false),
269            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
270            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
271            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
272            #[cfg(feature = "verify_aggregation_graph")]
273            is_idle: AtomicBool::new(false),
274            task_statistics: TaskStatisticsApi::default(),
275            backing_storage,
276            #[cfg(feature = "verify_aggregation_graph")]
277            root_tasks: Default::default(),
278        }
279    }
280
281    fn execute_context<'a>(
282        &'a self,
283        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
284    ) -> impl ExecuteContext<'a> {
285        ExecuteContextImpl::new(self, turbo_tasks)
286    }
287
288    fn session_id(&self) -> SessionId {
289        self.session_id
290    }
291
292    /// # Safety
293    ///
294    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
295    unsafe fn execute_context_with_tx<'e, 'tx>(
296        &'e self,
297        tx: Option<&'e B::ReadTransaction<'tx>>,
298        turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
299    ) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
300    where
301        'tx: 'e,
302    {
303        // Safety: `tx` is from `self`.
304        unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
305    }
306
307    fn suspending_requested(&self) -> bool {
308        self.should_persist()
309            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
310    }
311
312    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
313        #[cold]
314        fn operation_suspend_point_cold<B: BackingStorage>(
315            this: &TurboTasksBackendInner<B>,
316            suspend: impl FnOnce() -> AnyOperation,
317        ) {
318            let operation = Arc::new(suspend());
319            let mut snapshot_request = this.snapshot_request.lock();
320            if snapshot_request.snapshot_requested {
321                snapshot_request
322                    .suspended_operations
323                    .insert(operation.clone().into());
324                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
325                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
326                if value == SNAPSHOT_REQUESTED_BIT {
327                    this.operations_suspended.notify_all();
328                }
329                this.snapshot_completed
330                    .wait_while(&mut snapshot_request, |snapshot_request| {
331                        snapshot_request.snapshot_requested
332                    });
333                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
334                snapshot_request
335                    .suspended_operations
336                    .remove(&operation.into());
337            }
338        }
339
340        if self.suspending_requested() {
341            operation_suspend_point_cold(self, suspend);
342        }
343    }
344
345    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
346        if !self.should_persist() {
347            return OperationGuard { backend: None };
348        }
349        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
350        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
351            let mut snapshot_request = self.snapshot_request.lock();
352            if snapshot_request.snapshot_requested {
353                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
354                if value == SNAPSHOT_REQUESTED_BIT {
355                    self.operations_suspended.notify_all();
356                }
357                self.snapshot_completed
358                    .wait_while(&mut snapshot_request, |snapshot_request| {
359                        snapshot_request.snapshot_requested
360                    });
361                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
362            }
363        }
364        OperationGuard {
365            backend: Some(self),
366        }
367    }
368
369    fn should_persist(&self) -> bool {
370        matches!(self.options.storage_mode, Some(StorageMode::ReadWrite))
371    }
372
373    fn should_restore(&self) -> bool {
374        self.options.storage_mode.is_some()
375    }
376
377    fn should_track_dependencies(&self) -> bool {
378        self.options.dependency_tracking
379    }
380
381    fn should_track_activeness(&self) -> bool {
382        self.options.active_tracking
383    }
384
385    fn track_cache_hit(&self, task_type: &CachedTaskType) {
386        self.task_statistics
387            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
388    }
389
390    fn track_cache_miss(&self, task_type: &CachedTaskType) {
391        self.task_statistics
392            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
393    }
394}
395
396pub(crate) struct OperationGuard<'a, B: BackingStorage> {
397    backend: Option<&'a TurboTasksBackendInner<B>>,
398}
399
400impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
401    fn drop(&mut self) {
402        if let Some(backend) = self.backend {
403            let fetch_sub = backend
404                .in_progress_operations
405                .fetch_sub(1, Ordering::AcqRel);
406            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
407                backend.operations_suspended.notify_all();
408            }
409        }
410    }
411}
412
413/// Intermediate result of step 1 of task execution completion.
414struct TaskExecutionCompletePrepareResult {
415    pub new_children: FxHashSet<TaskId>,
416    pub removed_data: Vec<CachedDataItem>,
417    pub is_now_immutable: bool,
418    #[cfg(feature = "verify_determinism")]
419    pub no_output_set: bool,
420    pub new_output: Option<OutputValue>,
421    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
422}
423
424// Operations
425impl<B: BackingStorage> TurboTasksBackendInner<B> {
426    /// # Safety
427    ///
428    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
429    unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
430        &'l self,
431        tx: Option<&'l B::ReadTransaction<'tx>>,
432        parent_task: Option<TaskId>,
433        child_task: TaskId,
434        turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
435    ) {
436        operation::ConnectChildOperation::run(parent_task, child_task, unsafe {
437            self.execute_context_with_tx(tx, turbo_tasks)
438        });
439    }
440
441    fn connect_child(
442        &self,
443        parent_task: Option<TaskId>,
444        child_task: TaskId,
445        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
446    ) {
447        operation::ConnectChildOperation::run(
448            parent_task,
449            child_task,
450            self.execute_context(turbo_tasks),
451        );
452    }
453
454    fn try_read_task_output(
455        self: &Arc<Self>,
456        task_id: TaskId,
457        reader: Option<TaskId>,
458        options: ReadOutputOptions,
459        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
460    ) -> Result<Result<RawVc, EventListener>> {
461        self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
462
463        let mut ctx = self.execute_context(turbo_tasks);
464        let need_reader_task = if self.should_track_dependencies()
465            && !matches!(options.tracking, ReadTracking::Untracked)
466            && reader.is_some_and(|reader_id| reader_id != task_id)
467            && let Some(reader_id) = reader
468            && reader_id != task_id
469        {
470            Some(reader_id)
471        } else {
472            None
473        };
474        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
475            // Having a task_pair here is not optimal, but otherwise this would lead to a race
476            // condition. See below.
477            // TODO(sokra): solve that in a more performant way.
478            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
479            (task, Some(reader))
480        } else {
481            (ctx.task(task_id, TaskDataCategory::All), None)
482        };
483
484        fn listen_to_done_event<B: BackingStorage>(
485            this: &TurboTasksBackendInner<B>,
486            reader: Option<TaskId>,
487            tracking: ReadTracking,
488            done_event: &Event,
489        ) -> EventListener {
490            done_event.listen_with_note(move || {
491                let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
492                move || {
493                    if let Some(reader_desc) = reader_desc.as_ref() {
494                        format!("try_read_task_output from {} ({})", reader_desc(), tracking)
495                    } else {
496                        format!("try_read_task_output ({})", tracking)
497                    }
498                }
499            })
500        }
501
502        fn check_in_progress<B: BackingStorage>(
503            this: &TurboTasksBackendInner<B>,
504            task: &impl TaskGuard,
505            reader: Option<TaskId>,
506            tracking: ReadTracking,
507            ctx: &impl ExecuteContext<'_>,
508        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
509        {
510            match get!(task, InProgress) {
511                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
512                    listen_to_done_event(this, reader, tracking, done_event),
513                ))),
514                Some(InProgressState::InProgress(box InProgressStateInner {
515                    done_event, ..
516                })) => Some(Ok(Err(listen_to_done_event(
517                    this, reader, tracking, done_event,
518                )))),
519                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
520                    "{} was canceled",
521                    ctx.get_task_description(task.id())
522                ))),
523                None => None,
524            }
525        }
526
527        if matches!(options.consistency, ReadConsistency::Strong) {
528            // Ensure it's an root node
529            loop {
530                let aggregation_number = get_aggregation_number(&task);
531                if is_root_node(aggregation_number) {
532                    break;
533                }
534                drop(task);
535                drop(reader_task);
536                {
537                    let _span = tracing::trace_span!(
538                        "make root node for strongly consistent read",
539                        %task_id
540                    )
541                    .entered();
542                    AggregationUpdateQueue::run(
543                        AggregationUpdateJob::UpdateAggregationNumber {
544                            task_id,
545                            base_aggregation_number: u32::MAX,
546                            distance: None,
547                        },
548                        &mut ctx,
549                    );
550                }
551                (task, reader_task) = if let Some(reader_id) = need_reader_task {
552                    // TODO(sokra): see comment above
553                    let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
554                    (task, Some(reader))
555                } else {
556                    (ctx.task(task_id, TaskDataCategory::All), None)
557                }
558            }
559
560            let is_dirty =
561                get!(task, Dirty).map_or(false, |dirty_state| dirty_state.get(self.session_id));
562
563            // Check the dirty count of the root node
564            let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
565                .cloned()
566                .unwrap_or_default()
567                .get(self.session_id);
568            if dirty_tasks > 0 || is_dirty {
569                let activeness = get_mut!(task, Activeness);
570                let mut task_ids_to_schedule: Vec<_> = Vec::new();
571                // When there are dirty task, subscribe to the all_clean_event
572                let activeness = if let Some(activeness) = activeness {
573                    // This makes sure all tasks stay active and this task won't stale.
574                    // active_until_clean is automatically removed when this
575                    // task is clean.
576                    activeness.set_active_until_clean();
577                    activeness
578                } else {
579                    // If we don't have a root state, add one. This also makes sure all tasks stay
580                    // active and this task won't stale. active_until_clean
581                    // is automatically removed when this task is clean.
582                    get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id))
583                        .set_active_until_clean();
584                    if ctx.should_track_activeness() {
585                        // A newly added Activeness need to make sure to schedule the tasks
586                        task_ids_to_schedule = get_many!(
587                            task,
588                            AggregatedDirtyContainer {
589                                task
590                            } count if count.get(self.session_id) > 0 => {
591                                task
592                            }
593                        );
594                        task_ids_to_schedule.push(task_id);
595                    }
596                    get!(task, Activeness).unwrap()
597                };
598                let listener = activeness.all_clean_event.listen_with_note(move || {
599                    let this = self.clone();
600                    let tt = turbo_tasks.pin();
601                    move || {
602                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
603                        let mut ctx = this.execute_context(tt);
604                        let mut visited = FxHashSet::default();
605                        fn indent(s: &str) -> String {
606                            s.split_inclusive('\n')
607                                .flat_map(|line: &str| ["  ", line].into_iter())
608                                .collect::<String>()
609                        }
610                        fn get_info(
611                            ctx: &mut impl ExecuteContext<'_>,
612                            task_id: TaskId,
613                            parent_and_count: Option<(TaskId, i32)>,
614                            visited: &mut FxHashSet<TaskId>,
615                        ) -> String {
616                            let task = ctx.task(task_id, TaskDataCategory::Data);
617                            let is_dirty = get!(task, Dirty)
618                                .map_or(false, |dirty_state| dirty_state.get(ctx.session_id()));
619                            let in_progress =
620                                get!(task, InProgress).map_or("not in progress", |p| match p {
621                                    InProgressState::InProgress(_) => "in progress",
622                                    InProgressState::Scheduled { .. } => "scheduled",
623                                    InProgressState::Canceled => "canceled",
624                                });
625                            let activeness = get!(task, Activeness).map_or_else(
626                                || "not active".to_string(),
627                                |activeness| format!("{activeness:?}"),
628                            );
629                            let aggregation_number = get_aggregation_number(&task);
630                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
631                            {
632                                let uppers = get_uppers(&task);
633                                !uppers.contains(&parent_task_id)
634                            } else {
635                                false
636                            };
637
638                            // Check the dirty count of the root node
639                            let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
640                                .cloned()
641                                .unwrap_or_default()
642                                .get(ctx.session_id());
643
644                            let task_description = ctx.get_task_description(task_id);
645                            let is_dirty = if is_dirty { ", dirty" } else { "" };
646                            let count = if let Some((_, count)) = parent_and_count {
647                                format!(" {count}")
648                            } else {
649                                String::new()
650                            };
651                            let mut info = format!(
652                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
653                                 {in_progress}, {activeness}{is_dirty})",
654                            );
655                            let children: Vec<_> = iter_many!(
656                                task,
657                                AggregatedDirtyContainer {
658                                    task
659                                } count => {
660                                    (task, count.get(ctx.session_id()))
661                                }
662                            )
663                            .filter(|(_, count)| *count > 0)
664                            .collect();
665                            drop(task);
666
667                            if missing_upper {
668                                info.push_str("\n  ERROR: missing upper connection");
669                            }
670
671                            if dirty_tasks > 0 || !children.is_empty() {
672                                writeln!(info, "\n  {dirty_tasks} dirty tasks:").unwrap();
673
674                                for (child_task_id, count) in children {
675                                    let task_description = ctx.get_task_description(child_task_id);
676                                    if visited.insert(child_task_id) {
677                                        let child_info = get_info(
678                                            ctx,
679                                            child_task_id,
680                                            Some((task_id, count)),
681                                            visited,
682                                        );
683                                        info.push_str(&indent(&child_info));
684                                        if !info.ends_with('\n') {
685                                            info.push('\n');
686                                        }
687                                    } else {
688                                        writeln!(
689                                            info,
690                                            "  {child_task_id} {task_description} {count} \
691                                             (already visited)"
692                                        )
693                                        .unwrap();
694                                    }
695                                }
696                            }
697                            info
698                        }
699                        let info = get_info(&mut ctx, task_id, None, &mut visited);
700                        format!(
701                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
702                        )
703                    }
704                });
705                drop(reader_task);
706                drop(task);
707                if !task_ids_to_schedule.is_empty() {
708                    let mut queue = AggregationUpdateQueue::new();
709                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
710                    queue.execute(&mut ctx);
711                }
712
713                return Ok(Err(listener));
714            }
715        }
716
717        if let Some(value) = check_in_progress(self, &task, reader, options.tracking, &ctx) {
718            return value;
719        }
720
721        if let Some(output) = get!(task, Output) {
722            let result = match output {
723                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
724                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
725                OutputValue::Error(error) => {
726                    let err: anyhow::Error = error.clone().into();
727                    Err(err.context(format!(
728                        "Execution of {} failed",
729                        ctx.get_task_description(task_id)
730                    )))
731                }
732            };
733            if let Some(mut reader_task) = reader_task
734                && options.tracking.should_track(result.is_err())
735                && (!task.is_immutable() || cfg!(feature = "verify_immutable"))
736            {
737                let reader = reader.unwrap();
738                let _ = task.add(CachedDataItem::OutputDependent {
739                    task: reader,
740                    value: (),
741                });
742                drop(task);
743
744                // Note: We use `task_pair` earlier to lock the task and its reader at the same
745                // time. If we didn't and just locked the reader here, an invalidation could occur
746                // between grabbing the locks. If that happened, and if the task is "outdated" or
747                // doesn't have the dependency edge yet, the invalidation would be lost.
748
749                if reader_task
750                    .remove(&CachedDataItemKey::OutdatedOutputDependency { target: task_id })
751                    .is_none()
752                {
753                    let _ = reader_task.add(CachedDataItem::OutputDependency {
754                        target: task_id,
755                        value: (),
756                    });
757                }
758            }
759
760            return result;
761        }
762        drop(reader_task);
763
764        let note = move || {
765            let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
766            move || {
767                if let Some(reader_desc) = reader_desc.as_ref() {
768                    format!("try_read_task_output (recompute) from {}", (reader_desc)())
769                } else {
770                    "try_read_task_output (recompute, untracked)".to_string()
771                }
772            }
773        };
774
775        // Output doesn't exist. We need to schedule the task to compute it.
776        let (item, listener) = CachedDataItem::new_scheduled_with_listener(
777            TaskExecutionReason::OutputNotAvailable,
778            || self.get_task_desc_fn(task_id),
779            note,
780        );
781        // It's not possible that the task is InProgress at this point. If it is InProgress {
782        // done: true } it must have Output and would early return.
783        task.add_new(item);
784        ctx.schedule_task(task);
785
786        Ok(Err(listener))
787    }
788
789    fn try_read_task_cell(
790        &self,
791        task_id: TaskId,
792        reader: Option<TaskId>,
793        cell: CellId,
794        options: ReadCellOptions,
795        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
796    ) -> Result<Result<TypedCellContent, EventListener>> {
797        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
798
799        fn add_cell_dependency(
800            task_id: TaskId,
801            mut task: impl TaskGuard,
802            reader: Option<TaskId>,
803            reader_task: Option<impl TaskGuard>,
804            cell: CellId,
805        ) {
806            if let Some(mut reader_task) = reader_task
807                && (!task.is_immutable() || cfg!(feature = "verify_immutable"))
808            {
809                let _ = task.add(CachedDataItem::CellDependent {
810                    cell,
811                    task: reader.unwrap(),
812                    value: (),
813                });
814                drop(task);
815
816                // Note: We use `task_pair` earlier to lock the task and its reader at the same
817                // time. If we didn't and just locked the reader here, an invalidation could occur
818                // between grabbing the locks. If that happened, and if the task is "outdated" or
819                // doesn't have the dependency edge yet, the invalidation would be lost.
820
821                let target = CellRef {
822                    task: task_id,
823                    cell,
824                };
825                if reader_task
826                    .remove(&CachedDataItemKey::OutdatedCellDependency { target })
827                    .is_none()
828                {
829                    let _ = reader_task.add(CachedDataItem::CellDependency { target, value: () });
830                }
831            }
832        }
833
834        let mut ctx = self.execute_context(turbo_tasks);
835        let (mut task, reader_task) = if self.should_track_dependencies()
836            && !matches!(options.tracking, ReadTracking::Untracked)
837            && let Some(reader_id) = reader
838            && reader_id != task_id
839        {
840            // Having a task_pair here is not optimal, but otherwise this would lead to a race
841            // condition. See below.
842            // TODO(sokra): solve that in a more performant way.
843            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::Data);
844            (task, Some(reader))
845        } else {
846            (ctx.task(task_id, TaskDataCategory::Data), None)
847        };
848
849        let content = if options.final_read_hint {
850            remove!(task, CellData { cell })
851        } else if let Some(content) = get!(task, CellData { cell }) {
852            let content = content.clone();
853            Some(content)
854        } else {
855            None
856        };
857        if let Some(content) = content {
858            if options.tracking.should_track(false) {
859                add_cell_dependency(task_id, task, reader, reader_task, cell);
860            }
861            return Ok(Ok(TypedCellContent(
862                cell.type_id,
863                CellContent(Some(content.reference)),
864            )));
865        }
866
867        let in_progress = get!(task, InProgress);
868        if matches!(
869            in_progress,
870            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
871        ) {
872            return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0));
873        }
874        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
875
876        // Check cell index range (cell might not exist at all)
877        let max_id = get!(
878            task,
879            CellTypeMaxIndex {
880                cell_type: cell.type_id
881            }
882        )
883        .copied();
884        let Some(max_id) = max_id else {
885            if options.tracking.should_track(true) {
886                add_cell_dependency(task_id, task, reader, reader_task, cell);
887            }
888            bail!(
889                "Cell {cell:?} no longer exists in task {} (no cell of this type exists)",
890                ctx.get_task_description(task_id)
891            );
892        };
893        if cell.index >= max_id {
894            if options.tracking.should_track(true) {
895                add_cell_dependency(task_id, task, reader, reader_task, cell);
896            }
897            bail!(
898                "Cell {cell:?} no longer exists in task {} (index out of bounds)",
899                ctx.get_task_description(task_id)
900            );
901        }
902        drop(reader_task);
903
904        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
905        // task the get the cell content.
906
907        // Listen to the cell and potentially schedule the task
908        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, reader, cell);
909        if !new_listener {
910            return Ok(Err(listener));
911        }
912
913        let _span = tracing::trace_span!(
914            "recomputation",
915            cell_type = get_value_type(cell.type_id).global_name,
916            cell_index = cell.index
917        )
918        .entered();
919
920        // Schedule the task, if not already scheduled
921        if is_cancelled {
922            bail!("{} was canceled", ctx.get_task_description(task_id));
923        }
924        task.add_new(CachedDataItem::new_scheduled(
925            TaskExecutionReason::CellNotAvailable,
926            || self.get_task_desc_fn(task_id),
927        ));
928        ctx.schedule_task(task);
929
930        Ok(Err(listener))
931    }
932
933    fn listen_to_cell(
934        &self,
935        task: &mut impl TaskGuard,
936        task_id: TaskId,
937        reader: Option<TaskId>,
938        cell: CellId,
939    ) -> (EventListener, bool) {
940        let note = move || {
941            let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
942            move || {
943                if let Some(reader_desc) = reader_desc.as_ref() {
944                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
945                } else {
946                    "try_read_task_cell (in progress, untracked)".to_string()
947                }
948            }
949        };
950        if let Some(in_progress) = get!(task, InProgressCell { cell }) {
951            // Someone else is already computing the cell
952            let listener = in_progress.event.listen_with_note(note);
953            return (listener, false);
954        }
955        let in_progress = InProgressCellState::new(task_id, cell);
956        let listener = in_progress.event.listen_with_note(note);
957        task.add_new(CachedDataItem::InProgressCell {
958            cell,
959            value: in_progress,
960        });
961        (listener, true)
962    }
963
964    fn lookup_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
965        if let Some(task_type) = self.task_cache.lookup_reverse(&task_id) {
966            return Some(task_type);
967        }
968        if self.should_restore()
969            && self.local_is_partial.load(Ordering::Acquire)
970            && !task_id.is_transient()
971            && let Some(task_type) = unsafe {
972                self.backing_storage
973                    .reverse_lookup_task_cache(None, task_id)
974                    .expect("Failed to lookup task type")
975            }
976        {
977            let _ = self.task_cache.try_insert(task_type.clone(), task_id);
978            return Some(task_type);
979        }
980        None
981    }
982
983    fn get_task_desc_fn(&self, task_id: TaskId) -> impl Fn() -> String + Send + Sync + 'static {
984        let task_type = self.lookup_task_type(task_id);
985        move || {
986            task_type.as_ref().map_or_else(
987                || format!("{task_id:?} transient"),
988                |task_type| format!("{task_id:?} {task_type}"),
989            )
990        }
991    }
992
993    fn snapshot(&self) -> Option<(Instant, bool)> {
994        let start = Instant::now();
995        debug_assert!(self.should_persist());
996        let mut snapshot_request = self.snapshot_request.lock();
997        snapshot_request.snapshot_requested = true;
998        let active_operations = self
999            .in_progress_operations
1000            .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1001        if active_operations != 0 {
1002            self.operations_suspended
1003                .wait_while(&mut snapshot_request, |_| {
1004                    self.in_progress_operations.load(Ordering::Relaxed) != SNAPSHOT_REQUESTED_BIT
1005                });
1006        }
1007        let suspended_operations = snapshot_request
1008            .suspended_operations
1009            .iter()
1010            .map(|op| op.arc().clone())
1011            .collect::<Vec<_>>();
1012        drop(snapshot_request);
1013        self.storage.start_snapshot();
1014        let mut persisted_task_cache_log = self
1015            .persisted_task_cache_log
1016            .as_ref()
1017            .map(|l| l.take(|i| i))
1018            .unwrap_or_default();
1019        let mut snapshot_request = self.snapshot_request.lock();
1020        snapshot_request.snapshot_requested = false;
1021        self.in_progress_operations
1022            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1023        self.snapshot_completed.notify_all();
1024        let snapshot_time = Instant::now();
1025        drop(snapshot_request);
1026
1027        let preprocess = |task_id: TaskId, inner: &storage::InnerStorage| {
1028            if task_id.is_transient() {
1029                return (None, None);
1030            }
1031            let len = inner.len();
1032
1033            let meta_restored = inner.state().meta_restored();
1034            let data_restored = inner.state().data_restored();
1035
1036            let mut meta = meta_restored.then(|| Vec::with_capacity(len));
1037            let mut data = data_restored.then(|| Vec::with_capacity(len));
1038            for (key, value) in inner.iter_all() {
1039                if key.is_persistent() && value.is_persistent() {
1040                    match key.category() {
1041                        TaskDataCategory::Meta => {
1042                            if let Some(meta) = &mut meta {
1043                                meta.push(CachedDataItem::from_key_and_value_ref(key, value))
1044                            }
1045                        }
1046                        TaskDataCategory::Data => {
1047                            if let Some(data) = &mut data {
1048                                data.push(CachedDataItem::from_key_and_value_ref(key, value))
1049                            }
1050                        }
1051                        _ => {}
1052                    }
1053                }
1054            }
1055
1056            (meta, data)
1057        };
1058        let process = |task_id: TaskId, (meta, data): (Option<Vec<_>>, Option<Vec<_>>)| {
1059            (
1060                task_id,
1061                meta.map(|d| self.backing_storage.serialize(task_id, &d)),
1062                data.map(|d| self.backing_storage.serialize(task_id, &d)),
1063            )
1064        };
1065        let process_snapshot = |task_id: TaskId, inner: Box<InnerStorageSnapshot>| {
1066            if task_id.is_transient() {
1067                return (task_id, None, None);
1068            }
1069            let len = inner.len();
1070            let mut meta = inner.meta_modified.then(|| Vec::with_capacity(len));
1071            let mut data = inner.data_modified.then(|| Vec::with_capacity(len));
1072            for (key, value) in inner.iter_all() {
1073                if key.is_persistent() && value.is_persistent() {
1074                    match key.category() {
1075                        TaskDataCategory::Meta => {
1076                            if let Some(meta) = &mut meta {
1077                                meta.push(CachedDataItem::from_key_and_value_ref(key, value));
1078                            }
1079                        }
1080                        TaskDataCategory::Data => {
1081                            if let Some(data) = &mut data {
1082                                data.push(CachedDataItem::from_key_and_value_ref(key, value));
1083                            }
1084                        }
1085                        _ => {}
1086                    }
1087                }
1088            }
1089            (
1090                task_id,
1091                meta.map(|meta| self.backing_storage.serialize(task_id, &meta)),
1092                data.map(|data| self.backing_storage.serialize(task_id, &data)),
1093            )
1094        };
1095
1096        let snapshot = {
1097            let _span = tracing::trace_span!("take snapshot");
1098            self.storage
1099                .take_snapshot(&preprocess, &process, &process_snapshot)
1100        };
1101
1102        #[cfg(feature = "print_cache_item_size")]
1103        #[derive(Default)]
1104        struct TaskCacheStats {
1105            data: usize,
1106            data_count: usize,
1107            meta: usize,
1108            meta_count: usize,
1109        }
1110        #[cfg(feature = "print_cache_item_size")]
1111        impl TaskCacheStats {
1112            fn add_data(&mut self, len: usize) {
1113                self.data += len;
1114                self.data_count += 1;
1115            }
1116
1117            fn add_meta(&mut self, len: usize) {
1118                self.meta += len;
1119                self.meta_count += 1;
1120            }
1121        }
1122        #[cfg(feature = "print_cache_item_size")]
1123        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1124            Mutex::new(FxHashMap::default());
1125
1126        let task_snapshots = snapshot
1127            .into_iter()
1128            .filter_map(|iter| {
1129                let mut iter = iter
1130                    .filter_map(
1131                        |(task_id, meta, data): (
1132                            _,
1133                            Option<Result<SmallVec<_>>>,
1134                            Option<Result<SmallVec<_>>>,
1135                        )| {
1136                            let meta = match meta {
1137                                Some(Ok(meta)) => {
1138                                    #[cfg(feature = "print_cache_item_size")]
1139                                    task_cache_stats
1140                                        .lock()
1141                                        .entry(self.get_task_description(task_id))
1142                                        .or_default()
1143                                        .add_meta(meta.len());
1144                                    Some(meta)
1145                                }
1146                                None => None,
1147                                Some(Err(err)) => {
1148                                    println!(
1149                                        "Serializing task {} failed (meta): {:?}",
1150                                        self.get_task_description(task_id),
1151                                        err
1152                                    );
1153                                    None
1154                                }
1155                            };
1156                            let data = match data {
1157                                Some(Ok(data)) => {
1158                                    #[cfg(feature = "print_cache_item_size")]
1159                                    task_cache_stats
1160                                        .lock()
1161                                        .entry(self.get_task_description(task_id))
1162                                        .or_default()
1163                                        .add_data(data.len());
1164                                    Some(data)
1165                                }
1166                                None => None,
1167                                Some(Err(err)) => {
1168                                    println!(
1169                                        "Serializing task {} failed (data): {:?}",
1170                                        self.get_task_description(task_id),
1171                                        err
1172                                    );
1173                                    None
1174                                }
1175                            };
1176                            (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
1177                        },
1178                    )
1179                    .peekable();
1180                iter.peek().is_some().then_some(iter)
1181            })
1182            .collect::<Vec<_>>();
1183
1184        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
1185
1186        let mut new_items = false;
1187
1188        if !persisted_task_cache_log.is_empty() || !task_snapshots.is_empty() {
1189            new_items = true;
1190            if let Err(err) = self.backing_storage.save_snapshot(
1191                self.session_id,
1192                suspended_operations,
1193                persisted_task_cache_log,
1194                task_snapshots,
1195            ) {
1196                println!("Persisting failed: {err:?}");
1197                return None;
1198            }
1199            #[cfg(feature = "print_cache_item_size")]
1200            {
1201                let mut task_cache_stats = task_cache_stats
1202                    .into_inner()
1203                    .into_iter()
1204                    .collect::<Vec<_>>();
1205                if !task_cache_stats.is_empty() {
1206                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1207                        (stats_b.data + stats_b.meta, key_b)
1208                            .cmp(&(stats_a.data + stats_a.meta, key_a))
1209                    });
1210                    println!("Task cache stats:");
1211                    for (task_desc, stats) in task_cache_stats {
1212                        use std::ops::Div;
1213
1214                        use turbo_tasks::util::FormatBytes;
1215
1216                        println!(
1217                            "  {} {task_desc} = {} meta ({} x {}), {} data ({} x {})",
1218                            FormatBytes(stats.data + stats.meta),
1219                            FormatBytes(stats.meta),
1220                            stats.meta_count,
1221                            FormatBytes(stats.meta.checked_div(stats.meta_count).unwrap_or(0)),
1222                            FormatBytes(stats.data),
1223                            stats.data_count,
1224                            FormatBytes(stats.data.checked_div(stats.data_count).unwrap_or(0)),
1225                        );
1226                    }
1227                }
1228            }
1229        }
1230
1231        if new_items {
1232            let elapsed = start.elapsed();
1233            // avoid spamming the event queue with information about fast operations
1234            if elapsed > Duration::from_secs(10) {
1235                turbo_tasks().send_compilation_event(Arc::new(TimingEvent::new(
1236                    "Finished writing to filesystem cache".to_string(),
1237                    elapsed,
1238                )));
1239            }
1240        }
1241
1242        Some((snapshot_time, new_items))
1243    }
1244
1245    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1246        if self.should_restore() {
1247            // Continue all uncompleted operations
1248            // They can't be interrupted by a snapshot since the snapshotting job has not been
1249            // scheduled yet.
1250            let uncompleted_operations = self
1251                .backing_storage
1252                .uncompleted_operations()
1253                .expect("Failed to get uncompleted operations");
1254            if !uncompleted_operations.is_empty() {
1255                let mut ctx = self.execute_context(turbo_tasks);
1256                for op in uncompleted_operations {
1257                    op.execute(&mut ctx);
1258                }
1259            }
1260        }
1261
1262        if self.should_persist() {
1263            // Schedule the snapshot job
1264            let _span = Span::none().entered();
1265            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1266        }
1267    }
1268
    /// Marks the backend as stopping: sets the `stopping` flag and then notifies
    /// `stopping_event`.
    fn stopping(&self) {
        // The flag is stored before the event fires.
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }
1273
1274    #[allow(unused_variables)]
1275    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1276        #[cfg(feature = "verify_aggregation_graph")]
1277        {
1278            self.is_idle.store(false, Ordering::Release);
1279            self.verify_aggregation_graph(turbo_tasks, false);
1280        }
1281        if self.should_persist() {
1282            let _span = tracing::info_span!("persist on stop").entered();
1283            self.snapshot();
1284        }
1285        self.task_cache.drop_contents();
1286        drop_contents(&self.transient_tasks);
1287        self.storage.drop_contents();
1288        if let Err(err) = self.backing_storage.shutdown() {
1289            println!("Shutting down failed: {err}");
1290        }
1291    }
1292
1293    #[allow(unused_variables)]
1294    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1295        self.idle_start_event.notify(usize::MAX);
1296
1297        #[cfg(feature = "verify_aggregation_graph")]
1298        {
1299            use tokio::select;
1300
1301            self.is_idle.store(true, Ordering::Release);
1302            let this = self.clone();
1303            let turbo_tasks = turbo_tasks.pin();
1304            tokio::task::spawn(async move {
1305                select! {
1306                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1307                        // do nothing
1308                    }
1309                    _ = this.idle_end_event.listen() => {
1310                        return;
1311                    }
1312                }
1313                if !this.is_idle.load(Ordering::Relaxed) {
1314                    return;
1315                }
1316                this.verify_aggregation_graph(&*turbo_tasks, true);
1317            });
1318        }
1319    }
1320
    /// Signals the end of an idle period by notifying `idle_end_event`.
    fn idle_end(&self) {
        // With graph verification enabled, clear the idle flag before the event fires; the task
        // spawned by `idle_start` checks this flag before it verifies the graph.
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }
1326
1327    fn get_or_create_persistent_task(
1328        &self,
1329        task_type: CachedTaskType,
1330        parent_task: Option<TaskId>,
1331        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1332    ) -> TaskId {
1333        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
1334            self.track_cache_hit(&task_type);
1335            self.connect_child(parent_task, task_id, turbo_tasks);
1336            return task_id;
1337        }
1338
1339        let check_backing_storage =
1340            self.should_restore() && self.local_is_partial.load(Ordering::Acquire);
1341        let tx = check_backing_storage
1342            .then(|| self.backing_storage.start_read_transaction())
1343            .flatten();
1344        let task_id = {
1345            // Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
1346            if let Some(task_id) = unsafe {
1347                check_backing_storage
1348                    .then(|| {
1349                        self.backing_storage
1350                            .forward_lookup_task_cache(tx.as_ref(), &task_type)
1351                            .expect("Failed to lookup task id")
1352                    })
1353                    .flatten()
1354            } {
1355                self.track_cache_hit(&task_type);
1356                let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
1357                task_id
1358            } else {
1359                let task_type = Arc::new(task_type);
1360                let task_id = self.persisted_task_id_factory.get();
1361                let task_id = if let Err(existing_task_id) =
1362                    self.task_cache.try_insert(task_type.clone(), task_id)
1363                {
1364                    self.track_cache_hit(&task_type);
1365                    // Safety: We just created the id and failed to insert it.
1366                    unsafe {
1367                        self.persisted_task_id_factory.reuse(task_id);
1368                    }
1369                    existing_task_id
1370                } else {
1371                    self.track_cache_miss(&task_type);
1372                    task_id
1373                };
1374                if let Some(log) = &self.persisted_task_cache_log {
1375                    log.lock(task_id).push((task_type, task_id));
1376                }
1377                task_id
1378            }
1379        };
1380
1381        // Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
1382        unsafe { self.connect_child_with_tx(tx.as_ref(), parent_task, task_id, turbo_tasks) };
1383
1384        task_id
1385    }
1386
1387    fn get_or_create_transient_task(
1388        &self,
1389        task_type: CachedTaskType,
1390        parent_task: Option<TaskId>,
1391        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1392    ) -> TaskId {
1393        if let Some(parent_task) = parent_task
1394            && !parent_task.is_transient()
1395        {
1396            self.panic_persistent_calling_transient(
1397                self.lookup_task_type(parent_task).as_deref(),
1398                Some(&task_type),
1399                /* cell_id */ None,
1400            );
1401        }
1402        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
1403            self.track_cache_hit(&task_type);
1404            self.connect_child(parent_task, task_id, turbo_tasks);
1405            return task_id;
1406        }
1407
1408        let task_type = Arc::new(task_type);
1409        let task_id = self.transient_task_id_factory.get();
1410        if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) {
1411            self.track_cache_hit(&task_type);
1412            // Safety: We just created the id and failed to insert it.
1413            unsafe {
1414                self.transient_task_id_factory.reuse(task_id);
1415            }
1416            self.connect_child(parent_task, existing_task_id, turbo_tasks);
1417            return existing_task_id;
1418        }
1419
1420        self.track_cache_miss(&task_type);
1421        self.connect_child(parent_task, task_id, turbo_tasks);
1422
1423        task_id
1424    }
1425
1426    /// Generate an object that implements [`fmt::Display`] explaining why the given
1427    /// [`CachedTaskType`] is transient.
1428    fn debug_trace_transient_task(
1429        &self,
1430        task_type: &CachedTaskType,
1431        cell_id: Option<CellId>,
1432    ) -> DebugTraceTransientTask {
1433        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1434        // from tracing the same task many times, so use a visited_set
1435        fn inner_id(
1436            backend: &TurboTasksBackendInner<impl BackingStorage>,
1437            task_id: TaskId,
1438            cell_type_id: Option<ValueTypeId>,
1439            visited_set: &mut FxHashSet<TaskId>,
1440        ) -> DebugTraceTransientTask {
1441            if let Some(task_type) = backend.lookup_task_type(task_id) {
1442                if visited_set.contains(&task_id) {
1443                    let task_name = task_type.get_name();
1444                    DebugTraceTransientTask::Collapsed {
1445                        task_name,
1446                        cell_type_id,
1447                    }
1448                } else {
1449                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1450                }
1451            } else {
1452                DebugTraceTransientTask::Uncached { cell_type_id }
1453            }
1454        }
1455        fn inner_cached(
1456            backend: &TurboTasksBackendInner<impl BackingStorage>,
1457            task_type: &CachedTaskType,
1458            cell_type_id: Option<ValueTypeId>,
1459            visited_set: &mut FxHashSet<TaskId>,
1460        ) -> DebugTraceTransientTask {
1461            let task_name = task_type.get_name();
1462
1463            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1464                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1465                    // `task_id` should never be `None` at this point, as that would imply a
1466                    // non-local task is returning a local `Vc`...
1467                    // Just ignore if it happens, as we're likely already panicking.
1468                    return None;
1469                };
1470                if task_id.is_transient() {
1471                    Some(Box::new(inner_id(
1472                        backend,
1473                        task_id,
1474                        cause_self_raw_vc.try_get_type_id(),
1475                        visited_set,
1476                    )))
1477                } else {
1478                    None
1479                }
1480            });
1481            let cause_args = task_type
1482                .arg
1483                .get_raw_vcs()
1484                .into_iter()
1485                .filter_map(|raw_vc| {
1486                    let Some(task_id) = raw_vc.try_get_task_id() else {
1487                        // `task_id` should never be `None` (see comment above)
1488                        return None;
1489                    };
1490                    if !task_id.is_transient() {
1491                        return None;
1492                    }
1493                    Some((task_id, raw_vc.try_get_type_id()))
1494                })
1495                .collect::<IndexSet<_>>() // dedupe
1496                .into_iter()
1497                .map(|(task_id, cell_type_id)| {
1498                    inner_id(backend, task_id, cell_type_id, visited_set)
1499                })
1500                .collect();
1501
1502            DebugTraceTransientTask::Cached {
1503                task_name,
1504                cell_type_id,
1505                cause_self,
1506                cause_args,
1507            }
1508        }
1509        inner_cached(
1510            self,
1511            task_type,
1512            cell_id.map(|c| c.type_id),
1513            &mut FxHashSet::default(),
1514        )
1515    }
1516
1517    fn invalidate_task(
1518        &self,
1519        task_id: TaskId,
1520        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1521    ) {
1522        if !self.should_track_dependencies() {
1523            panic!("Dependency tracking is disabled so invalidation is not allowed");
1524        }
1525        operation::InvalidateOperation::run(
1526            smallvec![task_id],
1527            #[cfg(feature = "trace_task_dirty")]
1528            TaskDirtyCause::Invalidator,
1529            self.execute_context(turbo_tasks),
1530        );
1531    }
1532
1533    fn invalidate_tasks(
1534        &self,
1535        tasks: &[TaskId],
1536        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1537    ) {
1538        if !self.should_track_dependencies() {
1539            panic!("Dependency tracking is disabled so invalidation is not allowed");
1540        }
1541        operation::InvalidateOperation::run(
1542            tasks.iter().copied().collect(),
1543            #[cfg(feature = "trace_task_dirty")]
1544            TaskDirtyCause::Unknown,
1545            self.execute_context(turbo_tasks),
1546        );
1547    }
1548
1549    fn invalidate_tasks_set(
1550        &self,
1551        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1552        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1553    ) {
1554        if !self.should_track_dependencies() {
1555            panic!("Dependency tracking is disabled so invalidation is not allowed");
1556        }
1557        operation::InvalidateOperation::run(
1558            tasks.iter().copied().collect(),
1559            #[cfg(feature = "trace_task_dirty")]
1560            TaskDirtyCause::Unknown,
1561            self.execute_context(turbo_tasks),
1562        );
1563    }
1564
1565    fn invalidate_serialization(
1566        &self,
1567        task_id: TaskId,
1568        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1569    ) {
1570        if task_id.is_transient() {
1571            return;
1572        }
1573        let mut ctx = self.execute_context(turbo_tasks);
1574        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1575        task.invalidate_serialization();
1576    }
1577
1578    fn get_task_description(&self, task_id: TaskId) -> String {
1579        self.lookup_task_type(task_id).map_or_else(
1580            || format!("{task_id:?} transient"),
1581            |task_type| task_type.to_string(),
1582        )
1583    }
1584
1585    fn task_execution_canceled(
1586        &self,
1587        task_id: TaskId,
1588        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1589    ) {
1590        let mut ctx = self.execute_context(turbo_tasks);
1591        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1592        if let Some(in_progress) = remove!(task, InProgress) {
1593            match in_progress {
1594                InProgressState::Scheduled {
1595                    done_event,
1596                    reason: _,
1597                } => done_event.notify(usize::MAX),
1598                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1599                    done_event.notify(usize::MAX)
1600                }
1601                InProgressState::Canceled => {}
1602            }
1603        }
1604        task.add_new(CachedDataItem::InProgress {
1605            value: InProgressState::Canceled,
1606        });
1607    }
1608
    /// Moves a scheduled task into the in-progress state and returns the future and tracing span
    /// needed to execute it.
    ///
    /// Returns `None` when
    /// - the task has neither a cached task type nor an entry in `transient_tasks`,
    /// - the task has no `InProgress` item,
    /// - the `InProgress` item is not `Scheduled` (the item is put back unchanged), or
    /// - the task is a `TransientTask::Once` whose future has already been taken (note that in
    ///   this case the state has already been switched to in-progress).
    ///
    /// As part of starting the task, an `Outdated…` item is added for every current collectible
    /// and (when dependency tracking is enabled) for every current dependency. Left-over
    /// outdated items that have no current counterpart are removed.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        // Local helper to carry either kind of task type out of the lookup below.
        enum TaskType {
            Cached(Arc<CachedTaskType>),
            Transient(Arc<TransientTask>),
        }
        // Cached task types are looked up first; `once_task` is only ever true for
        // `TransientTask::Once`.
        let (task_type, once_task) = if let Some(task_type) = self.lookup_task_type(task_id) {
            (TaskType::Cached(task_type), false)
        } else if let Some(task_type) = self.transient_tasks.get(&task_id) {
            (
                TaskType::Transient(task_type.clone()),
                matches!(**task_type, TransientTask::Once(_)),
            )
        } else {
            return None;
        };
        let execution_reason;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            // No `InProgress` item at all: nothing to start.
            let in_progress = remove!(task, InProgress)?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                // Not in `Scheduled` state: restore the item we just removed and bail out.
                task.add_new(CachedDataItem::InProgress { value: in_progress });
                return None;
            };
            execution_reason = reason;
            // Switch to the in-progress state, reusing the `done_event` of the scheduled state.
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::InProgress(Box::new(InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    new_children: Default::default(),
                })),
            });

            // Make all current collectibles outdated (remove left-over outdated collectibles)
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            // Collected into a `Vec` first, so the task can be modified while processing them.
            let collectibles = iter_many!(task, Collectible { collectible } value => Collectible::Current(collectible, *value))
                    .chain(iter_many!(task, OutdatedCollectible { collectible } => Collectible::Outdated(collectible)))
                    .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ =
                            task.insert(CachedDataItem::OutdatedCollectible { collectible, value });
                    }
                    Collectible::Outdated(collectible) => {
                        // Only drop the outdated entry when there is no current collectible for
                        // it (otherwise it was just (re)inserted by the arm above).
                        if !task.has_key(&CachedDataItemKey::Collectible { collectible }) {
                            task.remove(&CachedDataItemKey::OutdatedCollectible { collectible });
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Make all dependencies outdated
                enum Dep {
                    CurrentCell(CellRef),
                    CurrentOutput(TaskId),
                    OutdatedCell(CellRef),
                    OutdatedOutput(TaskId),
                }
                // Same approach as for the collectibles: snapshot first, then modify the task.
                let dependencies = iter_many!(task, CellDependency { target } => Dep::CurrentCell(target))
                    .chain(iter_many!(task, OutputDependency { target } => Dep::CurrentOutput(target)))
                    .chain(iter_many!(task, OutdatedCellDependency { target } => Dep::OutdatedCell(target)))
                    .chain(iter_many!(task, OutdatedOutputDependency { target } => Dep::OutdatedOutput(target)))
                    .collect::<Vec<_>>();
                for dep in dependencies {
                    match dep {
                        Dep::CurrentCell(cell) => {
                            let _ = task.add(CachedDataItem::OutdatedCellDependency {
                                target: cell,
                                value: (),
                            });
                        }
                        Dep::CurrentOutput(output) => {
                            let _ = task.add(CachedDataItem::OutdatedOutputDependency {
                                target: output,
                                value: (),
                            });
                        }
                        Dep::OutdatedCell(cell) => {
                            // Left-over outdated entry without a current dependency: remove it.
                            if !task.has_key(&CachedDataItemKey::CellDependency { target: cell }) {
                                task.remove(&CachedDataItemKey::OutdatedCellDependency {
                                    target: cell,
                                });
                            }
                        }
                        Dep::OutdatedOutput(output) => {
                            // Left-over outdated entry without a current dependency: remove it.
                            if !task
                                .has_key(&CachedDataItemKey::OutputDependency { target: output })
                            {
                                task.remove(&CachedDataItemKey::OutdatedOutputDependency {
                                    target: output,
                                });
                            }
                        }
                    }
                }
            }
        }

        // The task guard and context are released at this point; only the span and the future
        // are created below.
        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // Takes the future out of the mutex, leaving the default value behind.
                    // Returns `None` from this function when there was no future to take.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1742
1743    fn task_execution_completed(
1744        &self,
1745        task_id: TaskId,
1746        result: Result<RawVc, TurboTasksExecutionError>,
1747        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1748        stateful: bool,
1749        has_invalidator: bool,
1750        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1751    ) -> bool {
1752        // Task completion is a 4 step process:
1753        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
1754        //    aggregation number of the task and the new children.
1755        // 2. Connect the new children to the task (and do the relevant aggregation updates).
1756        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
1757        // 4. Shrink the task memory to reduce footprint of the task.
1758
1759        // Due to persistence it is possible that the process is cancelled after any step. This is
1760        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
1761        // in-memory representation.
1762
1763        // The task might be invalidated during this process, so we need to check the stale flag
1764        // at the start of every step.
1765
1766        let span = tracing::trace_span!("task execution completed", immutable = Empty).entered();
1767        let mut ctx = self.execute_context(turbo_tasks);
1768
1769        let Some(TaskExecutionCompletePrepareResult {
1770            new_children,
1771            mut removed_data,
1772            is_now_immutable,
1773            #[cfg(feature = "verify_determinism")]
1774            no_output_set,
1775            new_output,
1776            output_dependent_tasks,
1777        }) = self.task_execution_completed_prepare(
1778            &mut ctx,
1779            &span,
1780            task_id,
1781            result,
1782            cell_counters,
1783            stateful,
1784            has_invalidator,
1785        )
1786        else {
1787            // Task was stale and has been rescheduled
1788            return true;
1789        };
1790
1791        // When restoring from filesystem cache the following might not be executed (since we can
1792        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
1793        // would be executed again.
1794
1795        if !output_dependent_tasks.is_empty() {
1796            self.task_execution_completed_invalidate_output_dependent(
1797                &mut ctx,
1798                task_id,
1799                output_dependent_tasks,
1800            );
1801        }
1802
1803        let has_new_children = !new_children.is_empty();
1804
1805        if has_new_children {
1806            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
1807        }
1808
1809        if has_new_children
1810            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
1811        {
1812            // Task was stale and has been rescheduled
1813            return true;
1814        }
1815
1816        if self.task_execution_completed_finish(
1817            &mut ctx,
1818            task_id,
1819            #[cfg(feature = "verify_determinism")]
1820            no_output_set,
1821            new_output,
1822            &mut removed_data,
1823            is_now_immutable,
1824        ) {
1825            // Task was stale and has been rescheduled
1826            return true;
1827        }
1828
1829        drop(removed_data);
1830
1831        self.task_execution_completed_cleanup(&mut ctx, task_id);
1832
1833        false
1834    }
1835
    /// First step of task completion: inspects the in-progress state, updates the task's
    /// bookkeeping items (stateful/invalidator flags, cell counters), collects everything that
    /// became outdated and runs `CleanupOldEdgesOperation` for it.
    ///
    /// Returns `None` when the task was stale and has been rescheduled (this check is compiled
    /// out with the `no_fast_stale` feature). When the task was canceled, a result with empty
    /// collections and no new output is returned. Otherwise the result carries the really new
    /// children, the removed cell data, the new output (`None` if the output is unchanged),
    /// the tasks depending on the output, and whether the task can now be treated as immutable.
    ///
    /// # Panics
    ///
    /// Panics when the task has no `InProgress` item or is still `Scheduled`, and when `result`
    /// is a `RawVc::LocalOutput`.
    fn task_execution_completed_prepare(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        span: &Span,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        stateful: bool,
        has_invalidator: bool,
    ) -> Option<TaskExecutionCompletePrepareResult> {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = get_mut!(task, InProgress) else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // A canceled task yields an empty result; the `InProgress` item is left in place.
        if matches!(in_progress, InProgressState::Canceled) {
            return Some(TaskExecutionCompletePrepareResult {
                new_children: Default::default(),
                removed_data: Default::default(),
                is_now_immutable: false,
                #[cfg(feature = "verify_determinism")]
                no_output_set: false,
                new_output: None,
                output_dependent_tasks: Default::default(),
            });
        }
        // `stale` and `session_dependent` are copied out, `new_children` stays a mutable borrow
        // into the in-progress state.
        let &mut InProgressState::InProgress(box InProgressStateInner {
            stale,
            ref mut new_children,
            session_dependent,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // If the task is stale, reschedule it
        #[cfg(not(feature = "no_fast_stale"))]
        if stale {
            let Some(InProgressState::InProgress(box InProgressStateInner {
                done_event,
                mut new_children,
                ..
            })) = remove!(task, InProgress)
            else {
                unreachable!();
            };
            // Back to `Scheduled`, keeping the same `done_event`.
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled {
                    done_event,
                    reason: TaskExecutionReason::Stale,
                },
            });
            // Remove old children from new_children to leave only the children that had their
            // active count increased
            for task in iter_many!(task, Child { task } => task) {
                new_children.remove(&task);
            }
            drop(task);

            // We need to undo the active count increase for the children since we throw away the
            // new_children list now.
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return None;
        }

        // take the children from the task to process them
        let mut new_children = take(new_children);

        // handle stateful
        if stateful {
            let _ = task.add(CachedDataItem::Stateful { value: () });
        }

        // handle has_invalidator
        if has_invalidator {
            let _ = task.add(CachedDataItem::HasInvalidator { value: () });
        }

        // handle cell counters: update max index and remove cells that are no longer used
        let old_counters: FxHashMap<_, _> =
            get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, *max_index));
        // Starts as a copy of all old counters; every cell type that is still present in
        // `cell_counters` is taken out again, so only the vanished ones remain.
        let mut counters_to_remove = old_counters.clone();
        for (&cell_type, &max_index) in cell_counters.iter() {
            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                // Only write when the value actually changed.
                if old_max_index != max_index {
                    task.insert(CachedDataItem::CellTypeMaxIndex {
                        cell_type,
                        value: max_index,
                    });
                }
            } else {
                task.add_new(CachedDataItem::CellTypeMaxIndex {
                    cell_type,
                    value: max_index,
                });
            }
        }
        for (cell_type, _) in counters_to_remove {
            task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type });
        }

        let mut queue = AggregationUpdateQueue::new();

        let mut removed_data = Vec::new();
        let mut old_edges = Vec::new();

        let has_children = !new_children.is_empty();
        let is_immutable = task.is_immutable();
        // `Some(dependencies)` when the task is a candidate for becoming immutable; whether all
        // of these dependencies are immutable is checked further below, after the task guard
        // has been dropped.
        let task_dependencies_for_immutable =
            // Task was not previously marked as immutable
            if !is_immutable
            // Task is not session dependent (session dependent tasks can change between sessions)
            && !session_dependent
            // Task has no invalidator
            && !task.has_key(&CachedDataItemKey::HasInvalidator {})
            // Task has no dependencies on collectibles
            && count!(task, CollectiblesDependency) == 0
        {
            Some(
                // Collect all dependencies on tasks to check if all dependencies are immutable
                iter_many!(task, OutputDependency { target } => target)
                    .chain(iter_many!(task, CellDependency { target } => target.task))
                    .collect::<FxHashSet<_>>(),
            )
        } else {
            None
        };

        if has_children {
            // Prepare all new children
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);

            // Filter actual new children
            // Existing children that are also in `new_children` are taken out of that set (they
            // are already connected); existing children that are not in it become outdated edges.
            old_edges.extend(
                iter_many!(task, Child { task } => task)
                    .filter(|task| !new_children.remove(task))
                    .map(OutdatedEdge::Child),
            );
        } else {
            // No children at all after this execution: every existing child edge is outdated.
            old_edges.extend(iter_many!(task, Child { task } => task).map(OutdatedEdge::Child));
        }

        // Remove no longer existing cells and
        // find all outdated data items (removed cells, outdated edges)
        // Note: For persistent tasks we only want to call extract_if when there are actual cells to
        // remove to avoid tracking that as modification.
        // A cell counts as no longer existing when its type has no counter anymore or its index
        // is not below the counter for its type.
        if task_id.is_transient() || iter_many!(task, CellData { cell }
            if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index) => cell
        ).count() > 0 {
            removed_data.extend(task.extract_if(CachedDataItemType::CellData, |key, _| {
                matches!(key, CachedDataItemKey::CellData { cell } if cell_counters
                            .get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
            }));
        }

        // All outdated collectibles become old edges.
        old_edges.extend(
            task.iter(CachedDataItemType::OutdatedCollectible)
                .filter_map(|(key, value)| match (key, value) {
                    (
                        CachedDataItemKey::OutdatedCollectible { collectible },
                        CachedDataItemValueRef::OutdatedCollectible { value },
                    ) => Some(OutdatedEdge::Collectible(collectible, *value)),
                    _ => None,
                }),
        );

        if self.should_track_dependencies() {
            old_edges.extend(iter_many!(task, OutdatedCellDependency { target } => OutdatedEdge::CellDependency(target)));
            old_edges.extend(iter_many!(task, OutdatedOutputDependency { target } => OutdatedEdge::OutputDependency(target)));
            // Dependents of cells that existed according to the old counters but are gone
            // according to the new ones.
            old_edges.extend(
                iter_many!(task, CellDependent { cell, task } => (cell, task)).filter_map(
                    |(cell, task)| {
                        if cell_counters
                            .get(&cell.type_id)
                            .is_none_or(|start_index| cell.index >= *start_index)
                            && let Some(old_counter) = old_counters.get(&cell.type_id)
                            && cell.index < *old_counter
                        {
                            return Some(OutdatedEdge::RemovedCellDependent {
                                task_id: task,
                                #[cfg(feature = "trace_task_dirty")]
                                value_type_id: cell.type_id,
                            });
                        }
                        None
                    },
                ),
            );
        }

        // Check if output need to be updated
        // `new_output` is `None` when the result equals the currently stored output.
        let current_output = get!(task, Output);
        #[cfg(feature = "verify_determinism")]
        let no_output_set = current_output.is_none();
        let new_output = match result {
            Ok(RawVc::TaskOutput(output_task_id)) => {
                if let Some(OutputValue::Output(current_task_id)) = current_output
                    && *current_task_id == output_task_id
                {
                    None
                } else {
                    Some(OutputValue::Output(output_task_id))
                }
            }
            Ok(RawVc::TaskCell(output_task_id, cell)) => {
                if let Some(OutputValue::Cell(CellRef {
                    task: current_task_id,
                    cell: current_cell,
                })) = current_output
                    && *current_task_id == output_task_id
                    && *current_cell == cell
                {
                    None
                } else {
                    Some(OutputValue::Cell(CellRef {
                        task: output_task_id,
                        cell,
                    }))
                }
            }
            Ok(RawVc::LocalOutput(..)) => {
                panic!("Non-local tasks must not return a local Vc");
            }
            Err(err) => {
                if let Some(OutputValue::Error(old_error)) = current_output
                    && old_error == &err
                {
                    None
                } else {
                    Some(OutputValue::Error(err))
                }
            }
        };
        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
        // When output has changed, grab the dependent tasks
        if new_output.is_some() && ctx.should_track_dependencies() {
            output_dependent_tasks = get_many!(task, OutputDependent { task } => task);
        }

        // Release the guard of this task before other tasks are accessed through `ctx`.
        drop(task);

        // Check if the task can be marked as immutable
        let mut is_now_immutable = false;
        if let Some(dependencies) = task_dependencies_for_immutable
            && dependencies
                .iter()
                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).is_immutable())
        {
            is_now_immutable = true;
        }
        span.record("immutable", is_immutable || is_now_immutable);

        if !queue.is_empty() || !old_edges.is_empty() {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
            // Remove outdated edges first, before removing in_progress+dirty flag.
            // We need to make sure all outdated edges are removed before the task can potentially
            // be scheduled and executed again
            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
        }

        Some(TaskExecutionCompletePrepareResult {
            new_children,
            removed_data,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        })
    }
2112
2113    fn task_execution_completed_invalidate_output_dependent(
2114        &self,
2115        ctx: &mut impl ExecuteContext<'_>,
2116        task_id: TaskId,
2117        output_dependent_tasks: SmallVec<[TaskId; 4]>,
2118    ) {
2119        debug_assert!(!output_dependent_tasks.is_empty());
2120
2121        let mut queue = AggregationUpdateQueue::new();
2122        for dependent_task_id in output_dependent_tasks {
2123            if ctx.is_once_task(dependent_task_id) {
2124                // once tasks are never invalidated
2125                continue;
2126            }
2127            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2128            if dependent.has_key(&CachedDataItemKey::OutdatedOutputDependency { target: task_id }) {
2129                // output dependency is outdated, so it hasn't read the output yet
2130                // and doesn't need to be invalidated
2131                continue;
2132            }
2133            if !dependent.has_key(&CachedDataItemKey::OutputDependency { target: task_id }) {
2134                // output dependency has been removed, so the task doesn't depend on the
2135                // output anymore and doesn't need to be invalidated
2136                continue;
2137            }
2138            make_task_dirty_internal(
2139                dependent,
2140                dependent_task_id,
2141                true,
2142                #[cfg(feature = "trace_task_dirty")]
2143                TaskDirtyCause::OutputChange { task_id },
2144                &mut queue,
2145                ctx,
2146            );
2147        }
2148
2149        queue.execute(ctx);
2150    }
2151
2152    fn task_execution_completed_unfinished_children_dirty(
2153        &self,
2154        ctx: &mut impl ExecuteContext<'_>,
2155        new_children: &FxHashSet<TaskId>,
2156    ) {
2157        debug_assert!(!new_children.is_empty());
2158
2159        let mut queue = AggregationUpdateQueue::new();
2160        for &child_id in new_children {
2161            let child_task = ctx.task(child_id, TaskDataCategory::Meta);
2162            if !child_task.has_key(&CachedDataItemKey::Output {}) {
2163                make_task_dirty_internal(
2164                    child_task,
2165                    child_id,
2166                    false,
2167                    #[cfg(feature = "trace_task_dirty")]
2168                    TaskDirtyCause::InitialDirty,
2169                    &mut queue,
2170                    ctx,
2171                );
2172            }
2173        }
2174
2175        queue.execute(ctx);
2176    }
2177
2178    fn task_execution_completed_connect(
2179        &self,
2180        ctx: &mut impl ExecuteContext<'_>,
2181        task_id: TaskId,
2182        new_children: FxHashSet<TaskId>,
2183    ) -> bool {
2184        debug_assert!(!new_children.is_empty());
2185
2186        let mut task = ctx.task(task_id, TaskDataCategory::All);
2187        let Some(in_progress) = get!(task, InProgress) else {
2188            panic!("Task execution completed, but task is not in progress: {task:#?}");
2189        };
2190        if matches!(in_progress, InProgressState::Canceled) {
2191            // Task was canceled in the meantime, so we don't connect the children
2192            return false;
2193        }
2194        let InProgressState::InProgress(box InProgressStateInner {
2195            #[cfg(not(feature = "no_fast_stale"))]
2196            stale,
2197            ..
2198        }) = in_progress
2199        else {
2200            panic!("Task execution completed, but task is not in progress: {task:#?}");
2201        };
2202
2203        // If the task is stale, reschedule it
2204        #[cfg(not(feature = "no_fast_stale"))]
2205        if *stale {
2206            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2207                remove!(task, InProgress)
2208            else {
2209                unreachable!();
2210            };
2211            task.add_new(CachedDataItem::InProgress {
2212                value: InProgressState::Scheduled {
2213                    done_event,
2214                    reason: TaskExecutionReason::Stale,
2215                },
2216            });
2217            drop(task);
2218
2219            // All `new_children` are currently hold active with an active count and we need to undo
2220            // that. (We already filtered out the old children from that list)
2221            AggregationUpdateQueue::run(
2222                AggregationUpdateJob::DecreaseActiveCounts {
2223                    task_ids: new_children.into_iter().collect(),
2224                },
2225                ctx,
2226            );
2227            return true;
2228        }
2229
2230        let has_active_count = ctx.should_track_activeness()
2231            && get!(task, Activeness).map_or(false, |activeness| activeness.active_counter > 0);
2232        connect_children(
2233            ctx,
2234            task_id,
2235            task,
2236            new_children,
2237            has_active_count,
2238            ctx.should_track_activeness(),
2239        );
2240
2241        false
2242    }
2243
2244    fn task_execution_completed_finish(
2245        &self,
2246        ctx: &mut impl ExecuteContext<'_>,
2247        task_id: TaskId,
2248        #[cfg(feature = "verify_determinism")] no_output_set: bool,
2249        new_output: Option<OutputValue>,
2250        removed_data: &mut Vec<CachedDataItem>,
2251        is_now_immutable: bool,
2252    ) -> bool {
2253        let mut task = ctx.task(task_id, TaskDataCategory::All);
2254        let Some(in_progress) = remove!(task, InProgress) else {
2255            panic!("Task execution completed, but task is not in progress: {task:#?}");
2256        };
2257        if matches!(in_progress, InProgressState::Canceled) {
2258            // Task was canceled in the meantime, so we don't finish it
2259            return false;
2260        }
2261        let InProgressState::InProgress(box InProgressStateInner {
2262            done_event,
2263            once_task: _,
2264            stale,
2265            session_dependent,
2266            marked_as_completed: _,
2267            new_children,
2268        }) = in_progress
2269        else {
2270            panic!("Task execution completed, but task is not in progress: {task:#?}");
2271        };
2272        debug_assert!(new_children.is_empty());
2273
2274        // If the task is stale, reschedule it
2275        if stale {
2276            task.add_new(CachedDataItem::InProgress {
2277                value: InProgressState::Scheduled {
2278                    done_event,
2279                    reason: TaskExecutionReason::Stale,
2280                },
2281            });
2282            return true;
2283        }
2284
2285        // Set the output if it has changed
2286        let mut old_content = None;
2287        if let Some(value) = new_output {
2288            old_content = task.insert(CachedDataItem::Output { value });
2289        }
2290
2291        // If the task is not stateful and has no mutable children, it does not have a way to be
2292        // invalidated and we can mark it as immutable.
2293        if is_now_immutable {
2294            let _ = task.add(CachedDataItem::Immutable { value: () });
2295        }
2296
2297        // Notify in progress cells
2298        removed_data.extend(task.extract_if(
2299            CachedDataItemType::InProgressCell,
2300            |key, value| match (key, value) {
2301                (
2302                    CachedDataItemKey::InProgressCell { .. },
2303                    CachedDataItemValueRef::InProgressCell { value },
2304                ) => {
2305                    value.event.notify(usize::MAX);
2306                    true
2307                }
2308                _ => false,
2309            },
2310        ));
2311
2312        // Update the dirty state
2313        let old_dirty_state = get!(task, Dirty).copied();
2314
2315        let new_dirty_state = if session_dependent {
2316            Some(DirtyState {
2317                clean_in_session: Some(self.session_id),
2318            })
2319        } else {
2320            None
2321        };
2322
2323        let dirty_changed = old_dirty_state != new_dirty_state;
2324        let data_update = if dirty_changed {
2325            if let Some(new_dirty_state) = new_dirty_state {
2326                task.insert(CachedDataItem::Dirty {
2327                    value: new_dirty_state,
2328                });
2329            } else {
2330                task.remove(&CachedDataItemKey::Dirty {});
2331            }
2332
2333            if old_dirty_state.is_some() || new_dirty_state.is_some() {
2334                let mut dirty_containers = get!(task, AggregatedDirtyContainerCount)
2335                    .cloned()
2336                    .unwrap_or_default();
2337                if let Some(old_dirty_state) = old_dirty_state {
2338                    dirty_containers.update_with_dirty_state(&old_dirty_state);
2339                }
2340                let aggregated_update = match (old_dirty_state, new_dirty_state) {
2341                    (None, None) => unreachable!(),
2342                    (Some(old), None) => dirty_containers.undo_update_with_dirty_state(&old),
2343                    (None, Some(new)) => dirty_containers.update_with_dirty_state(&new),
2344                    (Some(old), Some(new)) => dirty_containers.replace_dirty_state(&old, &new),
2345                };
2346                if !aggregated_update.is_zero() {
2347                    if aggregated_update.get(self.session_id) < 0
2348                        && let Some(activeness_state) = get_mut!(task, Activeness)
2349                    {
2350                        activeness_state.all_clean_event.notify(usize::MAX);
2351                        activeness_state.unset_active_until_clean();
2352                        if activeness_state.is_empty() {
2353                            task.remove(&CachedDataItemKey::Activeness {});
2354                        }
2355                    }
2356                    AggregationUpdateJob::data_update(
2357                        &mut task,
2358                        AggregatedDataUpdate::new()
2359                            .dirty_container_update(task_id, aggregated_update),
2360                    )
2361                } else {
2362                    None
2363                }
2364            } else {
2365                None
2366            }
2367        } else {
2368            None
2369        };
2370
2371        #[cfg(feature = "verify_determinism")]
2372        let reschedule = (dirty_changed || no_output_set) && !task_id.is_transient();
2373        #[cfg(not(feature = "verify_determinism"))]
2374        let reschedule = false;
2375        if reschedule {
2376            task.add_new(CachedDataItem::InProgress {
2377                value: InProgressState::Scheduled {
2378                    done_event,
2379                    reason: TaskExecutionReason::Stale,
2380                },
2381            });
2382            drop(task);
2383        } else {
2384            drop(task);
2385
2386            // Notify dependent tasks that are waiting for this task to finish
2387            done_event.notify(usize::MAX);
2388        }
2389
2390        drop(old_content);
2391
2392        if let Some(data_update) = data_update {
2393            AggregationUpdateQueue::run(data_update, ctx);
2394        }
2395
2396        reschedule
2397    }
2398
2399    fn task_execution_completed_cleanup(&self, ctx: &mut impl ExecuteContext<'_>, task_id: TaskId) {
2400        let mut task = ctx.task(task_id, TaskDataCategory::All);
2401        task.shrink_to_fit(CachedDataItemType::CellData);
2402        task.shrink_to_fit(CachedDataItemType::CellTypeMaxIndex);
2403        task.shrink_to_fit(CachedDataItemType::CellDependency);
2404        task.shrink_to_fit(CachedDataItemType::OutputDependency);
2405        task.shrink_to_fit(CachedDataItemType::CollectiblesDependency);
2406        drop(task);
2407    }
2408
2409    fn run_backend_job<'a>(
2410        self: &'a Arc<Self>,
2411        job: TurboTasksBackendJob,
2412        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2413    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2414        Box::pin(async move {
2415            match job {
2416                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
2417                    debug_assert!(self.should_persist());
2418
2419                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
2420                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
2421                    loop {
2422                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2423                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2424                        const IDLE_TIMEOUT: Duration = Duration::from_secs(2);
2425
2426                        let (time, mut reason) =
2427                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
2428                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2429                            } else {
2430                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
2431                            };
2432
2433                        let until = last_snapshot + time;
2434                        if until > Instant::now() {
2435                            let mut stop_listener = self.stopping_event.listen();
2436                            if self.stopping.load(Ordering::Acquire) {
2437                                return;
2438                            }
2439                            let mut idle_start_listener = self.idle_start_event.listen();
2440                            let mut idle_end_listener = self.idle_end_event.listen();
2441                            let mut idle_time = if turbo_tasks.is_idle() {
2442                                Instant::now() + IDLE_TIMEOUT
2443                            } else {
2444                                far_future()
2445                            };
2446                            loop {
2447                                tokio::select! {
2448                                    _ = &mut stop_listener => {
2449                                        return;
2450                                    },
2451                                    _ = &mut idle_start_listener => {
2452                                        idle_time = Instant::now() + IDLE_TIMEOUT;
2453                                        idle_start_listener = self.idle_start_event.listen()
2454                                    },
2455                                    _ = &mut idle_end_listener => {
2456                                        idle_time = until + IDLE_TIMEOUT;
2457                                        idle_end_listener = self.idle_end_event.listen()
2458                                    },
2459                                    _ = tokio::time::sleep_until(until) => {
2460                                        break;
2461                                    },
2462                                    _ = tokio::time::sleep_until(idle_time) => {
2463                                        if turbo_tasks.is_idle() {
2464                                            reason = "idle timeout";
2465                                            break;
2466                                        }
2467                                    },
2468                                }
2469                            }
2470                        }
2471
2472                        let _span = info_span!("persist", reason = reason).entered();
2473                        let this = self.clone();
2474                        let snapshot = this.snapshot();
2475                        if let Some((snapshot_start, new_data)) = snapshot {
2476                            last_snapshot = snapshot_start;
2477                            if new_data {
2478                                continue;
2479                            }
2480                            let last_snapshot = last_snapshot.duration_since(self.start_time);
2481                            self.last_snapshot.store(
2482                                last_snapshot.as_millis().try_into().unwrap(),
2483                                Ordering::Relaxed,
2484                            );
2485
2486                            let _span = Span::none().entered();
2487                            turbo_tasks.schedule_backend_background_job(
2488                                TurboTasksBackendJob::FollowUpSnapshot,
2489                            );
2490                            return;
2491                        }
2492                    }
2493                }
2494                TurboTasksBackendJob::Prefetch { data, range } => {
2495                    let range = if let Some(range) = range {
2496                        range
2497                    } else {
2498                        if data.len() > 128 {
2499                            let chunk_size = good_chunk_size(data.len());
2500                            let chunks = data.len().div_ceil(chunk_size);
2501                            for i in 0..chunks {
2502                                turbo_tasks.schedule_backend_background_job(
2503                                    TurboTasksBackendJob::Prefetch {
2504                                        data: data.clone(),
2505                                        range: Some(
2506                                            (i * chunk_size)..min(data.len(), (i + 1) * chunk_size),
2507                                        ),
2508                                    },
2509                                );
2510                            }
2511                            return;
2512                        }
2513                        0..data.len()
2514                    };
2515
2516                    let _span = trace_span!("prefetching").entered();
2517                    let mut ctx = self.execute_context(turbo_tasks);
2518                    for i in range {
2519                        let (&task, &with_data) = data.get_index(i).unwrap();
2520                        let category = if with_data {
2521                            TaskDataCategory::All
2522                        } else {
2523                            TaskDataCategory::Meta
2524                        };
2525                        // Prefetch the task
2526                        drop(ctx.task(task, category));
2527                    }
2528                }
2529            }
2530        })
2531    }
2532
2533    fn try_read_own_task_cell(
2534        &self,
2535        task_id: TaskId,
2536        cell: CellId,
2537        _options: ReadCellOptions,
2538        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2539    ) -> Result<TypedCellContent> {
2540        let mut ctx = self.execute_context(turbo_tasks);
2541        let task = ctx.task(task_id, TaskDataCategory::Data);
2542        if let Some(content) = get!(task, CellData { cell }) {
2543            Ok(CellContent(Some(content.reference.clone())).into_typed(cell.type_id))
2544        } else {
2545            Ok(CellContent(None).into_typed(cell.type_id))
2546        }
2547    }
2548
2549    fn read_task_collectibles(
2550        &self,
2551        task_id: TaskId,
2552        collectible_type: TraitTypeId,
2553        reader_id: Option<TaskId>,
2554        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2555    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2556        let mut ctx = self.execute_context(turbo_tasks);
2557        let mut collectibles = AutoMap::default();
2558        {
2559            let mut task = ctx.task(task_id, TaskDataCategory::All);
2560            // Ensure it's an root node
2561            loop {
2562                let aggregation_number = get_aggregation_number(&task);
2563                if is_root_node(aggregation_number) {
2564                    break;
2565                }
2566                drop(task);
2567                AggregationUpdateQueue::run(
2568                    AggregationUpdateJob::UpdateAggregationNumber {
2569                        task_id,
2570                        base_aggregation_number: u32::MAX,
2571                        distance: None,
2572                    },
2573                    &mut ctx,
2574                );
2575                task = ctx.task(task_id, TaskDataCategory::All);
2576            }
2577            for collectible in iter_many!(
2578                task,
2579                AggregatedCollectible {
2580                    collectible
2581                } count if collectible.collectible_type == collectible_type && *count > 0 => {
2582                    collectible.cell
2583                }
2584            ) {
2585                *collectibles
2586                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
2587                    .or_insert(0) += 1;
2588            }
2589            for (collectible, count) in iter_many!(
2590                task,
2591                Collectible {
2592                    collectible
2593                } count if collectible.collectible_type == collectible_type => {
2594                    (collectible.cell, *count)
2595                }
2596            ) {
2597                *collectibles
2598                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
2599                    .or_insert(0) += count;
2600            }
2601            if let Some(reader_id) = reader_id {
2602                let _ = task.add(CachedDataItem::CollectiblesDependent {
2603                    collectible_type,
2604                    task: reader_id,
2605                    value: (),
2606                });
2607            }
2608        }
2609        if let Some(reader_id) = reader_id {
2610            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
2611            let target = CollectiblesRef {
2612                task: task_id,
2613                collectible_type,
2614            };
2615            if reader
2616                .remove(&CachedDataItemKey::OutdatedCollectiblesDependency { target })
2617                .is_none()
2618            {
2619                let _ = reader.add(CachedDataItem::CollectiblesDependency { target, value: () });
2620            }
2621        }
2622        collectibles
2623    }
2624
2625    fn emit_collectible(
2626        &self,
2627        collectible_type: TraitTypeId,
2628        collectible: RawVc,
2629        task_id: TaskId,
2630        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2631    ) {
2632        self.assert_valid_collectible(task_id, collectible);
2633
2634        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2635            panic!("Collectibles need to be resolved");
2636        };
2637        let cell = CellRef {
2638            task: collectible_task,
2639            cell,
2640        };
2641        operation::UpdateCollectibleOperation::run(
2642            task_id,
2643            CollectibleRef {
2644                collectible_type,
2645                cell,
2646            },
2647            1,
2648            self.execute_context(turbo_tasks),
2649        );
2650    }
2651
2652    fn unemit_collectible(
2653        &self,
2654        collectible_type: TraitTypeId,
2655        collectible: RawVc,
2656        count: u32,
2657        task_id: TaskId,
2658        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2659    ) {
2660        self.assert_valid_collectible(task_id, collectible);
2661
2662        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2663            panic!("Collectibles need to be resolved");
2664        };
2665        let cell = CellRef {
2666            task: collectible_task,
2667            cell,
2668        };
2669        operation::UpdateCollectibleOperation::run(
2670            task_id,
2671            CollectibleRef {
2672                collectible_type,
2673                cell,
2674            },
2675            -(i32::try_from(count).unwrap()),
2676            self.execute_context(turbo_tasks),
2677        );
2678    }
2679
2680    fn update_task_cell(
2681        &self,
2682        task_id: TaskId,
2683        cell: CellId,
2684        content: CellContent,
2685        verification_mode: VerificationMode,
2686        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2687    ) {
2688        operation::UpdateCellOperation::run(
2689            task_id,
2690            cell,
2691            content,
2692            verification_mode,
2693            self.execute_context(turbo_tasks),
2694        );
2695    }
2696
2697    fn mark_own_task_as_session_dependent(
2698        &self,
2699        task_id: TaskId,
2700        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2701    ) {
2702        if !self.should_track_dependencies() {
2703            // Without dependency tracking we don't need session dependent tasks
2704            return;
2705        }
2706        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
2707        let mut ctx = self.execute_context(turbo_tasks);
2708        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
2709        let aggregation_number = get_aggregation_number(&task);
2710        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
2711            drop(task);
2712            // We want to use a high aggregation number to avoid large aggregation chains for
2713            // session dependent tasks (which change on every run)
2714            AggregationUpdateQueue::run(
2715                AggregationUpdateJob::UpdateAggregationNumber {
2716                    task_id,
2717                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
2718                    distance: None,
2719                },
2720                &mut ctx,
2721            );
2722            task = ctx.task(task_id, TaskDataCategory::Meta);
2723        }
2724        if let Some(InProgressState::InProgress(box InProgressStateInner {
2725            session_dependent,
2726            ..
2727        })) = get_mut!(task, InProgress)
2728        {
2729            *session_dependent = true;
2730        }
2731    }
2732
2733    fn mark_own_task_as_finished(
2734        &self,
2735        task: TaskId,
2736        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2737    ) {
2738        let mut ctx = self.execute_context(turbo_tasks);
2739        let mut task = ctx.task(task, TaskDataCategory::Data);
2740        if let Some(InProgressState::InProgress(box InProgressStateInner {
2741            marked_as_completed,
2742            ..
2743        })) = get_mut!(task, InProgress)
2744        {
2745            *marked_as_completed = true;
2746            // TODO this should remove the dirty state (also check session_dependent)
2747            // but this would break some assumptions for strongly consistent reads.
2748            // Client tasks are not connected yet, so we wouldn't wait for them.
2749            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
2750        }
2751    }
2752
2753    fn set_own_task_aggregation_number(
2754        &self,
2755        task: TaskId,
2756        aggregation_number: u32,
2757        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2758    ) {
2759        let mut ctx = self.execute_context(turbo_tasks);
2760        AggregationUpdateQueue::run(
2761            AggregationUpdateJob::UpdateAggregationNumber {
2762                task_id: task,
2763                base_aggregation_number: aggregation_number,
2764                distance: None,
2765            },
2766            &mut ctx,
2767        );
2768    }
2769
2770    fn connect_task(
2771        &self,
2772        task: TaskId,
2773        parent_task: Option<TaskId>,
2774        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2775    ) {
2776        self.assert_not_persistent_calling_transient(parent_task, task, None);
2777        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
2778    }
2779
2780    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
2781        let task_id = self.transient_task_id_factory.get();
2782        let root_type = match task_type {
2783            TransientTaskType::Root(_) => RootType::RootTask,
2784            TransientTaskType::Once(_) => RootType::OnceTask,
2785        };
2786        self.transient_tasks.insert(
2787            task_id,
2788            Arc::new(match task_type {
2789                TransientTaskType::Root(f) => TransientTask::Root(f),
2790                TransientTaskType::Once(f) => TransientTask::Once(Mutex::new(Some(f))),
2791            }),
2792        );
2793        {
2794            let mut task = self.storage.access_mut(task_id);
2795            task.add(CachedDataItem::AggregationNumber {
2796                value: AggregationNumber {
2797                    base: u32::MAX,
2798                    distance: 0,
2799                    effective: u32::MAX,
2800                },
2801            });
2802            if self.should_track_activeness() {
2803                task.add(CachedDataItem::Activeness {
2804                    value: ActivenessState::new_root(root_type, task_id),
2805                });
2806            }
2807            task.add(CachedDataItem::new_scheduled(
2808                TaskExecutionReason::Initial,
2809                move || {
2810                    move || match root_type {
2811                        RootType::RootTask => "Root Task".to_string(),
2812                        RootType::OnceTask => "Once Task".to_string(),
2813                    }
2814                },
2815            ));
2816        }
2817        #[cfg(feature = "verify_aggregation_graph")]
2818        self.root_tasks.lock().insert(task_id);
2819        task_id
2820    }
2821
2822    fn dispose_root_task(
2823        &self,
2824        task_id: TaskId,
2825        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2826    ) {
2827        #[cfg(feature = "verify_aggregation_graph")]
2828        self.root_tasks.lock().remove(&task_id);
2829
2830        let mut ctx = self.execute_context(turbo_tasks);
2831        let mut task = ctx.task(task_id, TaskDataCategory::All);
2832        let is_dirty = get!(task, Dirty).map_or(false, |dirty| dirty.get(self.session_id));
2833        let has_dirty_containers = get!(task, AggregatedDirtyContainerCount)
2834            .map_or(false, |dirty_containers| {
2835                dirty_containers.get(self.session_id) > 0
2836            });
2837        if is_dirty || has_dirty_containers {
2838            if let Some(activeness_state) = get_mut!(task, Activeness) {
2839                // We will finish the task, but it would be removed after the task is done
2840                activeness_state.unset_root_type();
2841                activeness_state.set_active_until_clean();
2842            };
2843        } else if let Some(activeness_state) = remove!(task, Activeness) {
2844            // Technically nobody should be listening to this event, but just in case
2845            // we notify it anyway
2846            activeness_state.all_clean_event.notify(usize::MAX);
2847        }
2848    }
2849
2850    #[cfg(feature = "verify_aggregation_graph")]
2851    fn verify_aggregation_graph(
2852        &self,
2853        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2854        idle: bool,
2855    ) {
2856        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
2857            return;
2858        }
2859        use std::{collections::VecDeque, env, io::stdout};
2860
2861        use crate::backend::operation::{get_uppers, is_aggregating_node};
2862
2863        let mut ctx = self.execute_context(turbo_tasks);
2864        let root_tasks = self.root_tasks.lock().clone();
2865
2866        for task_id in root_tasks.into_iter() {
2867            let mut queue = VecDeque::new();
2868            let mut visited = FxHashSet::default();
2869            let mut aggregated_nodes = FxHashSet::default();
2870            let mut collectibles = FxHashMap::default();
2871            let root_task_id = task_id;
2872            visited.insert(task_id);
2873            aggregated_nodes.insert(task_id);
2874            queue.push_back(task_id);
2875            let mut counter = 0;
2876            while let Some(task_id) = queue.pop_front() {
2877                counter += 1;
2878                if counter % 100000 == 0 {
2879                    println!(
2880                        "queue={}, visited={}, aggregated_nodes={}",
2881                        queue.len(),
2882                        visited.len(),
2883                        aggregated_nodes.len()
2884                    );
2885                }
2886                let task = ctx.task(task_id, TaskDataCategory::All);
2887                if idle && !self.is_idle.load(Ordering::Relaxed) {
2888                    return;
2889                }
2890
2891                let uppers = get_uppers(&task);
2892                if task_id != root_task_id
2893                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
2894                {
2895                    panic!(
2896                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
2897                         {:?})",
2898                        task_id,
2899                        ctx.get_task_description(task_id),
2900                        uppers
2901                    );
2902                }
2903
2904                let aggregated_collectibles: Vec<_> = get_many!(task, AggregatedCollectible { collectible } value if *value > 0 => {collectible});
2905                for collectible in aggregated_collectibles {
2906                    collectibles
2907                        .entry(collectible)
2908                        .or_insert_with(|| (false, Vec::new()))
2909                        .1
2910                        .push(task_id);
2911                }
2912
2913                let own_collectibles: Vec<_> = get_many!(task, Collectible { collectible } value if *value > 0 => {collectible});
2914                for collectible in own_collectibles {
2915                    if let Some((flag, _)) = collectibles.get_mut(&collectible) {
2916                        *flag = true
2917                    } else {
2918                        panic!(
2919                            "Task {} has a collectible {:?} that is not in any upper task",
2920                            task_id, collectible
2921                        );
2922                    }
2923                }
2924
2925                let is_dirty = get!(task, Dirty).is_some_and(|dirty| dirty.get(self.session_id));
2926                let has_dirty_container = get!(task, AggregatedDirtyContainerCount)
2927                    .is_some_and(|count| count.get(self.session_id) > 0);
2928                let should_be_in_upper = is_dirty || has_dirty_container;
2929
2930                let aggregation_number = get_aggregation_number(&task);
2931                if is_aggregating_node(aggregation_number) {
2932                    aggregated_nodes.insert(task_id);
2933                }
2934                // println!(
2935                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
2936                //     ctx.get_task_description(task_id),
2937                //     uppers
2938                // );
2939
2940                for child_id in iter_many!(task, Child { task } => task) {
2941                    // println!("{task_id}: child -> {child_id}");
2942                    if visited.insert(child_id) {
2943                        queue.push_back(child_id);
2944                    }
2945                }
2946                drop(task);
2947
2948                if should_be_in_upper {
2949                    for upper_id in uppers {
2950                        let task = ctx.task(task_id, TaskDataCategory::All);
2951                        let in_upper = get!(task, AggregatedDirtyContainer { task: task_id })
2952                            .is_some_and(|dirty| dirty.get(self.session_id) > 0);
2953                        if !in_upper {
2954                            panic!(
2955                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
2956                                 ({})",
2957                                task_id,
2958                                ctx.get_task_description(task_id),
2959                                upper_id,
2960                                ctx.get_task_description(upper_id)
2961                            );
2962                        }
2963                    }
2964                }
2965            }
2966
2967            for (collectible, (flag, task_ids)) in collectibles {
2968                if !flag {
2969                    use std::io::Write;
2970                    let mut stdout = stdout().lock();
2971                    writeln!(
2972                        stdout,
2973                        "{:?} that is not emitted in any child task but in these aggregated \
2974                         tasks: {:#?}",
2975                        collectible,
2976                        task_ids
2977                            .iter()
2978                            .map(|t| format!("{t} {}", ctx.get_task_description(*t)))
2979                            .collect::<Vec<_>>()
2980                    )
2981                    .unwrap();
2982
2983                    let task_id = collectible.cell.task;
2984                    let mut queue = {
2985                        let task = ctx.task(task_id, TaskDataCategory::All);
2986                        get_uppers(&task)
2987                    };
2988                    let mut visited = FxHashSet::default();
2989                    for &upper_id in queue.iter() {
2990                        visited.insert(upper_id);
2991                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
2992                    }
2993                    while let Some(task_id) = queue.pop() {
2994                        let desc = ctx.get_task_description(task_id);
2995                        let task = ctx.task(task_id, TaskDataCategory::All);
2996                        let aggregated_collectible =
2997                            get!(task, AggregatedCollectible { collectible })
2998                                .copied()
2999                                .unwrap_or_default();
3000                        let uppers = get_uppers(&task);
3001                        drop(task);
3002                        writeln!(
3003                            stdout,
3004                            "upper {task_id} {desc} collectible={aggregated_collectible}"
3005                        )
3006                        .unwrap();
3007                        if task_ids.contains(&task_id) {
3008                            writeln!(
3009                                stdout,
3010                                "Task has an upper connection to an aggregated task that doesn't \
3011                                 reference it. Upper connection is invalid!"
3012                            )
3013                            .unwrap();
3014                        }
3015                        for upper_id in uppers {
3016                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3017                            if !visited.contains(&upper_id) {
3018                                queue.push(upper_id);
3019                            }
3020                        }
3021                    }
3022                    panic!("See stdout for more details");
3023                }
3024            }
3025        }
3026    }
3027
3028    fn assert_not_persistent_calling_transient(
3029        &self,
3030        parent_id: Option<TaskId>,
3031        child_id: TaskId,
3032        cell_id: Option<CellId>,
3033    ) {
3034        if !parent_id.is_none_or(|id| id.is_transient()) && child_id.is_transient() {
3035            self.panic_persistent_calling_transient(
3036                parent_id
3037                    .and_then(|id| self.lookup_task_type(id))
3038                    .as_deref(),
3039                self.lookup_task_type(child_id).as_deref(),
3040                cell_id,
3041            );
3042        }
3043    }
3044
3045    fn panic_persistent_calling_transient(
3046        &self,
3047        parent: Option<&CachedTaskType>,
3048        child: Option<&CachedTaskType>,
3049        cell_id: Option<CellId>,
3050    ) {
3051        let transient_reason = if let Some(child) = child {
3052            Cow::Owned(format!(
3053                " The callee is transient because it depends on:\n{}",
3054                self.debug_trace_transient_task(child, cell_id),
3055            ))
3056        } else {
3057            Cow::Borrowed("")
3058        };
3059        panic!(
3060            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3061            parent.map_or("unknown", |t| t.get_name()),
3062            child.map_or("unknown", |t| t.get_name()),
3063            transient_reason,
3064        );
3065    }
3066
3067    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3068        // these checks occur in a potentially hot codepath, but they're cheap
3069        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3070            // This should never happen: The collectible APIs use ResolvedVc
3071            let task_info = if let Some(col_task_ty) = collectible
3072                .try_get_task_id()
3073                .and_then(|t| self.lookup_task_type(t))
3074            {
3075                Cow::Owned(format!(" (return type of {col_task_ty})"))
3076            } else {
3077                Cow::Borrowed("")
3078            };
3079            panic!("Collectible{task_info} must be a ResolvedVc")
3080        };
3081        if col_task_id.is_transient() && !task_id.is_transient() {
3082            let transient_reason = if let Some(col_task_ty) = self.lookup_task_type(col_task_id) {
3083                Cow::Owned(format!(
3084                    ". The collectible is transient because it depends on:\n{}",
3085                    self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3086                ))
3087            } else {
3088                Cow::Borrowed("")
3089            };
3090            // this should never happen: How would a persistent function get a transient Vc?
3091            panic!(
3092                "Collectible is transient, transient collectibles cannot be emitted from \
3093                 persistent tasks{transient_reason}",
3094            )
3095        }
3096    }
3097}
3098
3099impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
3100    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3101        self.0.startup(turbo_tasks);
3102    }
3103
3104    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3105        self.0.stopping();
3106    }
3107
3108    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3109        self.0.stop(turbo_tasks);
3110    }
3111
3112    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3113        self.0.idle_start(turbo_tasks);
3114    }
3115
3116    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3117        self.0.idle_end();
3118    }
3119
3120    fn get_or_create_persistent_task(
3121        &self,
3122        task_type: CachedTaskType,
3123        parent_task: Option<TaskId>,
3124        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3125    ) -> TaskId {
3126        self.0
3127            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
3128    }
3129
3130    fn get_or_create_transient_task(
3131        &self,
3132        task_type: CachedTaskType,
3133        parent_task: Option<TaskId>,
3134        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3135    ) -> TaskId {
3136        self.0
3137            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
3138    }
3139
3140    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3141        self.0.invalidate_task(task_id, turbo_tasks);
3142    }
3143
3144    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3145        self.0.invalidate_tasks(tasks, turbo_tasks);
3146    }
3147
3148    fn invalidate_tasks_set(
3149        &self,
3150        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3151        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3152    ) {
3153        self.0.invalidate_tasks_set(tasks, turbo_tasks);
3154    }
3155
3156    fn invalidate_serialization(
3157        &self,
3158        task_id: TaskId,
3159        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3160    ) {
3161        self.0.invalidate_serialization(task_id, turbo_tasks);
3162    }
3163
3164    fn get_task_description(&self, task: TaskId) -> String {
3165        self.0.get_task_description(task)
3166    }
3167
3168    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3169        self.0.task_execution_canceled(task, turbo_tasks)
3170    }
3171
3172    fn try_start_task_execution(
3173        &self,
3174        task_id: TaskId,
3175        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3176    ) -> Option<TaskExecutionSpec<'_>> {
3177        self.0.try_start_task_execution(task_id, turbo_tasks)
3178    }
3179
3180    fn task_execution_completed(
3181        &self,
3182        task_id: TaskId,
3183        result: Result<RawVc, TurboTasksExecutionError>,
3184        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3185        stateful: bool,
3186        has_invalidator: bool,
3187        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3188    ) -> bool {
3189        self.0.task_execution_completed(
3190            task_id,
3191            result,
3192            cell_counters,
3193            stateful,
3194            has_invalidator,
3195            turbo_tasks,
3196        )
3197    }
3198
3199    type BackendJob = TurboTasksBackendJob;
3200
3201    fn run_backend_job<'a>(
3202        &'a self,
3203        job: Self::BackendJob,
3204        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3205    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3206        self.0.run_backend_job(job, turbo_tasks)
3207    }
3208
3209    fn try_read_task_output(
3210        &self,
3211        task_id: TaskId,
3212        reader: Option<TaskId>,
3213        options: ReadOutputOptions,
3214        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3215    ) -> Result<Result<RawVc, EventListener>> {
3216        self.0
3217            .try_read_task_output(task_id, reader, options, turbo_tasks)
3218    }
3219
3220    fn try_read_task_cell(
3221        &self,
3222        task_id: TaskId,
3223        cell: CellId,
3224        reader: Option<TaskId>,
3225        options: ReadCellOptions,
3226        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3227    ) -> Result<Result<TypedCellContent, EventListener>> {
3228        self.0
3229            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3230    }
3231
3232    fn try_read_own_task_cell(
3233        &self,
3234        task_id: TaskId,
3235        cell: CellId,
3236        options: ReadCellOptions,
3237        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3238    ) -> Result<TypedCellContent> {
3239        self.0
3240            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
3241    }
3242
3243    fn read_task_collectibles(
3244        &self,
3245        task_id: TaskId,
3246        collectible_type: TraitTypeId,
3247        reader: Option<TaskId>,
3248        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3249    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3250        self.0
3251            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3252    }
3253
3254    fn emit_collectible(
3255        &self,
3256        collectible_type: TraitTypeId,
3257        collectible: RawVc,
3258        task_id: TaskId,
3259        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3260    ) {
3261        self.0
3262            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3263    }
3264
3265    fn unemit_collectible(
3266        &self,
3267        collectible_type: TraitTypeId,
3268        collectible: RawVc,
3269        count: u32,
3270        task_id: TaskId,
3271        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3272    ) {
3273        self.0
3274            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3275    }
3276
3277    fn update_task_cell(
3278        &self,
3279        task_id: TaskId,
3280        cell: CellId,
3281        content: CellContent,
3282        verification_mode: VerificationMode,
3283        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3284    ) {
3285        self.0
3286            .update_task_cell(task_id, cell, content, verification_mode, turbo_tasks);
3287    }
3288
3289    fn mark_own_task_as_finished(
3290        &self,
3291        task_id: TaskId,
3292        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3293    ) {
3294        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3295    }
3296
3297    fn set_own_task_aggregation_number(
3298        &self,
3299        task: TaskId,
3300        aggregation_number: u32,
3301        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3302    ) {
3303        self.0
3304            .set_own_task_aggregation_number(task, aggregation_number, turbo_tasks);
3305    }
3306
3307    fn mark_own_task_as_session_dependent(
3308        &self,
3309        task: TaskId,
3310        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3311    ) {
3312        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
3313    }
3314
3315    fn connect_task(
3316        &self,
3317        task: TaskId,
3318        parent_task: Option<TaskId>,
3319        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3320    ) {
3321        self.0.connect_task(task, parent_task, turbo_tasks);
3322    }
3323
3324    fn create_transient_task(
3325        &self,
3326        task_type: TransientTaskType,
3327        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3328    ) -> TaskId {
3329        self.0.create_transient_task(task_type)
3330    }
3331
3332    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3333        self.0.dispose_root_task(task_id, turbo_tasks);
3334    }
3335
3336    fn task_statistics(&self) -> &TaskStatisticsApi {
3337        &self.0.task_statistics
3338    }
3339
3340    fn is_tracking_dependencies(&self) -> bool {
3341        self.0.options.dependency_tracking
3342    }
3343}
3344
3345enum DebugTraceTransientTask {
3346    Cached {
3347        task_name: &'static str,
3348        cell_type_id: Option<ValueTypeId>,
3349        cause_self: Option<Box<DebugTraceTransientTask>>,
3350        cause_args: Vec<DebugTraceTransientTask>,
3351    },
3352    /// This representation is used when this task is a duplicate of one previously shown
3353    Collapsed {
3354        task_name: &'static str,
3355        cell_type_id: Option<ValueTypeId>,
3356    },
3357    Uncached {
3358        cell_type_id: Option<ValueTypeId>,
3359    },
3360}
3361
3362impl DebugTraceTransientTask {
3363    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3364        let indent = "    ".repeat(level);
3365        f.write_str(&indent)?;
3366
3367        fn fmt_cell_type_id(
3368            f: &mut fmt::Formatter<'_>,
3369            cell_type_id: Option<ValueTypeId>,
3370        ) -> fmt::Result {
3371            if let Some(ty) = cell_type_id {
3372                write!(f, " (read cell of type {})", get_value_type(ty).global_name)
3373            } else {
3374                Ok(())
3375            }
3376        }
3377
3378        // write the name and type
3379        match self {
3380            Self::Cached {
3381                task_name,
3382                cell_type_id,
3383                ..
3384            }
3385            | Self::Collapsed {
3386                task_name,
3387                cell_type_id,
3388                ..
3389            } => {
3390                f.write_str(task_name)?;
3391                fmt_cell_type_id(f, *cell_type_id)?;
3392                if matches!(self, Self::Collapsed { .. }) {
3393                    f.write_str(" (collapsed)")?;
3394                }
3395            }
3396            Self::Uncached { cell_type_id } => {
3397                f.write_str("unknown transient task")?;
3398                fmt_cell_type_id(f, *cell_type_id)?;
3399            }
3400        }
3401        f.write_char('\n')?;
3402
3403        // write any extra "cause" information we might have
3404        if let Self::Cached {
3405            cause_self,
3406            cause_args,
3407            ..
3408        } = self
3409        {
3410            if let Some(c) = cause_self {
3411                writeln!(f, "{indent}  self:")?;
3412                c.fmt_indented(f, level + 1)?;
3413            }
3414            if !cause_args.is_empty() {
3415                writeln!(f, "{indent}  args:")?;
3416                for c in cause_args {
3417                    c.fmt_indented(f, level + 1)?;
3418                }
3419            }
3420        }
3421        Ok(())
3422    }
3423}
3424
3425impl fmt::Display for DebugTraceTransientTask {
3426    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3427        self.fmt_indented(f, 0)
3428    }
3429}
3430
// Adapted from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
/// Returns an `Instant` roughly 30 years after the moment of the call.
///
/// There is no API to obtain a maximum `Instant` or to turn a calendar date into one, so a
/// large fixed offset is added to "now" instead. The offset is kept at 30 years because
/// 1000 years overflows on macOS and 100 years overflows on FreeBSD.
fn far_future() -> Instant {
    const SECONDS_PER_DAY: u64 = 86400;
    const DAYS_PER_YEAR: u64 = 365;
    const YEARS: u64 = 30;

    Instant::now() + Duration::from_secs(SECONDS_PER_DAY * DAYS_PER_YEAR * YEARS)
}