turbo_tasks_backend/backend/
mod.rs

1mod dynamic_storage;
2mod operation;
3mod storage;
4
5use std::{
6    borrow::Cow,
7    fmt::{self, Write},
8    future::Future,
9    hash::BuildHasherDefault,
10    mem::take,
11    pin::Pin,
12    sync::{
13        Arc,
14        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
15    },
16    thread::available_parallelism,
17};
18
19use anyhow::{Result, bail};
20use auto_hash_map::{AutoMap, AutoSet};
21use indexmap::IndexSet;
22use parking_lot::{Condvar, Mutex};
23use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
24use smallvec::{SmallVec, smallvec};
25use tokio::time::{Duration, Instant};
26use turbo_tasks::{
27    CellId, FunctionId, FxDashMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency,
28    SessionId, TRANSIENT_TASK_BIT, TaskId, TraitTypeId, TurboTasksBackendApi, ValueTypeId,
29    backend::{
30        Backend, BackendJobId, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
31        TransientTaskType, TurboTasksExecutionError, TypedCellContent,
32    },
33    event::{Event, EventListener},
34    registry::{self, get_value_type_global_name},
35    task_statistics::TaskStatisticsApi,
36    trace::TraceRawVcs,
37    util::IdFactoryWithReuse,
38};
39
40pub use self::{operation::AnyOperation, storage::TaskDataCategory};
41#[cfg(feature = "trace_task_dirty")]
42use crate::backend::operation::TaskDirtyCause;
43use crate::{
44    backend::{
45        operation::{
46            AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
47            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
48            Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number,
49            is_root_node, prepare_new_children,
50        },
51        storage::{
52            InnerStorageSnapshot, Storage, get, get_many, get_mut, get_mut_or_insert_with,
53            iter_many, remove,
54        },
55    },
56    backing_storage::BackingStorage,
57    data::{
58        ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
59        CachedDataItemValueRef, CellRef, CollectibleRef, CollectiblesRef, DirtyState,
60        InProgressCellState, InProgressState, InProgressStateInner, OutputValue, RootType,
61    },
62    utils::{
63        bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc, sharded::Sharded, swap_retain,
64    },
65};
66
67const BACKEND_JOB_INITIAL_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(1) };
68const BACKEND_JOB_FOLLOW_UP_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(2) };
69
70const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
71
72struct SnapshotRequest {
73    snapshot_requested: bool,
74    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
75}
76
77impl SnapshotRequest {
78    fn new() -> Self {
79        Self {
80            snapshot_requested: false,
81            suspended_operations: FxHashSet::default(),
82        }
83    }
84}
85
86type TransientTaskOnce =
87    Mutex<Option<Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>>>;
88
89pub enum TransientTask {
90    /// A root task that will track dependencies and re-execute when
91    /// dependencies change. Task will eventually settle to the correct
92    /// execution.
93    ///
94    /// Always active. Automatically scheduled.
95    Root(TransientTaskRoot),
96
97    // TODO implement these strongly consistency
98    /// A single root task execution. It won't track dependencies.
99    /// Task will definitely include all invalidations that happened before the
100    /// start of the task. It may or may not include invalidations that
101    /// happened after that. It may see these invalidations partially
102    /// applied.
103    ///
104    /// Active until done. Automatically scheduled.
105    Once(TransientTaskOnce),
106}
107
108pub enum StorageMode {
109    /// Queries the storage for cache entries that don't exist locally.
110    ReadOnly,
111    /// Queries the storage for cache entries that don't exist locally.
112    /// Keeps a log of all changes and regularly push them to the backing storage.
113    ReadWrite,
114}
115
116pub struct BackendOptions {
117    /// Enables dependency tracking.
118    ///
119    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
120    /// forever.
121    pub dependency_tracking: bool,
122
123    /// Enables children tracking.
124    ///
125    /// When disabled: Strongly consistent reads are only eventually consistent. All tasks are
126    /// considered as active. Collectibles are disabled.
127    pub children_tracking: bool,
128
129    /// Enables active tracking.
130    ///
131    /// Automatically disabled when `dependency_tracking` is disabled.
132    ///
133    /// When disabled: All tasks are considered as active.
134    pub active_tracking: bool,
135
136    /// Enables the backing storage.
137    pub storage_mode: Option<StorageMode>,
138}
139
140impl Default for BackendOptions {
141    fn default() -> Self {
142        Self {
143            dependency_tracking: true,
144            children_tracking: true,
145            active_tracking: true,
146            storage_mode: Some(StorageMode::ReadWrite),
147        }
148    }
149}
150
151pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
152
153type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
154
155struct TurboTasksBackendInner<B: BackingStorage> {
156    options: BackendOptions,
157
158    start_time: Instant,
159    session_id: SessionId,
160
161    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
162    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
163
164    persisted_task_cache_log: Option<TaskCacheLog>,
165    task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
166    transient_tasks: FxDashMap<TaskId, Arc<TransientTask>>,
167
168    storage: Storage,
169
170    /// Number of executing operations + Highest bit is set when snapshot is
171    /// requested. When that bit is set, operations should pause until the
172    /// snapshot is completed. When the bit is set and in progress counter
173    /// reaches zero, `operations_completed_when_snapshot_requested` is
174    /// triggered.
175    in_progress_operations: AtomicUsize,
176
177    snapshot_request: Mutex<SnapshotRequest>,
178    /// Condition Variable that is triggered when `in_progress_operations`
179    /// reaches zero while snapshot is requested. All operations are either
180    /// completed or suspended.
181    operations_suspended: Condvar,
182    /// Condition Variable that is triggered when a snapshot is completed and
183    /// operations can continue.
184    snapshot_completed: Condvar,
185    /// The timestamp of the last started snapshot since [`Self::start_time`].
186    last_snapshot: AtomicU64,
187
188    stopping: AtomicBool,
189    stopping_event: Event,
190    idle_start_event: Event,
191    idle_end_event: Event,
192    #[cfg(feature = "verify_aggregation_graph")]
193    is_idle: AtomicBool,
194
195    task_statistics: TaskStatisticsApi,
196
197    backing_storage: B,
198
199    #[cfg(feature = "verify_aggregation_graph")]
200    root_tasks: Mutex<FxHashSet<TaskId>>,
201}
202
203impl<B: BackingStorage> TurboTasksBackend<B> {
204    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
205        Self(Arc::new(TurboTasksBackendInner::new(
206            options,
207            backing_storage,
208        )))
209    }
210}
211
212impl<B: BackingStorage> TurboTasksBackendInner<B> {
213    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
214        let shard_amount =
215            (available_parallelism().map_or(4, |v| v.get()) * 64).next_power_of_two();
216        let need_log = matches!(options.storage_mode, Some(StorageMode::ReadWrite));
217        if !options.dependency_tracking {
218            options.active_tracking = false;
219        }
220        Self {
221            options,
222            start_time: Instant::now(),
223            session_id: backing_storage.next_session_id(),
224            persisted_task_id_factory: IdFactoryWithReuse::new(
225                backing_storage.next_free_task_id(),
226                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
227            ),
228            transient_task_id_factory: IdFactoryWithReuse::new(
229                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
230                TaskId::MAX,
231            ),
232            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
233            task_cache: BiMap::new(),
234            transient_tasks: FxDashMap::default(),
235            storage: Storage::new(),
236            in_progress_operations: AtomicUsize::new(0),
237            snapshot_request: Mutex::new(SnapshotRequest::new()),
238            operations_suspended: Condvar::new(),
239            snapshot_completed: Condvar::new(),
240            last_snapshot: AtomicU64::new(0),
241            stopping: AtomicBool::new(false),
242            stopping_event: Event::new(|| "TurboTasksBackend::stopping_event".to_string()),
243            idle_start_event: Event::new(|| "TurboTasksBackend::idle_start_event".to_string()),
244            idle_end_event: Event::new(|| "TurboTasksBackend::idle_end_event".to_string()),
245            #[cfg(feature = "verify_aggregation_graph")]
246            is_idle: AtomicBool::new(false),
247            task_statistics: TaskStatisticsApi::default(),
248            backing_storage,
249            #[cfg(feature = "verify_aggregation_graph")]
250            root_tasks: Default::default(),
251        }
252    }
253
254    fn execute_context<'a>(
255        &'a self,
256        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
257    ) -> impl ExecuteContext<'a> {
258        ExecuteContextImpl::new(self, turbo_tasks)
259    }
260
261    fn session_id(&self) -> SessionId {
262        self.session_id
263    }
264
265    /// # Safety
266    ///
267    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
268    unsafe fn execute_context_with_tx<'e, 'tx>(
269        &'e self,
270        tx: Option<&'e B::ReadTransaction<'tx>>,
271        turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
272    ) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
273    where
274        'tx: 'e,
275    {
276        // Safety: `tx` is from `self`.
277        unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
278    }
279
280    fn suspending_requested(&self) -> bool {
281        self.should_persist()
282            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
283    }
284
285    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
286        #[cold]
287        fn operation_suspend_point_cold<B: BackingStorage>(
288            this: &TurboTasksBackendInner<B>,
289            suspend: impl FnOnce() -> AnyOperation,
290        ) {
291            let operation = Arc::new(suspend());
292            let mut snapshot_request = this.snapshot_request.lock();
293            if snapshot_request.snapshot_requested {
294                snapshot_request
295                    .suspended_operations
296                    .insert(operation.clone().into());
297                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
298                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
299                if value == SNAPSHOT_REQUESTED_BIT {
300                    this.operations_suspended.notify_all();
301                }
302                this.snapshot_completed
303                    .wait_while(&mut snapshot_request, |snapshot_request| {
304                        snapshot_request.snapshot_requested
305                    });
306                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
307                snapshot_request
308                    .suspended_operations
309                    .remove(&operation.into());
310            }
311        }
312
313        if self.suspending_requested() {
314            operation_suspend_point_cold(self, suspend);
315        }
316    }
317
318    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
319        if !self.should_persist() {
320            return OperationGuard { backend: None };
321        }
322        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
323        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
324            let mut snapshot_request = self.snapshot_request.lock();
325            if snapshot_request.snapshot_requested {
326                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
327                if value == SNAPSHOT_REQUESTED_BIT {
328                    self.operations_suspended.notify_all();
329                }
330                self.snapshot_completed
331                    .wait_while(&mut snapshot_request, |snapshot_request| {
332                        snapshot_request.snapshot_requested
333                    });
334                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
335            }
336        }
337        OperationGuard {
338            backend: Some(self),
339        }
340    }
341
342    fn should_persist(&self) -> bool {
343        matches!(self.options.storage_mode, Some(StorageMode::ReadWrite))
344    }
345
346    fn should_restore(&self) -> bool {
347        self.options.storage_mode.is_some()
348    }
349
350    fn should_track_dependencies(&self) -> bool {
351        self.options.dependency_tracking
352    }
353
354    fn should_track_activeness(&self) -> bool {
355        self.options.active_tracking
356    }
357
358    fn should_track_children(&self) -> bool {
359        self.options.children_tracking
360    }
361
362    fn track_cache_hit(&self, task_type: &CachedTaskType) {
363        self.task_statistics
364            .map(|stats| stats.increment_cache_hit(task_type.fn_type));
365    }
366
367    fn track_cache_miss(&self, task_type: &CachedTaskType) {
368        self.task_statistics
369            .map(|stats| stats.increment_cache_miss(task_type.fn_type));
370    }
371}
372
373pub(crate) struct OperationGuard<'a, B: BackingStorage> {
374    backend: Option<&'a TurboTasksBackendInner<B>>,
375}
376
377impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
378    fn drop(&mut self) {
379        if let Some(backend) = self.backend {
380            let fetch_sub = backend
381                .in_progress_operations
382                .fetch_sub(1, Ordering::AcqRel);
383            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
384                backend.operations_suspended.notify_all();
385            }
386        }
387    }
388}
389
390// Operations
391impl<B: BackingStorage> TurboTasksBackendInner<B> {
392    /// # Safety
393    ///
394    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
395    unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
396        &'l self,
397        tx: Option<&'l B::ReadTransaction<'tx>>,
398        parent_task: TaskId,
399        child_task: TaskId,
400        turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
401    ) {
402        operation::ConnectChildOperation::run(parent_task, child_task, unsafe {
403            self.execute_context_with_tx(tx, turbo_tasks)
404        });
405    }
406
407    fn connect_child(
408        &self,
409        parent_task: TaskId,
410        child_task: TaskId,
411        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
412    ) {
413        operation::ConnectChildOperation::run(
414            parent_task,
415            child_task,
416            self.execute_context(turbo_tasks),
417        );
418    }
419
420    fn try_read_task_output(
421        &self,
422        task_id: TaskId,
423        reader: Option<TaskId>,
424        consistency: ReadConsistency,
425        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
426    ) -> Result<Result<RawVc, EventListener>> {
427        if let Some(reader) = reader {
428            self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
429        }
430
431        let mut ctx = self.execute_context(turbo_tasks);
432        let mut task = ctx.task(task_id, TaskDataCategory::All);
433
434        fn listen_to_done_event<B: BackingStorage>(
435            this: &TurboTasksBackendInner<B>,
436            reader: Option<TaskId>,
437            done_event: &Event,
438        ) -> EventListener {
439            let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
440            done_event.listen_with_note(move || {
441                if let Some(reader_desc) = reader_desc.as_ref() {
442                    format!("try_read_task_output from {}", reader_desc())
443                } else {
444                    "try_read_task_output (untracked)".to_string()
445                }
446            })
447        }
448
449        fn check_in_progress<B: BackingStorage>(
450            this: &TurboTasksBackendInner<B>,
451            task: &impl TaskGuard,
452            reader: Option<TaskId>,
453            ctx: &impl ExecuteContext<'_>,
454        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
455        {
456            match get!(task, InProgress) {
457                Some(InProgressState::Scheduled { done_event, .. }) => {
458                    Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
459                }
460                Some(InProgressState::InProgress(box InProgressStateInner {
461                    done,
462                    done_event,
463                    ..
464                })) => {
465                    if !*done {
466                        Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
467                    } else {
468                        None
469                    }
470                }
471                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
472                    "{} was canceled",
473                    ctx.get_task_description(task.id())
474                ))),
475                None => None,
476            }
477        }
478
479        if self.should_track_children() && matches!(consistency, ReadConsistency::Strong) {
480            // Ensure it's an root node
481            loop {
482                let aggregation_number = get_aggregation_number(&task);
483                if is_root_node(aggregation_number) {
484                    break;
485                }
486                drop(task);
487                {
488                    let _span = tracing::trace_span!(
489                        "make root node for strongly consistent read",
490                        %task_id
491                    )
492                    .entered();
493                    AggregationUpdateQueue::run(
494                        AggregationUpdateJob::UpdateAggregationNumber {
495                            task_id,
496                            base_aggregation_number: u32::MAX,
497                            distance: None,
498                        },
499                        &mut ctx,
500                    );
501                }
502                task = ctx.task(task_id, TaskDataCategory::All);
503            }
504
505            let is_dirty =
506                get!(task, Dirty).map_or(false, |dirty_state| dirty_state.get(self.session_id));
507
508            // Check the dirty count of the root node
509            let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
510                .cloned()
511                .unwrap_or_default()
512                .get(self.session_id);
513            if dirty_tasks > 0 || is_dirty {
514                let root = get_mut!(task, Activeness);
515                let mut task_ids_to_schedule: Vec<_> = Vec::new();
516                // When there are dirty task, subscribe to the all_clean_event
517                let root = if let Some(root) = root {
518                    // This makes sure all tasks stay active and this task won't stale.
519                    // active_until_clean is automatically removed when this
520                    // task is clean.
521                    root.set_active_until_clean();
522                    root
523                } else {
524                    // If we don't have a root state, add one. This also makes sure all tasks stay
525                    // active and this task won't stale. active_until_clean
526                    // is automatically removed when this task is clean.
527                    get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id))
528                        .set_active_until_clean();
529                    if ctx.should_track_activeness() {
530                        // A newly added Activeness need to make sure to schedule the tasks
531                        task_ids_to_schedule = get_many!(
532                            task,
533                            AggregatedDirtyContainer {
534                                task
535                            } count if count.get(self.session_id) > 0 => {
536                                task
537                            }
538                        );
539                        task_ids_to_schedule.push(task_id);
540                    }
541                    get!(task, Activeness).unwrap()
542                };
543                let listener = root.all_clean_event.listen_with_note(move || {
544                    format!("try_read_task_output (strongly consistent) from {reader:?}")
545                });
546                drop(task);
547                if !task_ids_to_schedule.is_empty() {
548                    let mut queue = AggregationUpdateQueue::new();
549                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
550                    queue.execute(&mut ctx);
551                }
552
553                return Ok(Err(listener));
554            }
555        }
556
557        if let Some(value) = check_in_progress(self, &task, reader, &ctx) {
558            return value;
559        }
560
561        if let Some(output) = get!(task, Output) {
562            let result = match output {
563                OutputValue::Cell(cell) => Some(Ok(Ok(RawVc::TaskCell(cell.task, cell.cell)))),
564                OutputValue::Output(task) => Some(Ok(Ok(RawVc::TaskOutput(*task)))),
565                OutputValue::Error => get!(task, Error).map(|error| Err(error.clone().into())),
566            };
567            if let Some(result) = result {
568                if self.should_track_dependencies() {
569                    if let Some(reader) = reader {
570                        let _ = task.add(CachedDataItem::OutputDependent {
571                            task: reader,
572                            value: (),
573                        });
574                        drop(task);
575
576                        let mut reader_task = ctx.task(reader, TaskDataCategory::Data);
577                        if reader_task
578                            .remove(&CachedDataItemKey::OutdatedOutputDependency {
579                                target: task_id,
580                            })
581                            .is_none()
582                        {
583                            let _ = reader_task.add(CachedDataItem::OutputDependency {
584                                target: task_id,
585                                value: (),
586                            });
587                        }
588                    }
589                }
590
591                return result;
592            }
593        }
594
595        let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
596        let note = move || {
597            if let Some(reader_desc) = reader_desc.as_ref() {
598                format!("try_read_task_output (recompute) from {}", reader_desc())
599            } else {
600                "try_read_task_output (recompute, untracked)".to_string()
601            }
602        };
603
604        // Output doesn't exist. We need to schedule the task to compute it.
605        let (item, listener) =
606            CachedDataItem::new_scheduled_with_listener(self.get_task_desc_fn(task_id), note);
607        // It's not possible that the task is InProgress at this point. If it is InProgress {
608        // done: true } it must have Output and would early return.
609        task.add_new(item);
610        turbo_tasks.schedule(task_id);
611
612        Ok(Err(listener))
613    }
614
615    fn try_read_task_cell(
616        &self,
617        task_id: TaskId,
618        reader: Option<TaskId>,
619        cell: CellId,
620        options: ReadCellOptions,
621        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
622    ) -> Result<Result<TypedCellContent, EventListener>> {
623        if let Some(reader) = reader {
624            self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
625        }
626
627        fn add_cell_dependency<B: BackingStorage>(
628            backend: &TurboTasksBackendInner<B>,
629            mut task: impl TaskGuard,
630            reader: Option<TaskId>,
631            cell: CellId,
632            task_id: TaskId,
633            ctx: &mut impl ExecuteContext<'_>,
634        ) {
635            if !backend.should_track_dependencies() {
636                return;
637            }
638            if let Some(reader) = reader {
639                if reader == task_id {
640                    // We never want to have a dependency on ourselves, otherwise we end up in a
641                    // loop of re-executing the same task.
642                    return;
643                }
644                let _ = task.add(CachedDataItem::CellDependent {
645                    cell,
646                    task: reader,
647                    value: (),
648                });
649                drop(task);
650
651                let mut reader_task = ctx.task(reader, TaskDataCategory::Data);
652                let target = CellRef {
653                    task: task_id,
654                    cell,
655                };
656                if reader_task
657                    .remove(&CachedDataItemKey::OutdatedCellDependency { target })
658                    .is_none()
659                {
660                    let _ = reader_task.add(CachedDataItem::CellDependency { target, value: () });
661                }
662            }
663        }
664
665        let mut ctx = self.execute_context(turbo_tasks);
666        let mut task = ctx.task(task_id, TaskDataCategory::Data);
667        let content = if options.final_read_hint {
668            remove!(task, CellData { cell })
669        } else if let Some(content) = get!(task, CellData { cell }) {
670            let content = content.clone();
671            Some(content)
672        } else {
673            None
674        };
675        if let Some(content) = content {
676            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
677            return Ok(Ok(TypedCellContent(
678                cell.type_id,
679                CellContent(Some(content.1)),
680            )));
681        }
682
683        let in_progress = get!(task, InProgress);
684        if matches!(
685            in_progress,
686            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
687        ) {
688            return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0));
689        }
690        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
691        let is_scheduled = matches!(in_progress, Some(InProgressState::Scheduled { .. }));
692
693        // Check cell index range (cell might not exist at all)
694        let max_id = get!(
695            task,
696            CellTypeMaxIndex {
697                cell_type: cell.type_id
698            }
699        )
700        .copied();
701        let Some(max_id) = max_id else {
702            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
703            bail!(
704                "Cell {cell:?} no longer exists in task {} (no cell of this type exists)",
705                ctx.get_task_description(task_id)
706            );
707        };
708        if cell.index >= max_id {
709            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
710            bail!(
711                "Cell {cell:?} no longer exists in task {} (index out of bounds)",
712                ctx.get_task_description(task_id)
713            );
714        }
715
716        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
717        // task the get the cell content.
718
719        // Listen to the cell and potentially schedule the task
720        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, reader, cell);
721        if !new_listener {
722            return Ok(Err(listener));
723        }
724
725        let _span = tracing::trace_span!(
726            "recomputation",
727            cell_type = registry::get_value_type_global_name(cell.type_id),
728            cell_index = cell.index
729        )
730        .entered();
731
732        // Schedule the task, if not already scheduled
733        if is_cancelled {
734            bail!("{} was canceled", ctx.get_task_description(task_id));
735        } else if !is_scheduled
736            && task.add(CachedDataItem::new_scheduled(
737                self.get_task_desc_fn(task_id),
738            ))
739        {
740            turbo_tasks.schedule(task_id);
741        }
742
743        Ok(Err(listener))
744    }
745
746    fn listen_to_cell(
747        &self,
748        task: &mut impl TaskGuard,
749        task_id: TaskId,
750        reader: Option<TaskId>,
751        cell: CellId,
752    ) -> (EventListener, bool) {
753        let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
754        let note = move || {
755            if let Some(reader_desc) = reader_desc.as_ref() {
756                format!("try_read_task_cell (in progress) from {}", reader_desc())
757            } else {
758                "try_read_task_cell (in progress, untracked)".to_string()
759            }
760        };
761        if let Some(in_progress) = get!(task, InProgressCell { cell }) {
762            // Someone else is already computing the cell
763            let listener = in_progress.event.listen_with_note(note);
764            return (listener, false);
765        }
766        let in_progress = InProgressCellState::new(task_id, cell);
767        let listener = in_progress.event.listen_with_note(note);
768        task.add_new(CachedDataItem::InProgressCell {
769            cell,
770            value: in_progress,
771        });
772        (listener, true)
773    }
774
775    fn lookup_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
776        if let Some(task_type) = self.task_cache.lookup_reverse(&task_id) {
777            return Some(task_type);
778        }
779        if self.should_restore() && !task_id.is_transient() {
780            if let Some(task_type) = unsafe {
781                self.backing_storage
782                    .reverse_lookup_task_cache(None, task_id)
783            } {
784                let _ = self.task_cache.try_insert(task_type.clone(), task_id);
785                return Some(task_type);
786            }
787        }
788        None
789    }
790
791    // TODO feature flag that for hanging detection only
792    fn get_task_desc_fn(&self, task_id: TaskId) -> impl Fn() -> String + Send + Sync + 'static {
793        let task_type = self.lookup_task_type(task_id);
794        move || {
795            task_type.as_ref().map_or_else(
796                || format!("{task_id:?} transient"),
797                |task_type| format!("{task_id:?} {task_type}"),
798            )
799        }
800    }
801
802    fn snapshot(&self) -> Option<(Instant, bool)> {
803        debug_assert!(self.should_persist());
804        let mut snapshot_request = self.snapshot_request.lock();
805        snapshot_request.snapshot_requested = true;
806        let active_operations = self
807            .in_progress_operations
808            .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
809        if active_operations != 0 {
810            self.operations_suspended
811                .wait_while(&mut snapshot_request, |_| {
812                    self.in_progress_operations.load(Ordering::Relaxed) != SNAPSHOT_REQUESTED_BIT
813                });
814        }
815        let suspended_operations = snapshot_request
816            .suspended_operations
817            .iter()
818            .map(|op| op.arc().clone())
819            .collect::<Vec<_>>();
820        drop(snapshot_request);
821        let mut persisted_task_cache_log = self
822            .persisted_task_cache_log
823            .as_ref()
824            .map(|l| l.take(|i| i))
825            .unwrap_or_default();
826        self.storage.start_snapshot();
827        let mut snapshot_request = self.snapshot_request.lock();
828        snapshot_request.snapshot_requested = false;
829        self.in_progress_operations
830            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
831        self.snapshot_completed.notify_all();
832        let snapshot_time = Instant::now();
833        drop(snapshot_request);
834
835        let preprocess = |task_id: TaskId, inner: &storage::InnerStorage| {
836            if task_id.is_transient() {
837                return (None, None);
838            }
839            let len = inner.len();
840            let mut meta = Vec::with_capacity(len);
841            let mut data = Vec::with_capacity(len);
842            for (key, value) in inner.iter_all() {
843                if key.is_persistent() && value.is_persistent() {
844                    match key.category() {
845                        TaskDataCategory::Meta => {
846                            meta.push(CachedDataItem::from_key_and_value_ref(key, value))
847                        }
848                        TaskDataCategory::Data => {
849                            data.push(CachedDataItem::from_key_and_value_ref(key, value))
850                        }
851                        _ => {}
852                    }
853                }
854            }
855
856            (
857                inner.state().meta_restored().then_some(meta),
858                inner.state().data_restored().then_some(data),
859            )
860        };
861        let process = |task_id: TaskId, (meta, data): (Option<Vec<_>>, Option<Vec<_>>)| {
862            (
863                task_id,
864                meta.map(|d| B::serialize(task_id, &d)),
865                data.map(|d| B::serialize(task_id, &d)),
866            )
867        };
868        let process_snapshot = |task_id: TaskId, inner: Box<InnerStorageSnapshot>| {
869            if task_id.is_transient() {
870                return (task_id, None, None);
871            }
872            let len = inner.len();
873            let mut meta = inner.meta_modified.then(|| Vec::with_capacity(len));
874            let mut data = inner.data_modified.then(|| Vec::with_capacity(len));
875            for (key, value) in inner.iter_all() {
876                if key.is_persistent() && value.is_persistent() {
877                    match key.category() {
878                        TaskDataCategory::Meta => {
879                            if let Some(meta) = &mut meta {
880                                meta.push(CachedDataItem::from_key_and_value_ref(key, value));
881                            }
882                        }
883                        TaskDataCategory::Data => {
884                            if let Some(data) = &mut data {
885                                data.push(CachedDataItem::from_key_and_value_ref(key, value));
886                            }
887                        }
888                        _ => {}
889                    }
890                }
891            }
892            (
893                task_id,
894                meta.map(|meta| B::serialize(task_id, &meta)),
895                data.map(|data| B::serialize(task_id, &data)),
896            )
897        };
898
899        let snapshot = {
900            let _span = tracing::trace_span!("take snapshot");
901            self.storage
902                .take_snapshot(&preprocess, &process, &process_snapshot)
903        };
904
905        let task_snapshots = snapshot
906            .into_iter()
907            .filter_map(|iter| {
908                let mut iter = iter
909                    .filter_map(
910                        |(task_id, meta, data): (
911                            _,
912                            Option<Result<SmallVec<_>>>,
913                            Option<Result<SmallVec<_>>>,
914                        )| {
915                            let meta = match meta {
916                                Some(Ok(meta)) => Some(meta),
917                                None => None,
918                                Some(Err(err)) => {
919                                    println!(
920                                        "Serializing task {} failed (meta): {:?}",
921                                        self.get_task_description(task_id),
922                                        err
923                                    );
924                                    None
925                                }
926                            };
927                            let data = match data {
928                                Some(Ok(data)) => Some(data),
929                                None => None,
930                                Some(Err(err)) => {
931                                    println!(
932                                        "Serializing task {} failed (data): {:?}",
933                                        self.get_task_description(task_id),
934                                        err
935                                    );
936                                    None
937                                }
938                            };
939                            (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
940                        },
941                    )
942                    .peekable();
943                iter.peek().is_some().then_some(iter)
944            })
945            .collect::<Vec<_>>();
946
947        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
948
949        let mut new_items = false;
950
951        if !persisted_task_cache_log.is_empty() || !task_snapshots.is_empty() {
952            new_items = true;
953            if let Err(err) = self.backing_storage.save_snapshot(
954                self.session_id,
955                suspended_operations,
956                persisted_task_cache_log,
957                task_snapshots,
958            ) {
959                println!("Persisting failed: {err:?}");
960                return None;
961            }
962        }
963
964        Some((snapshot_time, new_items))
965    }
966
967    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
968        if self.should_restore() {
969            // Continue all uncompleted operations
970            // They can't be interrupted by a snapshot since the snapshotting job has not been
971            // scheduled yet.
972            let uncompleted_operations = self.backing_storage.uncompleted_operations();
973            if !uncompleted_operations.is_empty() {
974                let mut ctx = self.execute_context(turbo_tasks);
975                for op in uncompleted_operations {
976                    op.execute(&mut ctx);
977                }
978            }
979        }
980
981        if self.should_persist() {
982            // Schedule the snapshot job
983            turbo_tasks.schedule_backend_background_job(BACKEND_JOB_INITIAL_SNAPSHOT);
984        }
985    }
986
987    fn stopping(&self) {
988        self.stopping.store(true, Ordering::Release);
989        self.stopping_event.notify(usize::MAX);
990    }
991
992    #[allow(unused_variables)]
993    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
994        #[cfg(feature = "verify_aggregation_graph")]
995        {
996            self.is_idle.store(false, Ordering::Release);
997            self.verify_aggregation_graph(turbo_tasks, false);
998        }
999        if let Err(err) = self.backing_storage.shutdown() {
1000            println!("Shutting down failed: {err}");
1001        }
1002    }
1003
1004    #[allow(unused_variables)]
1005    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1006        self.idle_start_event.notify(usize::MAX);
1007
1008        #[cfg(feature = "verify_aggregation_graph")]
1009        {
1010            use tokio::select;
1011
1012            self.is_idle.store(true, Ordering::Release);
1013            let this = self.clone();
1014            let turbo_tasks = turbo_tasks.pin();
1015            tokio::task::spawn(async move {
1016                select! {
1017                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1018                        // do nothing
1019                    }
1020                    _ = this.idle_end_event.listen() => {
1021                        return;
1022                    }
1023                }
1024                if !this.is_idle.load(Ordering::Relaxed) {
1025                    return;
1026                }
1027                this.verify_aggregation_graph(&*turbo_tasks, true);
1028            });
1029        }
1030    }
1031
1032    fn idle_end(&self) {
1033        #[cfg(feature = "verify_aggregation_graph")]
1034        self.is_idle.store(false, Ordering::Release);
1035        self.idle_end_event.notify(usize::MAX);
1036    }
1037
1038    fn get_or_create_persistent_task(
1039        &self,
1040        task_type: CachedTaskType,
1041        parent_task: TaskId,
1042        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1043    ) -> TaskId {
1044        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
1045            self.track_cache_hit(&task_type);
1046            self.connect_child(parent_task, task_id, turbo_tasks);
1047            return task_id;
1048        }
1049
1050        self.track_cache_miss(&task_type);
1051        let tx = self
1052            .should_restore()
1053            .then(|| self.backing_storage.start_read_transaction())
1054            .flatten();
1055        let task_id = {
1056            // Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
1057            if let Some(task_id) = unsafe {
1058                self.backing_storage
1059                    .forward_lookup_task_cache(tx.as_ref(), &task_type)
1060            } {
1061                let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
1062                task_id
1063            } else {
1064                let task_type = Arc::new(task_type);
1065                let task_id = self.persisted_task_id_factory.get();
1066                let task_id = if let Err(existing_task_id) =
1067                    self.task_cache.try_insert(task_type.clone(), task_id)
1068                {
1069                    // Safety: We just created the id and failed to insert it.
1070                    unsafe {
1071                        self.persisted_task_id_factory.reuse(task_id);
1072                    }
1073                    existing_task_id
1074                } else {
1075                    task_id
1076                };
1077                if let Some(log) = &self.persisted_task_cache_log {
1078                    log.lock(task_id).push((task_type, task_id));
1079                }
1080                task_id
1081            }
1082        };
1083
1084        // Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
1085        unsafe { self.connect_child_with_tx(tx.as_ref(), parent_task, task_id, turbo_tasks) };
1086
1087        task_id
1088    }
1089
1090    fn get_or_create_transient_task(
1091        &self,
1092        task_type: CachedTaskType,
1093        parent_task: TaskId,
1094        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1095    ) -> TaskId {
1096        if !parent_task.is_transient() {
1097            self.panic_persistent_calling_transient(
1098                self.lookup_task_type(parent_task).as_deref(),
1099                Some(&task_type),
1100                /* cell_id */ None,
1101            );
1102        }
1103        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
1104            self.track_cache_hit(&task_type);
1105            self.connect_child(parent_task, task_id, turbo_tasks);
1106            return task_id;
1107        }
1108
1109        self.track_cache_miss(&task_type);
1110        let task_type = Arc::new(task_type);
1111        let task_id = self.transient_task_id_factory.get();
1112        if let Err(existing_task_id) = self.task_cache.try_insert(task_type, task_id) {
1113            // Safety: We just created the id and failed to insert it.
1114            unsafe {
1115                self.transient_task_id_factory.reuse(task_id);
1116            }
1117            self.connect_child(parent_task, existing_task_id, turbo_tasks);
1118            return existing_task_id;
1119        }
1120
1121        self.connect_child(parent_task, task_id, turbo_tasks);
1122
1123        task_id
1124    }
1125
1126    /// Generate an object that implements [`fmt::Display`] explaining why the given
1127    /// [`CachedTaskType`] is transient.
1128    fn debug_trace_transient_task(
1129        &self,
1130        task_type: &CachedTaskType,
1131        cell_id: Option<CellId>,
1132    ) -> DebugTraceTransientTask {
1133        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1134        // from tracing the same task many times, so use a visited_set
1135        fn inner_id(
1136            backend: &TurboTasksBackendInner<impl BackingStorage>,
1137            task_id: TaskId,
1138            cell_type_id: Option<ValueTypeId>,
1139            visited_set: &mut FxHashSet<TaskId>,
1140        ) -> DebugTraceTransientTask {
1141            if let Some(task_type) = backend.lookup_task_type(task_id) {
1142                if visited_set.contains(&task_id) {
1143                    let task_name = task_type.get_name();
1144                    DebugTraceTransientTask::Collapsed {
1145                        task_name,
1146                        cell_type_id,
1147                    }
1148                } else {
1149                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1150                }
1151            } else {
1152                DebugTraceTransientTask::Uncached { cell_type_id }
1153            }
1154        }
1155        fn inner_cached(
1156            backend: &TurboTasksBackendInner<impl BackingStorage>,
1157            task_type: &CachedTaskType,
1158            cell_type_id: Option<ValueTypeId>,
1159            visited_set: &mut FxHashSet<TaskId>,
1160        ) -> DebugTraceTransientTask {
1161            let task_name = task_type.get_name();
1162
1163            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1164                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1165                    // `task_id` should never be `None` at this point, as that would imply a
1166                    // non-local task is returning a local `Vc`...
1167                    // Just ignore if it happens, as we're likely already panicking.
1168                    return None;
1169                };
1170                if task_id.is_transient() {
1171                    Some(Box::new(inner_id(
1172                        backend,
1173                        task_id,
1174                        cause_self_raw_vc.try_get_type_id(),
1175                        visited_set,
1176                    )))
1177                } else {
1178                    None
1179                }
1180            });
1181            let cause_args = task_type
1182                .arg
1183                .get_raw_vcs()
1184                .into_iter()
1185                .filter_map(|raw_vc| {
1186                    let Some(task_id) = raw_vc.try_get_task_id() else {
1187                        // `task_id` should never be `None` (see comment above)
1188                        return None;
1189                    };
1190                    if !task_id.is_transient() {
1191                        return None;
1192                    }
1193                    Some((task_id, raw_vc.try_get_type_id()))
1194                })
1195                .collect::<IndexSet<_>>() // dedupe
1196                .into_iter()
1197                .map(|(task_id, cell_type_id)| {
1198                    inner_id(backend, task_id, cell_type_id, visited_set)
1199                })
1200                .collect();
1201
1202            DebugTraceTransientTask::Cached {
1203                task_name,
1204                cell_type_id,
1205                cause_self,
1206                cause_args,
1207            }
1208        }
1209        inner_cached(
1210            self,
1211            task_type,
1212            cell_id.map(|c| c.type_id),
1213            &mut FxHashSet::default(),
1214        )
1215    }
1216
1217    fn invalidate_task(
1218        &self,
1219        task_id: TaskId,
1220        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1221    ) {
1222        if !self.should_track_dependencies() {
1223            panic!("Dependency tracking is disabled so invalidation is not allowed");
1224        }
1225        operation::InvalidateOperation::run(
1226            smallvec![task_id],
1227            #[cfg(feature = "trace_task_dirty")]
1228            TaskDirtyCause::Invalidator,
1229            self.execute_context(turbo_tasks),
1230        );
1231    }
1232
1233    fn invalidate_tasks(
1234        &self,
1235        tasks: &[TaskId],
1236        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1237    ) {
1238        if !self.should_track_dependencies() {
1239            panic!("Dependency tracking is disabled so invalidation is not allowed");
1240        }
1241        operation::InvalidateOperation::run(
1242            tasks.iter().copied().collect(),
1243            #[cfg(feature = "trace_task_dirty")]
1244            TaskDirtyCause::Unknown,
1245            self.execute_context(turbo_tasks),
1246        );
1247    }
1248
1249    fn invalidate_tasks_set(
1250        &self,
1251        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1252        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1253    ) {
1254        if !self.should_track_dependencies() {
1255            panic!("Dependency tracking is disabled so invalidation is not allowed");
1256        }
1257        operation::InvalidateOperation::run(
1258            tasks.iter().copied().collect(),
1259            #[cfg(feature = "trace_task_dirty")]
1260            TaskDirtyCause::Unknown,
1261            self.execute_context(turbo_tasks),
1262        );
1263    }
1264
1265    fn invalidate_serialization(
1266        &self,
1267        task_id: TaskId,
1268        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1269    ) {
1270        if task_id.is_transient() {
1271            return;
1272        }
1273        let mut ctx = self.execute_context(turbo_tasks);
1274        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1275        task.invalidate_serialization();
1276    }
1277
1278    fn get_task_description(&self, task_id: TaskId) -> String {
1279        self.lookup_task_type(task_id).map_or_else(
1280            || format!("{task_id:?} transient"),
1281            |task_type| task_type.to_string(),
1282        )
1283    }
1284
1285    fn try_get_function_id(&self, task_id: TaskId) -> Option<FunctionId> {
1286        self.lookup_task_type(task_id)
1287            .map(|task_type| task_type.fn_type)
1288    }
1289
1290    fn task_execution_canceled(
1291        &self,
1292        task_id: TaskId,
1293        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1294    ) {
1295        let mut ctx = self.execute_context(turbo_tasks);
1296        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1297        if let Some(in_progress) = remove!(task, InProgress) {
1298            match in_progress {
1299                InProgressState::Scheduled { done_event } => done_event.notify(usize::MAX),
1300                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1301                    done_event.notify(usize::MAX)
1302                }
1303                InProgressState::Canceled => {}
1304            }
1305        }
1306        task.add_new(CachedDataItem::InProgress {
1307            value: InProgressState::Canceled,
1308        });
1309    }
1310
1311    fn try_start_task_execution(
1312        &self,
1313        task_id: TaskId,
1314        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1315    ) -> Option<TaskExecutionSpec<'_>> {
1316        enum TaskType {
1317            Cached(Arc<CachedTaskType>),
1318            Transient(Arc<TransientTask>),
1319        }
1320        let (task_type, once_task) = if let Some(task_type) = self.lookup_task_type(task_id) {
1321            (TaskType::Cached(task_type), false)
1322        } else if let Some(task_type) = self.transient_tasks.get(&task_id) {
1323            (
1324                TaskType::Transient(task_type.clone()),
1325                matches!(**task_type, TransientTask::Once(_)),
1326            )
1327        } else {
1328            return None;
1329        };
1330        {
1331            let mut ctx = self.execute_context(turbo_tasks);
1332            let mut task = ctx.task(task_id, TaskDataCategory::All);
1333            let in_progress = remove!(task, InProgress)?;
1334            let InProgressState::Scheduled { done_event } = in_progress else {
1335                task.add_new(CachedDataItem::InProgress { value: in_progress });
1336                return None;
1337            };
1338            task.add_new(CachedDataItem::InProgress {
1339                value: InProgressState::InProgress(Box::new(InProgressStateInner {
1340                    stale: false,
1341                    once_task,
1342                    done_event,
1343                    session_dependent: false,
1344                    marked_as_completed: false,
1345                    done: false,
1346                    new_children: Default::default(),
1347                })),
1348            });
1349
1350            if self.should_track_children() {
1351                // Make all current collectibles outdated (remove left-over outdated collectibles)
1352                enum Collectible {
1353                    Current(CollectibleRef, i32),
1354                    Outdated(CollectibleRef),
1355                }
1356                let collectibles = iter_many!(task, Collectible { collectible } value => Collectible::Current(collectible, *value))
1357                    .chain(iter_many!(task, OutdatedCollectible { collectible } => Collectible::Outdated(collectible)))
1358                    .collect::<Vec<_>>();
1359                for collectible in collectibles {
1360                    match collectible {
1361                        Collectible::Current(collectible, value) => {
1362                            let _ = task
1363                                .insert(CachedDataItem::OutdatedCollectible { collectible, value });
1364                        }
1365                        Collectible::Outdated(collectible) => {
1366                            if !task.has_key(&CachedDataItemKey::Collectible { collectible }) {
1367                                task.remove(&CachedDataItemKey::OutdatedCollectible {
1368                                    collectible,
1369                                });
1370                            }
1371                        }
1372                    }
1373                }
1374            }
1375
1376            if self.should_track_dependencies() {
1377                // Make all dependencies outdated
1378                enum Dep {
1379                    CurrentCell(CellRef),
1380                    CurrentOutput(TaskId),
1381                    OutdatedCell(CellRef),
1382                    OutdatedOutput(TaskId),
1383                }
1384                let dependencies = iter_many!(task, CellDependency { target } => Dep::CurrentCell(target))
1385                    .chain(iter_many!(task, OutputDependency { target } => Dep::CurrentOutput(target)))
1386                    .chain(iter_many!(task, OutdatedCellDependency { target } => Dep::OutdatedCell(target)))
1387                    .chain(iter_many!(task, OutdatedOutputDependency { target } => Dep::OutdatedOutput(target)))
1388                    .collect::<Vec<_>>();
1389                for dep in dependencies {
1390                    match dep {
1391                        Dep::CurrentCell(cell) => {
1392                            let _ = task.add(CachedDataItem::OutdatedCellDependency {
1393                                target: cell,
1394                                value: (),
1395                            });
1396                        }
1397                        Dep::CurrentOutput(output) => {
1398                            let _ = task.add(CachedDataItem::OutdatedOutputDependency {
1399                                target: output,
1400                                value: (),
1401                            });
1402                        }
1403                        Dep::OutdatedCell(cell) => {
1404                            if !task.has_key(&CachedDataItemKey::CellDependency { target: cell }) {
1405                                task.remove(&CachedDataItemKey::OutdatedCellDependency {
1406                                    target: cell,
1407                                });
1408                            }
1409                        }
1410                        Dep::OutdatedOutput(output) => {
1411                            if !task
1412                                .has_key(&CachedDataItemKey::OutputDependency { target: output })
1413                            {
1414                                task.remove(&CachedDataItemKey::OutdatedOutputDependency {
1415                                    target: output,
1416                                });
1417                            }
1418                        }
1419                    }
1420                }
1421            }
1422        }
1423
1424        let (span, future) = match task_type {
1425            TaskType::Cached(task_type) => {
1426                let CachedTaskType { fn_type, this, arg } = &*task_type;
1427                (
1428                    registry::get_function(*fn_type).span(task_id.persistence()),
1429                    registry::get_function(*fn_type).execute(*this, &**arg),
1430                )
1431            }
1432            TaskType::Transient(task_type) => {
1433                let task_type = task_type.clone();
1434                let span = tracing::trace_span!("turbo_tasks::root_task");
1435                let future = match &*task_type {
1436                    TransientTask::Root(f) => f(),
1437                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
1438                };
1439                (span, future)
1440            }
1441        };
1442        Some(TaskExecutionSpec { future, span })
1443    }
1444
1445    fn task_execution_result(
1446        &self,
1447        task_id: TaskId,
1448        result: Result<RawVc, Arc<TurboTasksExecutionError>>,
1449        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1450    ) {
1451        operation::UpdateOutputOperation::run(task_id, result, self.execute_context(turbo_tasks));
1452    }
1453
1454    fn task_execution_completed(
1455        &self,
1456        task_id: TaskId,
1457        _duration: Duration,
1458        _memory_usage: usize,
1459        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1460        stateful: bool,
1461        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1462    ) -> bool {
1463        // Task completion is a 4 step process:
1464        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
1465        //    aggregation number of the task and the new children.
1466        // 2. Connect the new children to the task (and do the relevant aggregation updates).
1467        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
1468        // 4. Shrink the task memory to reduce footprint of the task.
1469
1470        // Due to persistance it is possible that the process is cancelled after any step. This is
1471        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
1472        // in-memory representation.
1473
1474        // The task might be invalidated during this process, so we need to change the stale flag
1475        // at the start of every step.
1476
1477        let _span = tracing::trace_span!("task execution completed").entered();
1478        let mut ctx = self.execute_context(turbo_tasks);
1479
1480        //// STEP 1 ////
1481
1482        let mut task = ctx.task(task_id, TaskDataCategory::All);
1483        let Some(in_progress) = get_mut!(task, InProgress) else {
1484            panic!("Task execution completed, but task is not in progress: {task:#?}");
1485        };
1486        let &mut InProgressState::InProgress(box InProgressStateInner {
1487            stale,
1488            ref mut done,
1489            ref done_event,
1490            ref mut new_children,
1491            ..
1492        }) = in_progress
1493        else {
1494            panic!("Task execution completed, but task is not in progress: {task:#?}");
1495        };
1496
1497        // If the task is stale, reschedule it
1498        if stale {
1499            let Some(InProgressState::InProgress(box InProgressStateInner {
1500                done_event,
1501                mut new_children,
1502                ..
1503            })) = remove!(task, InProgress)
1504            else {
1505                unreachable!();
1506            };
1507            task.add_new(CachedDataItem::InProgress {
1508                value: InProgressState::Scheduled { done_event },
1509            });
1510            // Remove old children from new_children to leave only the children that had their
1511            // active count increased
1512            for task in iter_many!(task, Child { task } => task) {
1513                new_children.remove(&task);
1514            }
1515            drop(task);
1516
1517            // We need to undo the active count increase for the children since we throw away the
1518            // new_children list now.
1519            AggregationUpdateQueue::run(
1520                AggregationUpdateJob::DecreaseActiveCounts {
1521                    task_ids: new_children.into_iter().collect(),
1522                },
1523                &mut ctx,
1524            );
1525            return true;
1526        }
1527
1528        // mark the task as completed, so dependent tasks can continue working
1529        *done = true;
1530        done_event.notify(usize::MAX);
1531
1532        // take the children from the task to process them
1533        let mut new_children = take(new_children);
1534
1535        // handle stateful
1536        if stateful {
1537            let _ = task.add(CachedDataItem::Stateful { value: () });
1538        }
1539
1540        // handle cell counters: update max index and remove cells that are no longer used
1541        let old_counters: FxHashMap<_, _> =
1542            get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, *max_index));
1543        let mut counters_to_remove = old_counters.clone();
1544        for (&cell_type, &max_index) in cell_counters.iter() {
1545            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
1546                if old_max_index != max_index {
1547                    task.insert(CachedDataItem::CellTypeMaxIndex {
1548                        cell_type,
1549                        value: max_index,
1550                    });
1551                }
1552            } else {
1553                task.add_new(CachedDataItem::CellTypeMaxIndex {
1554                    cell_type,
1555                    value: max_index,
1556                });
1557            }
1558        }
1559        for (cell_type, _) in counters_to_remove {
1560            task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type });
1561        }
1562
1563        let mut queue = AggregationUpdateQueue::new();
1564
1565        let mut removed_data = Vec::new();
1566        let mut old_edges = Vec::new();
1567
1568        let has_children = !new_children.is_empty();
1569
1570        // Prepare all new children
1571        if has_children {
1572            prepare_new_children(task_id, &mut task, &new_children, &mut queue);
1573        }
1574
1575        // Filter actual new children
1576        if has_children {
1577            old_edges.extend(
1578                iter_many!(task, Child { task } => task)
1579                    .filter(|task| !new_children.remove(task))
1580                    .map(OutdatedEdge::Child),
1581            );
1582        } else {
1583            old_edges.extend(iter_many!(task, Child { task } => task).map(OutdatedEdge::Child));
1584        }
1585
1586        // Remove no longer existing cells and
1587        // find all outdated data items (removed cells, outdated edges)
1588        // Note: For persistent tasks we only want to call extract_if when there are actual cells to
1589        // remove to avoid tracking that as modification.
1590        if task_id.is_transient() || iter_many!(task, CellData { cell }
1591            if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index) => cell
1592        ).count() > 0 {
1593            removed_data.extend(task.extract_if(CachedDataItemType::CellData, |key, _| {
1594                matches!(key, CachedDataItemKey::CellData { cell } if cell_counters
1595                            .get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
1596            }));
1597        }
1598        if self.should_track_children() {
1599            old_edges.extend(
1600                task.iter(CachedDataItemType::OutdatedCollectible)
1601                    .filter_map(|(key, value)| match (key, value) {
1602                        (
1603                            CachedDataItemKey::OutdatedCollectible { collectible },
1604                            CachedDataItemValueRef::OutdatedCollectible { value },
1605                        ) => Some(OutdatedEdge::Collectible(collectible, *value)),
1606                        _ => None,
1607                    }),
1608            );
1609        }
1610        if self.should_track_dependencies() {
1611            old_edges.extend(iter_many!(task, OutdatedCellDependency { target } => OutdatedEdge::CellDependency(target)));
1612            old_edges.extend(iter_many!(task, OutdatedOutputDependency { target } => OutdatedEdge::OutputDependency(target)));
1613            old_edges.extend(
1614                iter_many!(task, CellDependent { cell, task } => (cell, task)).filter_map(
1615                    |(cell, task)| {
1616                        if cell_counters
1617                            .get(&cell.type_id)
1618                            .is_none_or(|start_index| cell.index >= *start_index)
1619                        {
1620                            if let Some(old_counter) = old_counters.get(&cell.type_id) {
1621                                if cell.index < *old_counter {
1622                                    return Some(OutdatedEdge::RemovedCellDependent {
1623                                        task_id: task,
1624                                        #[cfg(feature = "trace_task_dirty")]
1625                                        value_type_id: cell.type_id,
1626                                    });
1627                                }
1628                            }
1629                        }
1630                        None
1631                    },
1632                ),
1633            );
1634        }
1635
1636        drop(task);
1637
1638        if !queue.is_empty() || !old_edges.is_empty() {
1639            #[cfg(feature = "trace_task_completion")]
1640            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
1641            // Remove outdated edges first, before removing in_progress+dirty flag.
1642            // We need to make sure all outdated edges are removed before the task can potentially
1643            // be scheduled and executed again
1644            CleanupOldEdgesOperation::run(task_id, old_edges, queue, &mut ctx);
1645        }
1646
1647        // When restoring from persistent caching the following might not be executed (since we can
1648        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
1649        // would be executed again.
1650
1651        //// STEP 2 ////
1652
1653        let mut task = ctx.task(task_id, TaskDataCategory::All);
1654        let Some(in_progress) = get!(task, InProgress) else {
1655            panic!("Task execution completed, but task is not in progress: {task:#?}");
1656        };
1657        let InProgressState::InProgress(box InProgressStateInner { stale, .. }) = in_progress
1658        else {
1659            panic!("Task execution completed, but task is not in progress: {task:#?}");
1660        };
1661
1662        // If the task is stale, reschedule it
1663        if *stale {
1664            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
1665                remove!(task, InProgress)
1666            else {
1667                unreachable!();
1668            };
1669            task.add_new(CachedDataItem::InProgress {
1670                value: InProgressState::Scheduled { done_event },
1671            });
1672            drop(task);
1673
1674            // All `new_children` are currently hold active with an active count and we need to undo
1675            // that. (We already filtered out the old children from that list)
1676            AggregationUpdateQueue::run(
1677                AggregationUpdateJob::DecreaseActiveCounts {
1678                    task_ids: new_children.into_iter().collect(),
1679                },
1680                &mut ctx,
1681            );
1682            return true;
1683        }
1684
1685        let mut queue = AggregationUpdateQueue::new();
1686
1687        if has_children {
1688            let has_active_count = ctx.should_track_activeness()
1689                && get!(task, Activeness).map_or(false, |activeness| activeness.active_counter > 0);
1690            connect_children(
1691                task_id,
1692                &mut task,
1693                new_children,
1694                &mut queue,
1695                has_active_count,
1696                ctx.should_track_activeness(),
1697            );
1698        }
1699
1700        drop(task);
1701
1702        if has_children {
1703            #[cfg(feature = "trace_task_completion")]
1704            let _span = tracing::trace_span!("connect new children").entered();
1705            queue.execute(&mut ctx);
1706        }
1707
1708        let mut task = ctx.task(task_id, TaskDataCategory::All);
1709        let Some(in_progress) = remove!(task, InProgress) else {
1710            panic!("Task execution completed, but task is not in progress: {task:#?}");
1711        };
1712        let InProgressState::InProgress(box InProgressStateInner {
1713            done_event,
1714            once_task: _,
1715            stale,
1716            session_dependent,
1717            done: _,
1718            marked_as_completed: _,
1719            new_children,
1720        }) = in_progress
1721        else {
1722            panic!("Task execution completed, but task is not in progress: {task:#?}");
1723        };
1724        debug_assert!(new_children.is_empty());
1725
1726        // If the task is stale, reschedule it
1727        if stale {
1728            task.add_new(CachedDataItem::InProgress {
1729                value: InProgressState::Scheduled { done_event },
1730            });
1731            return true;
1732        }
1733
1734        // Notify in progress cells
1735        removed_data.extend(task.extract_if(
1736            CachedDataItemType::InProgressCell,
1737            |key, value| match (key, value) {
1738                (
1739                    CachedDataItemKey::InProgressCell { .. },
1740                    CachedDataItemValueRef::InProgressCell { value },
1741                ) => {
1742                    value.event.notify(usize::MAX);
1743                    true
1744                }
1745                _ => false,
1746            },
1747        ));
1748
1749        // Update the dirty state
1750        let old_dirty_state = get!(task, Dirty).copied();
1751
1752        let new_dirty_state = if session_dependent {
1753            Some(DirtyState {
1754                clean_in_session: Some(self.session_id),
1755            })
1756        } else {
1757            None
1758        };
1759
1760        let data_update = if old_dirty_state != new_dirty_state {
1761            if let Some(new_dirty_state) = new_dirty_state {
1762                task.insert(CachedDataItem::Dirty {
1763                    value: new_dirty_state,
1764                });
1765            } else {
1766                task.remove(&CachedDataItemKey::Dirty {});
1767            }
1768
1769            if self.should_track_children()
1770                && (old_dirty_state.is_some() || new_dirty_state.is_some())
1771            {
1772                let mut dirty_containers = get!(task, AggregatedDirtyContainerCount)
1773                    .cloned()
1774                    .unwrap_or_default();
1775                if let Some(old_dirty_state) = old_dirty_state {
1776                    dirty_containers.update_with_dirty_state(&old_dirty_state);
1777                }
1778                let aggregated_update = match (old_dirty_state, new_dirty_state) {
1779                    (None, None) => unreachable!(),
1780                    (Some(old), None) => dirty_containers.undo_update_with_dirty_state(&old),
1781                    (None, Some(new)) => dirty_containers.update_with_dirty_state(&new),
1782                    (Some(old), Some(new)) => dirty_containers.replace_dirty_state(&old, &new),
1783                };
1784                if !aggregated_update.is_zero() {
1785                    if aggregated_update.get(self.session_id) < 0 {
1786                        if let Some(root_state) = get_mut!(task, Activeness) {
1787                            root_state.all_clean_event.notify(usize::MAX);
1788                            root_state.unset_active_until_clean();
1789                            if root_state.is_empty() {
1790                                task.remove(&CachedDataItemKey::Activeness {});
1791                            }
1792                        }
1793                    }
1794                    AggregationUpdateJob::data_update(
1795                        &mut task,
1796                        AggregatedDataUpdate::new()
1797                            .dirty_container_update(task_id, aggregated_update),
1798                    )
1799                } else {
1800                    None
1801                }
1802            } else {
1803                None
1804            }
1805        } else {
1806            None
1807        };
1808
1809        drop(task);
1810
1811        if let Some(data_update) = data_update {
1812            AggregationUpdateQueue::run(data_update, &mut ctx);
1813        }
1814
1815        drop(removed_data);
1816
1817        //// STEP 4 ////
1818
1819        let mut task = ctx.task(task_id, TaskDataCategory::All);
1820        task.shrink_to_fit(CachedDataItemType::CellData);
1821        task.shrink_to_fit(CachedDataItemType::CellTypeMaxIndex);
1822        task.shrink_to_fit(CachedDataItemType::CellDependency);
1823        task.shrink_to_fit(CachedDataItemType::OutputDependency);
1824        task.shrink_to_fit(CachedDataItemType::CollectiblesDependency);
1825        drop(task);
1826
1827        false
1828    }
1829
1830    fn run_backend_job<'a>(
1831        self: &'a Arc<Self>,
1832        id: BackendJobId,
1833        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1834    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
1835        Box::pin(async move {
1836            if id == BACKEND_JOB_INITIAL_SNAPSHOT || id == BACKEND_JOB_FOLLOW_UP_SNAPSHOT {
1837                debug_assert!(self.should_persist());
1838
1839                let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
1840                let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
1841                loop {
1842                    const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(60);
1843                    const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(30);
1844                    const IDLE_TIMEOUT: Duration = Duration::from_secs(2);
1845
1846                    let time = if id == BACKEND_JOB_INITIAL_SNAPSHOT {
1847                        FIRST_SNAPSHOT_WAIT
1848                    } else {
1849                        SNAPSHOT_INTERVAL
1850                    };
1851
1852                    let until = last_snapshot + time;
1853                    if until > Instant::now() {
1854                        let mut stop_listener = self.stopping_event.listen();
1855                        if !self.stopping.load(Ordering::Acquire) {
1856                            let mut idle_start_listener = self.idle_start_event.listen();
1857                            let mut idle_end_listener = self.idle_end_event.listen();
1858                            let mut idle_time = if turbo_tasks.is_idle() {
1859                                Instant::now() + IDLE_TIMEOUT
1860                            } else {
1861                                far_future()
1862                            };
1863                            loop {
1864                                tokio::select! {
1865                                    _ = &mut stop_listener => {
1866                                        break;
1867                                    },
1868                                    _ = &mut idle_start_listener => {
1869                                        idle_time = Instant::now() + IDLE_TIMEOUT;
1870                                        idle_start_listener = self.idle_start_event.listen()
1871                                    },
1872                                    _ = &mut idle_end_listener => {
1873                                        idle_time = until + IDLE_TIMEOUT;
1874                                        idle_end_listener = self.idle_end_event.listen()
1875                                    },
1876                                    _ = tokio::time::sleep_until(until) => {
1877                                        break;
1878                                    },
1879                                    _ = tokio::time::sleep_until(idle_time) => {
1880                                        if turbo_tasks.is_idle() {
1881                                            break;
1882                                        }
1883                                    },
1884                                }
1885                            }
1886                        }
1887                    }
1888
1889                    let this = self.clone();
1890                    let snapshot = turbo_tasks::spawn_blocking(move || this.snapshot()).await;
1891                    if let Some((snapshot_start, new_data)) = snapshot {
1892                        last_snapshot = snapshot_start;
1893                        if new_data {
1894                            continue;
1895                        }
1896                        let last_snapshot = last_snapshot.duration_since(self.start_time);
1897                        self.last_snapshot.store(
1898                            last_snapshot.as_millis().try_into().unwrap(),
1899                            Ordering::Relaxed,
1900                        );
1901
1902                        turbo_tasks.schedule_backend_background_job(BACKEND_JOB_FOLLOW_UP_SNAPSHOT);
1903                        return;
1904                    }
1905                }
1906            }
1907        })
1908    }
1909
1910    fn try_read_own_task_cell_untracked(
1911        &self,
1912        task_id: TaskId,
1913        cell: CellId,
1914        _options: ReadCellOptions,
1915        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1916    ) -> Result<TypedCellContent> {
1917        let mut ctx = self.execute_context(turbo_tasks);
1918        let task = ctx.task(task_id, TaskDataCategory::Data);
1919        if let Some(content) = get!(task, CellData { cell }) {
1920            Ok(CellContent(Some(content.1.clone())).into_typed(cell.type_id))
1921        } else {
1922            Ok(CellContent(None).into_typed(cell.type_id))
1923        }
1924    }
1925
1926    fn read_task_collectibles(
1927        &self,
1928        task_id: TaskId,
1929        collectible_type: TraitTypeId,
1930        reader_id: TaskId,
1931        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1932    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
1933        if !self.should_track_children() {
1934            return AutoMap::default();
1935        }
1936
1937        let mut ctx = self.execute_context(turbo_tasks);
1938        let mut collectibles = AutoMap::default();
1939        {
1940            let mut task = ctx.task(task_id, TaskDataCategory::All);
1941            // Ensure it's an root node
1942            loop {
1943                let aggregation_number = get_aggregation_number(&task);
1944                if is_root_node(aggregation_number) {
1945                    break;
1946                }
1947                drop(task);
1948                AggregationUpdateQueue::run(
1949                    AggregationUpdateJob::UpdateAggregationNumber {
1950                        task_id,
1951                        base_aggregation_number: u32::MAX,
1952                        distance: None,
1953                    },
1954                    &mut ctx,
1955                );
1956                task = ctx.task(task_id, TaskDataCategory::All);
1957            }
1958            for collectible in iter_many!(
1959                task,
1960                AggregatedCollectible {
1961                    collectible
1962                } count if collectible.collectible_type == collectible_type && *count > 0 => {
1963                    collectible.cell
1964                }
1965            ) {
1966                *collectibles
1967                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
1968                    .or_insert(0) += 1;
1969            }
1970            for (collectible, count) in iter_many!(
1971                task,
1972                Collectible {
1973                    collectible
1974                } count if collectible.collectible_type == collectible_type => {
1975                    (collectible.cell, *count)
1976                }
1977            ) {
1978                *collectibles
1979                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
1980                    .or_insert(0) += count;
1981            }
1982            let _ = task.add(CachedDataItem::CollectiblesDependent {
1983                collectible_type,
1984                task: reader_id,
1985                value: (),
1986            });
1987        }
1988        {
1989            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
1990            let target = CollectiblesRef {
1991                task: task_id,
1992                collectible_type,
1993            };
1994            if reader.add(CachedDataItem::CollectiblesDependency { target, value: () }) {
1995                reader.remove(&CachedDataItemKey::OutdatedCollectiblesDependency { target });
1996            }
1997        }
1998        collectibles
1999    }
2000
2001    fn emit_collectible(
2002        &self,
2003        collectible_type: TraitTypeId,
2004        collectible: RawVc,
2005        task_id: TaskId,
2006        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2007    ) {
2008        self.assert_valid_collectible(task_id, collectible);
2009        if !self.should_track_children() {
2010            return;
2011        }
2012
2013        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2014            panic!("Collectibles need to be resolved");
2015        };
2016        let cell = CellRef {
2017            task: collectible_task,
2018            cell,
2019        };
2020        operation::UpdateCollectibleOperation::run(
2021            task_id,
2022            CollectibleRef {
2023                collectible_type,
2024                cell,
2025            },
2026            1,
2027            self.execute_context(turbo_tasks),
2028        );
2029    }
2030
2031    fn unemit_collectible(
2032        &self,
2033        collectible_type: TraitTypeId,
2034        collectible: RawVc,
2035        count: u32,
2036        task_id: TaskId,
2037        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2038    ) {
2039        self.assert_valid_collectible(task_id, collectible);
2040        if !self.should_track_children() {
2041            return;
2042        }
2043
2044        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2045            panic!("Collectibles need to be resolved");
2046        };
2047        let cell = CellRef {
2048            task: collectible_task,
2049            cell,
2050        };
2051        operation::UpdateCollectibleOperation::run(
2052            task_id,
2053            CollectibleRef {
2054                collectible_type,
2055                cell,
2056            },
2057            -(i32::try_from(count).unwrap()),
2058            self.execute_context(turbo_tasks),
2059        );
2060    }
2061
2062    fn update_task_cell(
2063        &self,
2064        task_id: TaskId,
2065        cell: CellId,
2066        content: CellContent,
2067        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2068    ) {
2069        operation::UpdateCellOperation::run(
2070            task_id,
2071            cell,
2072            content,
2073            self.execute_context(turbo_tasks),
2074        );
2075    }
2076
2077    fn mark_own_task_as_session_dependent(
2078        &self,
2079        task_id: TaskId,
2080        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2081    ) {
2082        if !self.should_track_dependencies() {
2083            // Without dependency tracking we don't need session dependent tasks
2084            return;
2085        }
2086        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
2087        let mut ctx = self.execute_context(turbo_tasks);
2088        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
2089        let aggregation_number = get_aggregation_number(&task);
2090        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
2091            drop(task);
2092            // We want to use a high aggregation number to avoid large aggregation chains for
2093            // session dependent tasks (which change on every run)
2094            AggregationUpdateQueue::run(
2095                AggregationUpdateJob::UpdateAggregationNumber {
2096                    task_id,
2097                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
2098                    distance: None,
2099                },
2100                &mut ctx,
2101            );
2102            task = ctx.task(task_id, TaskDataCategory::Meta);
2103        }
2104        if let Some(InProgressState::InProgress(box InProgressStateInner {
2105            session_dependent,
2106            ..
2107        })) = get_mut!(task, InProgress)
2108        {
2109            *session_dependent = true;
2110        }
2111    }
2112
2113    fn mark_own_task_as_finished(
2114        &self,
2115        task: TaskId,
2116        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2117    ) {
2118        let mut ctx = self.execute_context(turbo_tasks);
2119        let mut task = ctx.task(task, TaskDataCategory::Data);
2120        if let Some(InProgressState::InProgress(box InProgressStateInner {
2121            marked_as_completed,
2122            ..
2123        })) = get_mut!(task, InProgress)
2124        {
2125            *marked_as_completed = true;
2126            // TODO this should remove the dirty state (also check session_dependent)
2127            // but this would break some assumptions for strongly consistent reads.
2128            // Client tasks are not connected yet, so we wouldn't wait for them.
2129            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
2130        }
2131    }
2132
2133    fn set_own_task_aggregation_number(
2134        &self,
2135        task: TaskId,
2136        aggregation_number: u32,
2137        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2138    ) {
2139        let mut ctx = self.execute_context(turbo_tasks);
2140        AggregationUpdateQueue::run(
2141            AggregationUpdateJob::UpdateAggregationNumber {
2142                task_id: task,
2143                base_aggregation_number: aggregation_number,
2144                distance: None,
2145            },
2146            &mut ctx,
2147        );
2148    }
2149
2150    fn connect_task(
2151        &self,
2152        task: TaskId,
2153        parent_task: TaskId,
2154        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2155    ) {
2156        self.assert_not_persistent_calling_transient(parent_task, task, None);
2157        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
2158    }
2159
2160    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
2161        let task_id = self.transient_task_id_factory.get();
2162        let root_type = match task_type {
2163            TransientTaskType::Root(_) => RootType::RootTask,
2164            TransientTaskType::Once(_) => RootType::OnceTask,
2165        };
2166        self.transient_tasks.insert(
2167            task_id,
2168            Arc::new(match task_type {
2169                TransientTaskType::Root(f) => TransientTask::Root(f),
2170                TransientTaskType::Once(f) => TransientTask::Once(Mutex::new(Some(f))),
2171            }),
2172        );
2173        {
2174            let mut task = self.storage.access_mut(task_id);
2175            task.add(CachedDataItem::AggregationNumber {
2176                value: AggregationNumber {
2177                    base: u32::MAX,
2178                    distance: 0,
2179                    effective: u32::MAX,
2180                },
2181            });
2182            if self.should_track_activeness() {
2183                task.add(CachedDataItem::Activeness {
2184                    value: ActivenessState::new_root(root_type, task_id),
2185                });
2186            }
2187            task.add(CachedDataItem::new_scheduled(move || match root_type {
2188                RootType::RootTask => "Root Task".to_string(),
2189                RootType::OnceTask => "Once Task".to_string(),
2190            }));
2191        }
2192        #[cfg(feature = "verify_aggregation_graph")]
2193        self.root_tasks.lock().insert(task_id);
2194        task_id
2195    }
2196
2197    fn dispose_root_task(
2198        &self,
2199        task_id: TaskId,
2200        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2201    ) {
2202        #[cfg(feature = "verify_aggregation_graph")]
2203        self.root_tasks.lock().remove(&task_id);
2204
2205        let mut ctx = self.execute_context(turbo_tasks);
2206        let mut task = ctx.task(task_id, TaskDataCategory::All);
2207        let is_dirty = get!(task, Dirty).map_or(false, |dirty| dirty.get(self.session_id));
2208        let has_dirty_containers = get!(task, AggregatedDirtyContainerCount)
2209            .map_or(false, |dirty_containers| {
2210                dirty_containers.get(self.session_id) > 0
2211            });
2212        if is_dirty || has_dirty_containers {
2213            if let Some(root_state) = get_mut!(task, Activeness) {
2214                // We will finish the task, but it would be removed after the task is done
2215                root_state.unset_root_type();
2216                root_state.set_active_until_clean();
2217            };
2218        } else if let Some(root_state) = remove!(task, Activeness) {
2219            // Technically nobody should be listening to this event, but just in case
2220            // we notify it anyway
2221            root_state.all_clean_event.notify(usize::MAX);
2222        }
2223    }
2224
2225    #[cfg(feature = "verify_aggregation_graph")]
2226    fn verify_aggregation_graph(
2227        &self,
2228        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2229        idle: bool,
2230    ) {
2231        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
2232            return;
2233        }
2234        use std::{collections::VecDeque, env, io::stdout};
2235
2236        use crate::backend::operation::{get_uppers, is_aggregating_node};
2237
2238        let mut ctx = self.execute_context(turbo_tasks);
2239        let root_tasks = self.root_tasks.lock().clone();
2240        let len = root_tasks.len();
2241
2242        for (i, task_id) in root_tasks.into_iter().enumerate() {
2243            println!("Verifying graph from root {task_id} {i}/{len}...");
2244            let mut queue = VecDeque::new();
2245            let mut visited = FxHashSet::default();
2246            let mut aggregated_nodes = FxHashSet::default();
2247            let mut collectibles = FxHashMap::default();
2248            let root_task_id = task_id;
2249            visited.insert(task_id);
2250            aggregated_nodes.insert(task_id);
2251            queue.push_back(task_id);
2252            let mut counter = 0;
2253            while let Some(task_id) = queue.pop_front() {
2254                counter += 1;
2255                if counter % 100000 == 0 {
2256                    println!(
2257                        "queue={}, visited={}, aggregated_nodes={}",
2258                        queue.len(),
2259                        visited.len(),
2260                        aggregated_nodes.len()
2261                    );
2262                }
2263                let task = ctx.task(task_id, TaskDataCategory::All);
2264                if idle && !self.is_idle.load(Ordering::Relaxed) {
2265                    return;
2266                }
2267
2268                let uppers = get_uppers(&task);
2269                if task_id != root_task_id
2270                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
2271                {
2272                    println!(
2273                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
2274                         {:?})",
2275                        task_id,
2276                        ctx.get_task_description(task_id),
2277                        uppers
2278                    );
2279                }
2280
2281                let aggregated_collectibles: Vec<_> = get_many!(task, AggregatedCollectible { collectible } value if *value > 0 => {collectible});
2282                for collectible in aggregated_collectibles {
2283                    collectibles
2284                        .entry(collectible)
2285                        .or_insert_with(|| (false, Vec::new()))
2286                        .1
2287                        .push(task_id);
2288                }
2289
2290                let own_collectibles: Vec<_> = get_many!(task, Collectible { collectible } value if *value > 0 => {collectible});
2291                for collectible in own_collectibles {
2292                    if let Some((flag, _)) = collectibles.get_mut(&collectible) {
2293                        *flag = true
2294                    } else {
2295                        println!(
2296                            "Task {} has a collectible {:?} that is not in any upper task",
2297                            task_id, collectible
2298                        );
2299                    }
2300                }
2301
2302                let is_dirty = get!(task, Dirty).is_some_and(|dirty| dirty.get(self.session_id));
2303                let has_dirty_container = get!(task, AggregatedDirtyContainerCount)
2304                    .is_some_and(|count| count.get(self.session_id) > 0);
2305                let should_be_in_upper = is_dirty || has_dirty_container;
2306
2307                let aggregation_number = get_aggregation_number(&task);
2308                if is_aggregating_node(aggregation_number) {
2309                    aggregated_nodes.insert(task_id);
2310                }
2311                // println!(
2312                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
2313                //     ctx.get_task_description(task_id),
2314                //     uppers
2315                // );
2316
2317                for child_id in iter_many!(task, Child { task } => task) {
2318                    // println!("{task_id}: child -> {child_id}");
2319                    if visited.insert(child_id) {
2320                        queue.push_back(child_id);
2321                    }
2322                }
2323                drop(task);
2324
2325                if should_be_in_upper {
2326                    for upper_id in uppers {
2327                        let task = ctx.task(task_id, TaskDataCategory::All);
2328                        let in_upper = get!(task, AggregatedDirtyContainer { task: task_id })
2329                            .is_some_and(|dirty| dirty.get(self.session_id) > 0);
2330                        if !in_upper {
2331                            println!(
2332                                "Task {} is dirty, but is not listed in the upper task {}",
2333                                task_id, upper_id
2334                            );
2335                        }
2336                    }
2337                }
2338            }
2339
2340            for (collectible, (flag, task_ids)) in collectibles {
2341                if !flag {
2342                    use std::io::Write;
2343                    let mut stdout = stdout().lock();
2344                    writeln!(
2345                        stdout,
2346                        "{:?} that is not emitted in any child task but in these aggregated \
2347                         tasks: {:#?}",
2348                        collectible,
2349                        task_ids
2350                            .iter()
2351                            .map(|t| format!("{t} {}", ctx.get_task_description(*t)))
2352                            .collect::<Vec<_>>()
2353                    );
2354
2355                    let task_id = collectible.cell.task;
2356                    let mut queue = {
2357                        let task = ctx.task(task_id, TaskDataCategory::All);
2358                        get_uppers(&task)
2359                    };
2360                    let mut visited = FxHashSet::default();
2361                    for &upper_id in queue.iter() {
2362                        visited.insert(upper_id);
2363                        writeln!(stdout, "{task_id:?} -> {upper_id:?}");
2364                    }
2365                    while let Some(task_id) = queue.pop() {
2366                        let desc = ctx.get_task_description(task_id);
2367                        let task = ctx.task(task_id, TaskDataCategory::All);
2368                        let aggregated_collectible =
2369                            get!(task, AggregatedCollectible { collectible })
2370                                .copied()
2371                                .unwrap_or_default();
2372                        let uppers = get_uppers(&task);
2373                        drop(task);
2374                        writeln!(
2375                            stdout,
2376                            "upper {task_id} {desc} collectible={aggregated_collectible}"
2377                        );
2378                        if task_ids.contains(&task_id) {
2379                            writeln!(
2380                                stdout,
2381                                "Task has an upper connection to an aggregated task that doesn't \
2382                                 reference it. Upper connection is invalid!"
2383                            );
2384                        }
2385                        for upper_id in uppers {
2386                            writeln!(stdout, "{task_id:?} -> {upper_id:?}");
2387                            if !visited.contains(&upper_id) {
2388                                queue.push(upper_id);
2389                            }
2390                        }
2391                    }
2392                }
2393            }
2394            println!("visited {task_id} {} tasks", visited.len());
2395        }
2396    }
2397
2398    fn assert_not_persistent_calling_transient(
2399        &self,
2400        parent_id: TaskId,
2401        child_id: TaskId,
2402        cell_id: Option<CellId>,
2403    ) {
2404        if !parent_id.is_transient() && child_id.is_transient() {
2405            self.panic_persistent_calling_transient(
2406                self.lookup_task_type(parent_id).as_deref(),
2407                self.lookup_task_type(child_id).as_deref(),
2408                cell_id,
2409            );
2410        }
2411    }
2412
2413    fn panic_persistent_calling_transient(
2414        &self,
2415        parent: Option<&CachedTaskType>,
2416        child: Option<&CachedTaskType>,
2417        cell_id: Option<CellId>,
2418    ) {
2419        let transient_reason = if let Some(child) = child {
2420            Cow::Owned(format!(
2421                " The callee is transient because it depends on:\n{}",
2422                self.debug_trace_transient_task(child, cell_id),
2423            ))
2424        } else {
2425            Cow::Borrowed("")
2426        };
2427        panic!(
2428            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
2429            parent.map_or("unknown", |t| t.get_name()),
2430            child.map_or("unknown", |t| t.get_name()),
2431            transient_reason,
2432        );
2433    }
2434
2435    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
2436        // these checks occur in a potentially hot codepath, but they're cheap
2437        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
2438            // This should never happen: The collectible APIs use ResolvedVc
2439            let task_info = if let Some(col_task_ty) = collectible
2440                .try_get_task_id()
2441                .and_then(|t| self.lookup_task_type(t))
2442            {
2443                Cow::Owned(format!(" (return type of {col_task_ty})"))
2444            } else {
2445                Cow::Borrowed("")
2446            };
2447            panic!("Collectible{task_info} must be a ResolvedVc")
2448        };
2449        if col_task_id.is_transient() && !task_id.is_transient() {
2450            let transient_reason = if let Some(col_task_ty) = self.lookup_task_type(col_task_id) {
2451                Cow::Owned(format!(
2452                    ". The collectible is transient because it depends on:\n{}",
2453                    self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
2454                ))
2455            } else {
2456                Cow::Borrowed("")
2457            };
2458            // this should never happen: How would a persistent function get a transient Vc?
2459            panic!(
2460                "Collectible is transient, transient collectibles cannot be emitted from \
2461                 persistent tasks{transient_reason}",
2462            )
2463        }
2464    }
2465}
2466
2467impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
2468    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2469        self.0.startup(turbo_tasks);
2470    }
2471
2472    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2473        self.0.stopping();
2474    }
2475
2476    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2477        self.0.stop(turbo_tasks);
2478    }
2479
2480    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2481        self.0.idle_start(turbo_tasks);
2482    }
2483
2484    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2485        self.0.idle_end();
2486    }
2487
2488    fn get_or_create_persistent_task(
2489        &self,
2490        task_type: CachedTaskType,
2491        parent_task: TaskId,
2492        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2493    ) -> TaskId {
2494        self.0
2495            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
2496    }
2497
2498    fn get_or_create_transient_task(
2499        &self,
2500        task_type: CachedTaskType,
2501        parent_task: TaskId,
2502        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2503    ) -> TaskId {
2504        self.0
2505            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
2506    }
2507
2508    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2509        self.0.invalidate_task(task_id, turbo_tasks);
2510    }
2511
2512    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2513        self.0.invalidate_tasks(tasks, turbo_tasks);
2514    }
2515
2516    fn invalidate_tasks_set(
2517        &self,
2518        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
2519        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2520    ) {
2521        self.0.invalidate_tasks_set(tasks, turbo_tasks);
2522    }
2523
2524    fn invalidate_serialization(
2525        &self,
2526        task_id: TaskId,
2527        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2528    ) {
2529        self.0.invalidate_serialization(task_id, turbo_tasks);
2530    }
2531
2532    fn get_task_description(&self, task: TaskId) -> String {
2533        self.0.get_task_description(task)
2534    }
2535
2536    fn try_get_function_id(&self, task_id: TaskId) -> Option<FunctionId> {
2537        self.0.try_get_function_id(task_id)
2538    }
2539
2540    type TaskState = ();
2541    fn new_task_state(&self, _task: TaskId) -> Self::TaskState {}
2542
2543    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2544        self.0.task_execution_canceled(task, turbo_tasks)
2545    }
2546
2547    fn try_start_task_execution(
2548        &self,
2549        task_id: TaskId,
2550        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2551    ) -> Option<TaskExecutionSpec<'_>> {
2552        self.0.try_start_task_execution(task_id, turbo_tasks)
2553    }
2554
2555    fn task_execution_result(
2556        &self,
2557        task_id: TaskId,
2558        result: Result<RawVc, Arc<TurboTasksExecutionError>>,
2559        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2560    ) {
2561        self.0.task_execution_result(task_id, result, turbo_tasks);
2562    }
2563
2564    fn task_execution_completed(
2565        &self,
2566        task_id: TaskId,
2567        _duration: Duration,
2568        _memory_usage: usize,
2569        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2570        stateful: bool,
2571        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2572    ) -> bool {
2573        self.0.task_execution_completed(
2574            task_id,
2575            _duration,
2576            _memory_usage,
2577            cell_counters,
2578            stateful,
2579            turbo_tasks,
2580        )
2581    }
2582
2583    fn run_backend_job<'a>(
2584        &'a self,
2585        id: BackendJobId,
2586        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
2587    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2588        self.0.run_backend_job(id, turbo_tasks)
2589    }
2590
2591    fn try_read_task_output(
2592        &self,
2593        task_id: TaskId,
2594        reader: TaskId,
2595        consistency: ReadConsistency,
2596        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2597    ) -> Result<Result<RawVc, EventListener>> {
2598        self.0
2599            .try_read_task_output(task_id, Some(reader), consistency, turbo_tasks)
2600    }
2601
2602    fn try_read_task_output_untracked(
2603        &self,
2604        task_id: TaskId,
2605        consistency: ReadConsistency,
2606        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2607    ) -> Result<Result<RawVc, EventListener>> {
2608        self.0
2609            .try_read_task_output(task_id, None, consistency, turbo_tasks)
2610    }
2611
2612    fn try_read_task_cell(
2613        &self,
2614        task_id: TaskId,
2615        cell: CellId,
2616        reader: TaskId,
2617        options: ReadCellOptions,
2618        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2619    ) -> Result<Result<TypedCellContent, EventListener>> {
2620        self.0
2621            .try_read_task_cell(task_id, Some(reader), cell, options, turbo_tasks)
2622    }
2623
2624    fn try_read_task_cell_untracked(
2625        &self,
2626        task_id: TaskId,
2627        cell: CellId,
2628        options: ReadCellOptions,
2629        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2630    ) -> Result<Result<TypedCellContent, EventListener>> {
2631        self.0
2632            .try_read_task_cell(task_id, None, cell, options, turbo_tasks)
2633    }
2634
2635    fn try_read_own_task_cell_untracked(
2636        &self,
2637        task_id: TaskId,
2638        cell: CellId,
2639        options: ReadCellOptions,
2640        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2641    ) -> Result<TypedCellContent> {
2642        self.0
2643            .try_read_own_task_cell_untracked(task_id, cell, options, turbo_tasks)
2644    }
2645
2646    fn read_task_collectibles(
2647        &self,
2648        task_id: TaskId,
2649        collectible_type: TraitTypeId,
2650        reader: TaskId,
2651        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2652    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2653        self.0
2654            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
2655    }
2656
2657    fn emit_collectible(
2658        &self,
2659        collectible_type: TraitTypeId,
2660        collectible: RawVc,
2661        task_id: TaskId,
2662        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2663    ) {
2664        self.0
2665            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
2666    }
2667
2668    fn unemit_collectible(
2669        &self,
2670        collectible_type: TraitTypeId,
2671        collectible: RawVc,
2672        count: u32,
2673        task_id: TaskId,
2674        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2675    ) {
2676        self.0
2677            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
2678    }
2679
2680    fn update_task_cell(
2681        &self,
2682        task_id: TaskId,
2683        cell: CellId,
2684        content: CellContent,
2685        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2686    ) {
2687        self.0.update_task_cell(task_id, cell, content, turbo_tasks);
2688    }
2689
2690    fn mark_own_task_as_finished(
2691        &self,
2692        task_id: TaskId,
2693        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2694    ) {
2695        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
2696    }
2697
2698    fn set_own_task_aggregation_number(
2699        &self,
2700        task: TaskId,
2701        aggregation_number: u32,
2702        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2703    ) {
2704        self.0
2705            .set_own_task_aggregation_number(task, aggregation_number, turbo_tasks);
2706    }
2707
2708    fn mark_own_task_as_session_dependent(
2709        &self,
2710        task: TaskId,
2711        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2712    ) {
2713        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
2714    }
2715
2716    fn connect_task(
2717        &self,
2718        task: TaskId,
2719        parent_task: TaskId,
2720        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2721    ) {
2722        self.0.connect_task(task, parent_task, turbo_tasks);
2723    }
2724
2725    fn create_transient_task(
2726        &self,
2727        task_type: TransientTaskType,
2728        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2729    ) -> TaskId {
2730        self.0.create_transient_task(task_type)
2731    }
2732
2733    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2734        self.0.dispose_root_task(task_id, turbo_tasks);
2735    }
2736
2737    fn task_statistics(&self) -> &TaskStatisticsApi {
2738        &self.0.task_statistics
2739    }
2740}
2741
2742enum DebugTraceTransientTask {
2743    Cached {
2744        task_name: &'static str,
2745        cell_type_id: Option<ValueTypeId>,
2746        cause_self: Option<Box<DebugTraceTransientTask>>,
2747        cause_args: Vec<DebugTraceTransientTask>,
2748    },
2749    /// This representation is used when this task is a duplicate of one previously shown
2750    Collapsed {
2751        task_name: &'static str,
2752        cell_type_id: Option<ValueTypeId>,
2753    },
2754    Uncached {
2755        cell_type_id: Option<ValueTypeId>,
2756    },
2757}
2758
2759impl DebugTraceTransientTask {
2760    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
2761        let indent = "    ".repeat(level);
2762        f.write_str(&indent)?;
2763
2764        fn fmt_cell_type_id(
2765            f: &mut fmt::Formatter<'_>,
2766            cell_type_id: Option<ValueTypeId>,
2767        ) -> fmt::Result {
2768            if let Some(ty) = cell_type_id {
2769                write!(f, " (read cell of type {})", get_value_type_global_name(ty))
2770            } else {
2771                Ok(())
2772            }
2773        }
2774
2775        // write the name and type
2776        match self {
2777            Self::Cached {
2778                task_name,
2779                cell_type_id,
2780                ..
2781            }
2782            | Self::Collapsed {
2783                task_name,
2784                cell_type_id,
2785                ..
2786            } => {
2787                f.write_str(task_name)?;
2788                fmt_cell_type_id(f, *cell_type_id)?;
2789                if matches!(self, Self::Collapsed { .. }) {
2790                    f.write_str(" (collapsed)")?;
2791                }
2792            }
2793            Self::Uncached { cell_type_id } => {
2794                f.write_str("unknown transient task")?;
2795                fmt_cell_type_id(f, *cell_type_id)?;
2796            }
2797        }
2798        f.write_char('\n')?;
2799
2800        // write any extra "cause" information we might have
2801        if let Self::Cached {
2802            cause_self,
2803            cause_args,
2804            ..
2805        } = self
2806        {
2807            if let Some(c) = cause_self {
2808                writeln!(f, "{indent}  self:")?;
2809                c.fmt_indented(f, level + 1)?;
2810            }
2811            if !cause_args.is_empty() {
2812                writeln!(f, "{indent}  args:")?;
2813                for c in cause_args {
2814                    c.fmt_indented(f, level + 1)?;
2815                }
2816            }
2817        }
2818        Ok(())
2819    }
2820}
2821
2822impl fmt::Display for DebugTraceTransientTask {
2823    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2824        self.fmt_indented(f, 0)
2825    }
2826}
2827
2828// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
2829fn far_future() -> Instant {
2830    // Roughly 30 years from now.
2831    // API does not provide a way to obtain max `Instant`
2832    // or convert specific date in the future to instant.
2833    // 1000 years overflows on macOS, 100 years overflows on FreeBSD.
2834    Instant::now() + Duration::from_secs(86400 * 365 * 30)
2835}