// turbo_tasks_backend/backend/mod.rs

1mod dynamic_storage;
2mod operation;
3mod storage;
4
5use std::{
6    borrow::Cow,
7    fmt::{self, Write},
8    future::Future,
9    hash::BuildHasherDefault,
10    mem::take,
11    pin::Pin,
12    sync::{
13        Arc,
14        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
15    },
16    thread::available_parallelism,
17};
18
19use anyhow::{Result, bail};
20use auto_hash_map::{AutoMap, AutoSet};
21use indexmap::IndexSet;
22use parking_lot::{Condvar, Mutex};
23use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
24use smallvec::{SmallVec, smallvec};
25use tokio::time::{Duration, Instant};
26use turbo_tasks::{
27    CellId, FxDashMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency, SessionId,
28    TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId, TraitTypeId, TurboTasksBackendApi,
29    ValueTypeId,
30    backend::{
31        Backend, BackendJobId, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
32        TransientTaskType, TurboTasksExecutionError, TypedCellContent,
33    },
34    event::{Event, EventListener},
35    message_queue::TimingEvent,
36    registry::{self, get_value_type_global_name},
37    task_statistics::TaskStatisticsApi,
38    trace::TraceRawVcs,
39    turbo_tasks,
40    util::IdFactoryWithReuse,
41};
42
43pub use self::{operation::AnyOperation, storage::TaskDataCategory};
44#[cfg(feature = "trace_task_dirty")]
45use crate::backend::operation::TaskDirtyCause;
46use crate::{
47    backend::{
48        operation::{
49            AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
50            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
51            Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number,
52            is_root_node, prepare_new_children,
53        },
54        storage::{
55            InnerStorageSnapshot, Storage, get, get_many, get_mut, get_mut_or_insert_with,
56            iter_many, remove,
57        },
58    },
59    backing_storage::BackingStorage,
60    data::{
61        ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
62        CachedDataItemValueRef, CellRef, CollectibleRef, CollectiblesRef, DirtyState,
63        InProgressCellState, InProgressState, InProgressStateInner, OutputValue, RootType,
64    },
65    utils::{
66        bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc, sharded::Sharded, swap_retain,
67    },
68};
69
70const BACKEND_JOB_INITIAL_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(1) };
71const BACKEND_JOB_FOLLOW_UP_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(2) };
72
/// Mask selecting only the most significant bit of a `usize`.
///
/// This bit is OR-ed into the in-progress operation counter while a snapshot is requested; the
/// remaining bits hold the counter itself.
const SNAPSHOT_REQUESTED_BIT: usize = !(usize::MAX >> 1);
74
75struct SnapshotRequest {
76    snapshot_requested: bool,
77    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
78}
79
80impl SnapshotRequest {
81    fn new() -> Self {
82        Self {
83            snapshot_requested: false,
84            suspended_operations: FxHashSet::default(),
85        }
86    }
87}
88
89type TransientTaskOnce =
90    Mutex<Option<Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>>>;
91
92pub enum TransientTask {
93    /// A root task that will track dependencies and re-execute when
94    /// dependencies change. Task will eventually settle to the correct
95    /// execution.
96    ///
97    /// Always active. Automatically scheduled.
98    Root(TransientTaskRoot),
99
100    // TODO implement these strongly consistency
101    /// A single root task execution. It won't track dependencies.
102    /// Task will definitely include all invalidations that happened before the
103    /// start of the task. It may or may not include invalidations that
104    /// happened after that. It may see these invalidations partially
105    /// applied.
106    ///
107    /// Active until done. Automatically scheduled.
108    Once(TransientTaskOnce),
109}
110
/// How the backend uses the backing storage.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageMode {
    /// Queries the storage for cache entries that don't exist locally.
    ReadOnly,
    /// Queries the storage for cache entries that don't exist locally.
    /// Keeps a log of all changes and regularly pushes them to the backing storage.
    ReadWrite,
}

/// Configuration of the backend. See [`BackendOptions::default`] for the default values.
#[derive(Debug, Clone)]
pub struct BackendOptions {
    /// Enables dependency tracking.
    ///
    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
    /// forever.
    pub dependency_tracking: bool,

    /// Enables children tracking.
    ///
    /// When disabled: Strongly consistent reads are only eventually consistent. All tasks are
    /// considered as active. Collectibles are disabled.
    pub children_tracking: bool,

    /// Enables active tracking.
    ///
    /// Automatically disabled when `dependency_tracking` is disabled.
    ///
    /// When disabled: All tasks are considered as active.
    pub active_tracking: bool,

    /// Enables the backing storage. `None` disables it.
    pub storage_mode: Option<StorageMode>,

    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
    pub small_preallocation: bool,
}

impl Default for BackendOptions {
    /// All tracking enabled, read-write backing storage, regular (large) preallocation.
    fn default() -> Self {
        Self {
            dependency_tracking: true,
            children_tracking: true,
            active_tracking: true,
            storage_mode: Some(StorageMode::ReadWrite),
            small_preallocation: false,
        }
    }
}
157
158pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
159
160type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
161
162struct TurboTasksBackendInner<B: BackingStorage> {
163    options: BackendOptions,
164
165    start_time: Instant,
166    session_id: SessionId,
167
168    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
169    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
170
171    persisted_task_cache_log: Option<TaskCacheLog>,
172    task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
173    transient_tasks: FxDashMap<TaskId, Arc<TransientTask>>,
174
175    storage: Storage,
176
177    /// Number of executing operations + Highest bit is set when snapshot is
178    /// requested. When that bit is set, operations should pause until the
179    /// snapshot is completed. When the bit is set and in progress counter
180    /// reaches zero, `operations_completed_when_snapshot_requested` is
181    /// triggered.
182    in_progress_operations: AtomicUsize,
183
184    snapshot_request: Mutex<SnapshotRequest>,
185    /// Condition Variable that is triggered when `in_progress_operations`
186    /// reaches zero while snapshot is requested. All operations are either
187    /// completed or suspended.
188    operations_suspended: Condvar,
189    /// Condition Variable that is triggered when a snapshot is completed and
190    /// operations can continue.
191    snapshot_completed: Condvar,
192    /// The timestamp of the last started snapshot since [`Self::start_time`].
193    last_snapshot: AtomicU64,
194
195    stopping: AtomicBool,
196    stopping_event: Event,
197    idle_start_event: Event,
198    idle_end_event: Event,
199    #[cfg(feature = "verify_aggregation_graph")]
200    is_idle: AtomicBool,
201
202    task_statistics: TaskStatisticsApi,
203
204    backing_storage: B,
205
206    #[cfg(feature = "verify_aggregation_graph")]
207    root_tasks: Mutex<FxHashSet<TaskId>>,
208}
209
210impl<B: BackingStorage> TurboTasksBackend<B> {
211    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
212        Self(Arc::new(TurboTasksBackendInner::new(
213            options,
214            backing_storage,
215        )))
216    }
217
218    pub fn backing_storage(&self) -> &B {
219        &self.0.backing_storage
220    }
221}
222
223impl<B: BackingStorage> TurboTasksBackendInner<B> {
224    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
225        let shard_amount =
226            (available_parallelism().map_or(4, |v| v.get()) * 64).next_power_of_two();
227        let need_log = matches!(options.storage_mode, Some(StorageMode::ReadWrite));
228        if !options.dependency_tracking {
229            options.active_tracking = false;
230        }
231        let small_preallocation = options.small_preallocation;
232        Self {
233            options,
234            start_time: Instant::now(),
235            session_id: backing_storage
236                .next_session_id()
237                .expect("Failed get session id"),
238            persisted_task_id_factory: IdFactoryWithReuse::new(
239                backing_storage
240                    .next_free_task_id()
241                    .expect("Failed to get task id"),
242                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
243            ),
244            transient_task_id_factory: IdFactoryWithReuse::new(
245                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
246                TaskId::MAX,
247            ),
248            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
249            task_cache: BiMap::new(),
250            transient_tasks: FxDashMap::default(),
251            storage: Storage::new(small_preallocation),
252            in_progress_operations: AtomicUsize::new(0),
253            snapshot_request: Mutex::new(SnapshotRequest::new()),
254            operations_suspended: Condvar::new(),
255            snapshot_completed: Condvar::new(),
256            last_snapshot: AtomicU64::new(0),
257            stopping: AtomicBool::new(false),
258            stopping_event: Event::new(|| "TurboTasksBackend::stopping_event".to_string()),
259            idle_start_event: Event::new(|| "TurboTasksBackend::idle_start_event".to_string()),
260            idle_end_event: Event::new(|| "TurboTasksBackend::idle_end_event".to_string()),
261            #[cfg(feature = "verify_aggregation_graph")]
262            is_idle: AtomicBool::new(false),
263            task_statistics: TaskStatisticsApi::default(),
264            backing_storage,
265            #[cfg(feature = "verify_aggregation_graph")]
266            root_tasks: Default::default(),
267        }
268    }
269
270    fn execute_context<'a>(
271        &'a self,
272        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
273    ) -> impl ExecuteContext<'a> {
274        ExecuteContextImpl::new(self, turbo_tasks)
275    }
276
277    fn session_id(&self) -> SessionId {
278        self.session_id
279    }
280
281    /// # Safety
282    ///
283    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
284    unsafe fn execute_context_with_tx<'e, 'tx>(
285        &'e self,
286        tx: Option<&'e B::ReadTransaction<'tx>>,
287        turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
288    ) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
289    where
290        'tx: 'e,
291    {
292        // Safety: `tx` is from `self`.
293        unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
294    }
295
296    fn suspending_requested(&self) -> bool {
297        self.should_persist()
298            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
299    }
300
301    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
302        #[cold]
303        fn operation_suspend_point_cold<B: BackingStorage>(
304            this: &TurboTasksBackendInner<B>,
305            suspend: impl FnOnce() -> AnyOperation,
306        ) {
307            let operation = Arc::new(suspend());
308            let mut snapshot_request = this.snapshot_request.lock();
309            if snapshot_request.snapshot_requested {
310                snapshot_request
311                    .suspended_operations
312                    .insert(operation.clone().into());
313                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
314                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
315                if value == SNAPSHOT_REQUESTED_BIT {
316                    this.operations_suspended.notify_all();
317                }
318                this.snapshot_completed
319                    .wait_while(&mut snapshot_request, |snapshot_request| {
320                        snapshot_request.snapshot_requested
321                    });
322                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
323                snapshot_request
324                    .suspended_operations
325                    .remove(&operation.into());
326            }
327        }
328
329        if self.suspending_requested() {
330            operation_suspend_point_cold(self, suspend);
331        }
332    }
333
334    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
335        if !self.should_persist() {
336            return OperationGuard { backend: None };
337        }
338        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
339        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
340            let mut snapshot_request = self.snapshot_request.lock();
341            if snapshot_request.snapshot_requested {
342                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
343                if value == SNAPSHOT_REQUESTED_BIT {
344                    self.operations_suspended.notify_all();
345                }
346                self.snapshot_completed
347                    .wait_while(&mut snapshot_request, |snapshot_request| {
348                        snapshot_request.snapshot_requested
349                    });
350                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
351            }
352        }
353        OperationGuard {
354            backend: Some(self),
355        }
356    }
357
358    fn should_persist(&self) -> bool {
359        matches!(self.options.storage_mode, Some(StorageMode::ReadWrite))
360    }
361
362    fn should_restore(&self) -> bool {
363        self.options.storage_mode.is_some()
364    }
365
366    fn should_track_dependencies(&self) -> bool {
367        self.options.dependency_tracking
368    }
369
370    fn should_track_activeness(&self) -> bool {
371        self.options.active_tracking
372    }
373
374    fn should_track_children(&self) -> bool {
375        self.options.children_tracking
376    }
377
378    fn track_cache_hit(&self, task_type: &CachedTaskType) {
379        self.task_statistics
380            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
381    }
382
383    fn track_cache_miss(&self, task_type: &CachedTaskType) {
384        self.task_statistics
385            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
386    }
387}
388
389pub(crate) struct OperationGuard<'a, B: BackingStorage> {
390    backend: Option<&'a TurboTasksBackendInner<B>>,
391}
392
393impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
394    fn drop(&mut self) {
395        if let Some(backend) = self.backend {
396            let fetch_sub = backend
397                .in_progress_operations
398                .fetch_sub(1, Ordering::AcqRel);
399            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
400                backend.operations_suspended.notify_all();
401            }
402        }
403    }
404}
405
406// Operations
407impl<B: BackingStorage> TurboTasksBackendInner<B> {
408    /// # Safety
409    ///
410    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
411    unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
412        &'l self,
413        tx: Option<&'l B::ReadTransaction<'tx>>,
414        parent_task: TaskId,
415        child_task: TaskId,
416        is_immutable: bool,
417        turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
418    ) {
419        operation::ConnectChildOperation::run(parent_task, child_task, is_immutable, unsafe {
420            self.execute_context_with_tx(tx, turbo_tasks)
421        });
422    }
423
424    fn connect_child(
425        &self,
426        parent_task: TaskId,
427        child_task: TaskId,
428        is_immutable: bool,
429        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
430    ) {
431        operation::ConnectChildOperation::run(
432            parent_task,
433            child_task,
434            is_immutable,
435            self.execute_context(turbo_tasks),
436        );
437    }
438
439    fn try_read_task_output(
440        &self,
441        task_id: TaskId,
442        reader: Option<TaskId>,
443        consistency: ReadConsistency,
444        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
445    ) -> Result<Result<RawVc, EventListener>> {
446        if let Some(reader) = reader {
447            self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
448        }
449
450        let mut ctx = self.execute_context(turbo_tasks);
451        let mut task = ctx.task(task_id, TaskDataCategory::All);
452
453        fn listen_to_done_event<B: BackingStorage>(
454            this: &TurboTasksBackendInner<B>,
455            reader: Option<TaskId>,
456            done_event: &Event,
457        ) -> EventListener {
458            let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
459            done_event.listen_with_note(move || {
460                if let Some(reader_desc) = reader_desc.as_ref() {
461                    format!("try_read_task_output from {}", reader_desc())
462                } else {
463                    "try_read_task_output (untracked)".to_string()
464                }
465            })
466        }
467
468        fn check_in_progress<B: BackingStorage>(
469            this: &TurboTasksBackendInner<B>,
470            task: &impl TaskGuard,
471            reader: Option<TaskId>,
472            ctx: &impl ExecuteContext<'_>,
473        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
474        {
475            match get!(task, InProgress) {
476                Some(InProgressState::Scheduled { done_event, .. }) => {
477                    Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
478                }
479                Some(InProgressState::InProgress(box InProgressStateInner {
480                    done,
481                    done_event,
482                    ..
483                })) => {
484                    if !*done {
485                        Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
486                    } else {
487                        None
488                    }
489                }
490                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
491                    "{} was canceled",
492                    ctx.get_task_description(task.id())
493                ))),
494                None => None,
495            }
496        }
497
498        if self.should_track_children() && matches!(consistency, ReadConsistency::Strong) {
499            // Ensure it's an root node
500            loop {
501                let aggregation_number = get_aggregation_number(&task);
502                if is_root_node(aggregation_number) {
503                    break;
504                }
505                drop(task);
506                {
507                    let _span = tracing::trace_span!(
508                        "make root node for strongly consistent read",
509                        %task_id
510                    )
511                    .entered();
512                    AggregationUpdateQueue::run(
513                        AggregationUpdateJob::UpdateAggregationNumber {
514                            task_id,
515                            base_aggregation_number: u32::MAX,
516                            distance: None,
517                        },
518                        &mut ctx,
519                    );
520                }
521                task = ctx.task(task_id, TaskDataCategory::All);
522            }
523
524            let is_dirty =
525                get!(task, Dirty).map_or(false, |dirty_state| dirty_state.get(self.session_id));
526
527            // Check the dirty count of the root node
528            let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
529                .cloned()
530                .unwrap_or_default()
531                .get(self.session_id);
532            if dirty_tasks > 0 || is_dirty {
533                let root = get_mut!(task, Activeness);
534                let mut task_ids_to_schedule: Vec<_> = Vec::new();
535                // When there are dirty task, subscribe to the all_clean_event
536                let root = if let Some(root) = root {
537                    // This makes sure all tasks stay active and this task won't stale.
538                    // active_until_clean is automatically removed when this
539                    // task is clean.
540                    root.set_active_until_clean();
541                    root
542                } else {
543                    // If we don't have a root state, add one. This also makes sure all tasks stay
544                    // active and this task won't stale. active_until_clean
545                    // is automatically removed when this task is clean.
546                    get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id))
547                        .set_active_until_clean();
548                    if ctx.should_track_activeness() {
549                        // A newly added Activeness need to make sure to schedule the tasks
550                        task_ids_to_schedule = get_many!(
551                            task,
552                            AggregatedDirtyContainer {
553                                task
554                            } count if count.get(self.session_id) > 0 => {
555                                task
556                            }
557                        );
558                        task_ids_to_schedule.push(task_id);
559                    }
560                    get!(task, Activeness).unwrap()
561                };
562                let listener = root.all_clean_event.listen_with_note(move || {
563                    format!("try_read_task_output (strongly consistent) from {reader:?}")
564                });
565                drop(task);
566                if !task_ids_to_schedule.is_empty() {
567                    let mut queue = AggregationUpdateQueue::new();
568                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
569                    queue.execute(&mut ctx);
570                }
571
572                return Ok(Err(listener));
573            }
574        }
575
576        if let Some(value) = check_in_progress(self, &task, reader, &ctx) {
577            return value;
578        }
579
580        if let Some(output) = get!(task, Output) {
581            let result = match output {
582                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
583                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
584                OutputValue::Error(error) => {
585                    let err: anyhow::Error = error.clone().into();
586                    Err(err.context(format!(
587                        "Execution of {} failed",
588                        ctx.get_task_description(task_id)
589                    )))
590                }
591            };
592            if self.should_track_dependencies()
593                && let Some(reader) = reader
594            {
595                let _ = task.add(CachedDataItem::OutputDependent {
596                    task: reader,
597                    value: (),
598                });
599                drop(task);
600
601                let mut reader_task = ctx.task(reader, TaskDataCategory::Data);
602                if reader_task
603                    .remove(&CachedDataItemKey::OutdatedOutputDependency { target: task_id })
604                    .is_none()
605                {
606                    let _ = reader_task.add(CachedDataItem::OutputDependency {
607                        target: task_id,
608                        value: (),
609                    });
610                }
611            }
612
613            return result;
614        }
615
616        let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
617        let note = move || {
618            if let Some(reader_desc) = reader_desc.as_ref() {
619                format!("try_read_task_output (recompute) from {}", reader_desc())
620            } else {
621                "try_read_task_output (recompute, untracked)".to_string()
622            }
623        };
624
625        // Output doesn't exist. We need to schedule the task to compute it.
626        let (item, listener) = CachedDataItem::new_scheduled_with_listener(
627            TaskExecutionReason::OutputNotAvailable,
628            self.get_task_desc_fn(task_id),
629            note,
630        );
631        // It's not possible that the task is InProgress at this point. If it is InProgress {
632        // done: true } it must have Output and would early return.
633        task.add_new(item);
634        turbo_tasks.schedule(task_id);
635
636        Ok(Err(listener))
637    }
638
639    fn try_read_task_cell(
640        &self,
641        task_id: TaskId,
642        reader: Option<TaskId>,
643        cell: CellId,
644        options: ReadCellOptions,
645        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
646    ) -> Result<Result<TypedCellContent, EventListener>> {
647        if let Some(reader) = reader {
648            self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
649        }
650
651        fn add_cell_dependency<B: BackingStorage>(
652            backend: &TurboTasksBackendInner<B>,
653            mut task: impl TaskGuard,
654            reader: Option<TaskId>,
655            cell: CellId,
656            task_id: TaskId,
657            ctx: &mut impl ExecuteContext<'_>,
658        ) {
659            if !backend.should_track_dependencies() {
660                return;
661            }
662            if let Some(reader) = reader {
663                if reader == task_id {
664                    // We never want to have a dependency on ourselves, otherwise we end up in a
665                    // loop of re-executing the same task.
666                    return;
667                }
668                let _ = task.add(CachedDataItem::CellDependent {
669                    cell,
670                    task: reader,
671                    value: (),
672                });
673                drop(task);
674
675                let mut reader_task = ctx.task(reader, TaskDataCategory::Data);
676                let target = CellRef {
677                    task: task_id,
678                    cell,
679                };
680                if reader_task
681                    .remove(&CachedDataItemKey::OutdatedCellDependency { target })
682                    .is_none()
683                {
684                    let _ = reader_task.add(CachedDataItem::CellDependency { target, value: () });
685                }
686            }
687        }
688
689        let mut ctx = self.execute_context(turbo_tasks);
690        let mut task = ctx.task(task_id, TaskDataCategory::Data);
691        let content = if options.final_read_hint {
692            remove!(task, CellData { cell })
693        } else if let Some(content) = get!(task, CellData { cell }) {
694            let content = content.clone();
695            Some(content)
696        } else {
697            None
698        };
699        if let Some(content) = content {
700            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
701            return Ok(Ok(TypedCellContent(
702                cell.type_id,
703                CellContent(Some(content.reference)),
704            )));
705        }
706
707        let in_progress = get!(task, InProgress);
708        if matches!(
709            in_progress,
710            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
711        ) {
712            return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0));
713        }
714        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
715        let is_scheduled = matches!(in_progress, Some(InProgressState::Scheduled { .. }));
716
717        // Check cell index range (cell might not exist at all)
718        let max_id = get!(
719            task,
720            CellTypeMaxIndex {
721                cell_type: cell.type_id
722            }
723        )
724        .copied();
725        let Some(max_id) = max_id else {
726            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
727            bail!(
728                "Cell {cell:?} no longer exists in task {} (no cell of this type exists)",
729                ctx.get_task_description(task_id)
730            );
731        };
732        if cell.index >= max_id {
733            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
734            bail!(
735                "Cell {cell:?} no longer exists in task {} (index out of bounds)",
736                ctx.get_task_description(task_id)
737            );
738        }
739
740        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
741        // task the get the cell content.
742
743        // Listen to the cell and potentially schedule the task
744        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, reader, cell);
745        if !new_listener {
746            return Ok(Err(listener));
747        }
748
749        let _span = tracing::trace_span!(
750            "recomputation",
751            cell_type = registry::get_value_type_global_name(cell.type_id),
752            cell_index = cell.index
753        )
754        .entered();
755
756        // Schedule the task, if not already scheduled
757        if is_cancelled {
758            bail!("{} was canceled", ctx.get_task_description(task_id));
759        } else if !is_scheduled
760            && task.add(CachedDataItem::new_scheduled(
761                TaskExecutionReason::CellNotAvailable,
762                self.get_task_desc_fn(task_id),
763            ))
764        {
765            turbo_tasks.schedule(task_id);
766        }
767
768        Ok(Err(listener))
769    }
770
771    fn listen_to_cell(
772        &self,
773        task: &mut impl TaskGuard,
774        task_id: TaskId,
775        reader: Option<TaskId>,
776        cell: CellId,
777    ) -> (EventListener, bool) {
778        let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
779        let note = move || {
780            if let Some(reader_desc) = reader_desc.as_ref() {
781                format!("try_read_task_cell (in progress) from {}", reader_desc())
782            } else {
783                "try_read_task_cell (in progress, untracked)".to_string()
784            }
785        };
786        if let Some(in_progress) = get!(task, InProgressCell { cell }) {
787            // Someone else is already computing the cell
788            let listener = in_progress.event.listen_with_note(note);
789            return (listener, false);
790        }
791        let in_progress = InProgressCellState::new(task_id, cell);
792        let listener = in_progress.event.listen_with_note(note);
793        task.add_new(CachedDataItem::InProgressCell {
794            cell,
795            value: in_progress,
796        });
797        (listener, true)
798    }
799
800    fn lookup_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
801        if let Some(task_type) = self.task_cache.lookup_reverse(&task_id) {
802            return Some(task_type);
803        }
804        if self.should_restore()
805            && !task_id.is_transient()
806            && let Some(task_type) = unsafe {
807                self.backing_storage
808                    .reverse_lookup_task_cache(None, task_id)
809                    .expect("Failed to lookup task type")
810            }
811        {
812            let _ = self.task_cache.try_insert(task_type.clone(), task_id);
813            return Some(task_type);
814        }
815        None
816    }
817
818    // TODO feature flag that for hanging detection only
819    fn get_task_desc_fn(&self, task_id: TaskId) -> impl Fn() -> String + Send + Sync + 'static {
820        let task_type = self.lookup_task_type(task_id);
821        move || {
822            task_type.as_ref().map_or_else(
823                || format!("{task_id:?} transient"),
824                |task_type| format!("{task_id:?} {task_type}"),
825            )
826        }
827    }
828
829    fn snapshot(&self) -> Option<(Instant, bool)> {
830        let start = Instant::now();
831        debug_assert!(self.should_persist());
832        let mut snapshot_request = self.snapshot_request.lock();
833        snapshot_request.snapshot_requested = true;
834        let active_operations = self
835            .in_progress_operations
836            .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
837        if active_operations != 0 {
838            self.operations_suspended
839                .wait_while(&mut snapshot_request, |_| {
840                    self.in_progress_operations.load(Ordering::Relaxed) != SNAPSHOT_REQUESTED_BIT
841                });
842        }
843        let suspended_operations = snapshot_request
844            .suspended_operations
845            .iter()
846            .map(|op| op.arc().clone())
847            .collect::<Vec<_>>();
848        drop(snapshot_request);
849        let mut persisted_task_cache_log = self
850            .persisted_task_cache_log
851            .as_ref()
852            .map(|l| l.take(|i| i))
853            .unwrap_or_default();
854        self.storage.start_snapshot();
855        let mut snapshot_request = self.snapshot_request.lock();
856        snapshot_request.snapshot_requested = false;
857        self.in_progress_operations
858            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
859        self.snapshot_completed.notify_all();
860        let snapshot_time = Instant::now();
861        drop(snapshot_request);
862
863        let preprocess = |task_id: TaskId, inner: &storage::InnerStorage| {
864            if task_id.is_transient() {
865                return (None, None);
866            }
867            let len = inner.len();
868
869            let meta_restored = inner.state().meta_restored();
870            let data_restored = inner.state().data_restored();
871
872            let mut meta = meta_restored.then(|| Vec::with_capacity(len));
873            let mut data = data_restored.then(|| Vec::with_capacity(len));
874            for (key, value) in inner.iter_all() {
875                if key.is_persistent() && value.is_persistent() {
876                    match key.category() {
877                        TaskDataCategory::Meta => {
878                            if let Some(meta) = &mut meta {
879                                meta.push(CachedDataItem::from_key_and_value_ref(key, value))
880                            }
881                        }
882                        TaskDataCategory::Data => {
883                            if let Some(data) = &mut data {
884                                data.push(CachedDataItem::from_key_and_value_ref(key, value))
885                            }
886                        }
887                        _ => {}
888                    }
889                }
890            }
891
892            (meta, data)
893        };
894        let process = |task_id: TaskId, (meta, data): (Option<Vec<_>>, Option<Vec<_>>)| {
895            (
896                task_id,
897                meta.map(|d| B::serialize(task_id, &d)),
898                data.map(|d| B::serialize(task_id, &d)),
899            )
900        };
901        let process_snapshot = |task_id: TaskId, inner: Box<InnerStorageSnapshot>| {
902            if task_id.is_transient() {
903                return (task_id, None, None);
904            }
905            let len = inner.len();
906            let mut meta = inner.meta_modified.then(|| Vec::with_capacity(len));
907            let mut data = inner.data_modified.then(|| Vec::with_capacity(len));
908            for (key, value) in inner.iter_all() {
909                if key.is_persistent() && value.is_persistent() {
910                    match key.category() {
911                        TaskDataCategory::Meta => {
912                            if let Some(meta) = &mut meta {
913                                meta.push(CachedDataItem::from_key_and_value_ref(key, value));
914                            }
915                        }
916                        TaskDataCategory::Data => {
917                            if let Some(data) = &mut data {
918                                data.push(CachedDataItem::from_key_and_value_ref(key, value));
919                            }
920                        }
921                        _ => {}
922                    }
923                }
924            }
925            (
926                task_id,
927                meta.map(|meta| B::serialize(task_id, &meta)),
928                data.map(|data| B::serialize(task_id, &data)),
929            )
930        };
931
932        let snapshot = {
933            let _span = tracing::trace_span!("take snapshot");
934            self.storage
935                .take_snapshot(&preprocess, &process, &process_snapshot)
936        };
937
938        #[cfg(feature = "print_cache_item_size")]
939        #[derive(Default)]
940        struct TaskCacheStats {
941            data: usize,
942            data_count: usize,
943            meta: usize,
944            meta_count: usize,
945        }
946        #[cfg(feature = "print_cache_item_size")]
947        impl TaskCacheStats {
948            fn add_data(&mut self, len: usize) {
949                self.data += len;
950                self.data_count += 1;
951            }
952
953            fn add_meta(&mut self, len: usize) {
954                self.meta += len;
955                self.meta_count += 1;
956            }
957        }
958        #[cfg(feature = "print_cache_item_size")]
959        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
960            Mutex::new(FxHashMap::default());
961
962        let task_snapshots = snapshot
963            .into_iter()
964            .filter_map(|iter| {
965                let mut iter = iter
966                    .filter_map(
967                        |(task_id, meta, data): (
968                            _,
969                            Option<Result<SmallVec<_>>>,
970                            Option<Result<SmallVec<_>>>,
971                        )| {
972                            let meta = match meta {
973                                Some(Ok(meta)) => {
974                                    #[cfg(feature = "print_cache_item_size")]
975                                    task_cache_stats
976                                        .lock()
977                                        .entry(self.get_task_description(task_id))
978                                        .or_default()
979                                        .add_meta(meta.len());
980                                    Some(meta)
981                                }
982                                None => None,
983                                Some(Err(err)) => {
984                                    println!(
985                                        "Serializing task {} failed (meta): {:?}",
986                                        self.get_task_description(task_id),
987                                        err
988                                    );
989                                    None
990                                }
991                            };
992                            let data = match data {
993                                Some(Ok(data)) => {
994                                    #[cfg(feature = "print_cache_item_size")]
995                                    task_cache_stats
996                                        .lock()
997                                        .entry(self.get_task_description(task_id))
998                                        .or_default()
999                                        .add_data(data.len());
1000                                    Some(data)
1001                                }
1002                                None => None,
1003                                Some(Err(err)) => {
1004                                    println!(
1005                                        "Serializing task {} failed (data): {:?}",
1006                                        self.get_task_description(task_id),
1007                                        err
1008                                    );
1009                                    None
1010                                }
1011                            };
1012                            (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
1013                        },
1014                    )
1015                    .peekable();
1016                iter.peek().is_some().then_some(iter)
1017            })
1018            .collect::<Vec<_>>();
1019
1020        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
1021
1022        let mut new_items = false;
1023
1024        if !persisted_task_cache_log.is_empty() || !task_snapshots.is_empty() {
1025            new_items = true;
1026            if let Err(err) = self.backing_storage.save_snapshot(
1027                self.session_id,
1028                suspended_operations,
1029                persisted_task_cache_log,
1030                task_snapshots,
1031            ) {
1032                println!("Persisting failed: {err:?}");
1033                return None;
1034            }
1035            #[cfg(feature = "print_cache_item_size")]
1036            {
1037                let mut task_cache_stats = task_cache_stats
1038                    .into_inner()
1039                    .into_iter()
1040                    .collect::<Vec<_>>();
1041                if !task_cache_stats.is_empty() {
1042                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1043                        (stats_b.data + stats_b.meta, key_b)
1044                            .cmp(&(stats_a.data + stats_a.meta, key_a))
1045                    });
1046                    println!("Task cache stats:");
1047                    for (task_desc, stats) in task_cache_stats {
1048                        use std::ops::Div;
1049
1050                        use turbo_tasks::util::FormatBytes;
1051
1052                        println!(
1053                            "  {} {task_desc} = {} meta ({} x {}), {} data ({} x {})",
1054                            FormatBytes(stats.data + stats.meta),
1055                            FormatBytes(stats.meta),
1056                            stats.meta_count,
1057                            FormatBytes(stats.meta.checked_div(stats.meta_count).unwrap_or(0)),
1058                            FormatBytes(stats.data),
1059                            stats.data_count,
1060                            FormatBytes(stats.data.checked_div(stats.data_count).unwrap_or(0)),
1061                        );
1062                    }
1063                }
1064            }
1065        }
1066
1067        if new_items {
1068            let elapsed = start.elapsed();
1069            turbo_tasks().send_compilation_event(Arc::new(TimingEvent::new(
1070                "Finished writing to persistent cache".to_string(),
1071                elapsed,
1072            )));
1073        }
1074
1075        Some((snapshot_time, new_items))
1076    }
1077
1078    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1079        if self.should_restore() {
1080            // Continue all uncompleted operations
1081            // They can't be interrupted by a snapshot since the snapshotting job has not been
1082            // scheduled yet.
1083            let uncompleted_operations = self
1084                .backing_storage
1085                .uncompleted_operations()
1086                .expect("Failed to get uncompleted operations");
1087            if !uncompleted_operations.is_empty() {
1088                let mut ctx = self.execute_context(turbo_tasks);
1089                for op in uncompleted_operations {
1090                    op.execute(&mut ctx);
1091                }
1092            }
1093        }
1094
1095        if self.should_persist() {
1096            // Schedule the snapshot job
1097            turbo_tasks.schedule_backend_background_job(BACKEND_JOB_INITIAL_SNAPSHOT);
1098        }
1099    }
1100
1101    fn stopping(&self) {
1102        self.stopping.store(true, Ordering::Release);
1103        self.stopping_event.notify(usize::MAX);
1104    }
1105
1106    #[allow(unused_variables)]
1107    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1108        #[cfg(feature = "verify_aggregation_graph")]
1109        {
1110            self.is_idle.store(false, Ordering::Release);
1111            self.verify_aggregation_graph(turbo_tasks, false);
1112        }
1113        if let Err(err) = self.backing_storage.shutdown() {
1114            println!("Shutting down failed: {err}");
1115        }
1116    }
1117
1118    #[allow(unused_variables)]
1119    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1120        self.idle_start_event.notify(usize::MAX);
1121
1122        #[cfg(feature = "verify_aggregation_graph")]
1123        {
1124            use tokio::select;
1125
1126            self.is_idle.store(true, Ordering::Release);
1127            let this = self.clone();
1128            let turbo_tasks = turbo_tasks.pin();
1129            tokio::task::spawn(async move {
1130                select! {
1131                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1132                        // do nothing
1133                    }
1134                    _ = this.idle_end_event.listen() => {
1135                        return;
1136                    }
1137                }
1138                if !this.is_idle.load(Ordering::Relaxed) {
1139                    return;
1140                }
1141                this.verify_aggregation_graph(&*turbo_tasks, true);
1142            });
1143        }
1144    }
1145
1146    fn idle_end(&self) {
1147        #[cfg(feature = "verify_aggregation_graph")]
1148        self.is_idle.store(false, Ordering::Release);
1149        self.idle_end_event.notify(usize::MAX);
1150    }
1151
1152    fn get_or_create_persistent_task(
1153        &self,
1154        task_type: CachedTaskType,
1155        parent_task: TaskId,
1156        is_immutable: bool,
1157        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1158    ) -> TaskId {
1159        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
1160            self.track_cache_hit(&task_type);
1161            self.connect_child(parent_task, task_id, is_immutable, turbo_tasks);
1162            return task_id;
1163        }
1164
1165        self.track_cache_miss(&task_type);
1166        let tx = self
1167            .should_restore()
1168            .then(|| self.backing_storage.start_read_transaction())
1169            .flatten();
1170        let task_id = {
1171            // Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
1172            if let Some(task_id) = unsafe {
1173                self.backing_storage
1174                    .forward_lookup_task_cache(tx.as_ref(), &task_type)
1175                    .expect("Failed to lookup task id")
1176            } {
1177                let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
1178                task_id
1179            } else {
1180                let task_type = Arc::new(task_type);
1181                let task_id = self.persisted_task_id_factory.get();
1182                let task_id = if let Err(existing_task_id) =
1183                    self.task_cache.try_insert(task_type.clone(), task_id)
1184                {
1185                    // Safety: We just created the id and failed to insert it.
1186                    unsafe {
1187                        self.persisted_task_id_factory.reuse(task_id);
1188                    }
1189                    existing_task_id
1190                } else {
1191                    task_id
1192                };
1193                if let Some(log) = &self.persisted_task_cache_log {
1194                    log.lock(task_id).push((task_type, task_id));
1195                }
1196                task_id
1197            }
1198        };
1199
1200        // Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
1201        unsafe {
1202            self.connect_child_with_tx(tx.as_ref(), parent_task, task_id, is_immutable, turbo_tasks)
1203        };
1204
1205        task_id
1206    }
1207
1208    fn get_or_create_transient_task(
1209        &self,
1210        task_type: CachedTaskType,
1211        parent_task: TaskId,
1212        is_immutable: bool,
1213        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1214    ) -> TaskId {
1215        if !parent_task.is_transient() {
1216            self.panic_persistent_calling_transient(
1217                self.lookup_task_type(parent_task).as_deref(),
1218                Some(&task_type),
1219                /* cell_id */ None,
1220            );
1221        }
1222        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
1223            self.track_cache_hit(&task_type);
1224            self.connect_child(parent_task, task_id, is_immutable, turbo_tasks);
1225            return task_id;
1226        }
1227
1228        self.track_cache_miss(&task_type);
1229        let task_type = Arc::new(task_type);
1230        let task_id = self.transient_task_id_factory.get();
1231        if let Err(existing_task_id) = self.task_cache.try_insert(task_type, task_id) {
1232            // Safety: We just created the id and failed to insert it.
1233            unsafe {
1234                self.transient_task_id_factory.reuse(task_id);
1235            }
1236            self.connect_child(parent_task, existing_task_id, is_immutable, turbo_tasks);
1237            return existing_task_id;
1238        }
1239
1240        self.connect_child(parent_task, task_id, is_immutable, turbo_tasks);
1241
1242        task_id
1243    }
1244
1245    /// Generate an object that implements [`fmt::Display`] explaining why the given
1246    /// [`CachedTaskType`] is transient.
1247    fn debug_trace_transient_task(
1248        &self,
1249        task_type: &CachedTaskType,
1250        cell_id: Option<CellId>,
1251    ) -> DebugTraceTransientTask {
1252        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1253        // from tracing the same task many times, so use a visited_set
1254        fn inner_id(
1255            backend: &TurboTasksBackendInner<impl BackingStorage>,
1256            task_id: TaskId,
1257            cell_type_id: Option<ValueTypeId>,
1258            visited_set: &mut FxHashSet<TaskId>,
1259        ) -> DebugTraceTransientTask {
1260            if let Some(task_type) = backend.lookup_task_type(task_id) {
1261                if visited_set.contains(&task_id) {
1262                    let task_name = task_type.get_name();
1263                    DebugTraceTransientTask::Collapsed {
1264                        task_name,
1265                        cell_type_id,
1266                    }
1267                } else {
1268                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1269                }
1270            } else {
1271                DebugTraceTransientTask::Uncached { cell_type_id }
1272            }
1273        }
1274        fn inner_cached(
1275            backend: &TurboTasksBackendInner<impl BackingStorage>,
1276            task_type: &CachedTaskType,
1277            cell_type_id: Option<ValueTypeId>,
1278            visited_set: &mut FxHashSet<TaskId>,
1279        ) -> DebugTraceTransientTask {
1280            let task_name = task_type.get_name();
1281
1282            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1283                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1284                    // `task_id` should never be `None` at this point, as that would imply a
1285                    // non-local task is returning a local `Vc`...
1286                    // Just ignore if it happens, as we're likely already panicking.
1287                    return None;
1288                };
1289                if task_id.is_transient() {
1290                    Some(Box::new(inner_id(
1291                        backend,
1292                        task_id,
1293                        cause_self_raw_vc.try_get_type_id(),
1294                        visited_set,
1295                    )))
1296                } else {
1297                    None
1298                }
1299            });
1300            let cause_args = task_type
1301                .arg
1302                .get_raw_vcs()
1303                .into_iter()
1304                .filter_map(|raw_vc| {
1305                    let Some(task_id) = raw_vc.try_get_task_id() else {
1306                        // `task_id` should never be `None` (see comment above)
1307                        return None;
1308                    };
1309                    if !task_id.is_transient() {
1310                        return None;
1311                    }
1312                    Some((task_id, raw_vc.try_get_type_id()))
1313                })
1314                .collect::<IndexSet<_>>() // dedupe
1315                .into_iter()
1316                .map(|(task_id, cell_type_id)| {
1317                    inner_id(backend, task_id, cell_type_id, visited_set)
1318                })
1319                .collect();
1320
1321            DebugTraceTransientTask::Cached {
1322                task_name,
1323                cell_type_id,
1324                cause_self,
1325                cause_args,
1326            }
1327        }
1328        inner_cached(
1329            self,
1330            task_type,
1331            cell_id.map(|c| c.type_id),
1332            &mut FxHashSet::default(),
1333        )
1334    }
1335
1336    fn invalidate_task(
1337        &self,
1338        task_id: TaskId,
1339        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1340    ) {
1341        if !self.should_track_dependencies() {
1342            panic!("Dependency tracking is disabled so invalidation is not allowed");
1343        }
1344        operation::InvalidateOperation::run(
1345            smallvec![task_id],
1346            #[cfg(feature = "trace_task_dirty")]
1347            TaskDirtyCause::Invalidator,
1348            self.execute_context(turbo_tasks),
1349        );
1350    }
1351
1352    fn invalidate_tasks(
1353        &self,
1354        tasks: &[TaskId],
1355        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1356    ) {
1357        if !self.should_track_dependencies() {
1358            panic!("Dependency tracking is disabled so invalidation is not allowed");
1359        }
1360        operation::InvalidateOperation::run(
1361            tasks.iter().copied().collect(),
1362            #[cfg(feature = "trace_task_dirty")]
1363            TaskDirtyCause::Unknown,
1364            self.execute_context(turbo_tasks),
1365        );
1366    }
1367
1368    fn invalidate_tasks_set(
1369        &self,
1370        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1371        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1372    ) {
1373        if !self.should_track_dependencies() {
1374            panic!("Dependency tracking is disabled so invalidation is not allowed");
1375        }
1376        operation::InvalidateOperation::run(
1377            tasks.iter().copied().collect(),
1378            #[cfg(feature = "trace_task_dirty")]
1379            TaskDirtyCause::Unknown,
1380            self.execute_context(turbo_tasks),
1381        );
1382    }
1383
1384    fn invalidate_serialization(
1385        &self,
1386        task_id: TaskId,
1387        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1388    ) {
1389        if task_id.is_transient() {
1390            return;
1391        }
1392        let mut ctx = self.execute_context(turbo_tasks);
1393        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1394        task.invalidate_serialization();
1395    }
1396
1397    fn get_task_description(&self, task_id: TaskId) -> String {
1398        self.lookup_task_type(task_id).map_or_else(
1399            || format!("{task_id:?} transient"),
1400            |task_type| task_type.to_string(),
1401        )
1402    }
1403
1404    fn task_execution_canceled(
1405        &self,
1406        task_id: TaskId,
1407        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1408    ) {
1409        let mut ctx = self.execute_context(turbo_tasks);
1410        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1411        if let Some(in_progress) = remove!(task, InProgress) {
1412            match in_progress {
1413                InProgressState::Scheduled {
1414                    done_event,
1415                    reason: _,
1416                } => done_event.notify(usize::MAX),
1417                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1418                    done_event.notify(usize::MAX)
1419                }
1420                InProgressState::Canceled => {}
1421            }
1422        }
1423        task.add_new(CachedDataItem::InProgress {
1424            value: InProgressState::Canceled,
1425        });
1426    }
1427
1428    fn try_start_task_execution(
1429        &self,
1430        task_id: TaskId,
1431        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1432    ) -> Option<TaskExecutionSpec<'_>> {
1433        enum TaskType {
1434            Cached(Arc<CachedTaskType>),
1435            Transient(Arc<TransientTask>),
1436        }
1437        let (task_type, once_task) = if let Some(task_type) = self.lookup_task_type(task_id) {
1438            (TaskType::Cached(task_type), false)
1439        } else if let Some(task_type) = self.transient_tasks.get(&task_id) {
1440            (
1441                TaskType::Transient(task_type.clone()),
1442                matches!(**task_type, TransientTask::Once(_)),
1443            )
1444        } else {
1445            return None;
1446        };
1447        let execution_reason;
1448        {
1449            let mut ctx = self.execute_context(turbo_tasks);
1450            let mut task = ctx.task(task_id, TaskDataCategory::All);
1451            let in_progress = remove!(task, InProgress)?;
1452            let InProgressState::Scheduled { done_event, reason } = in_progress else {
1453                task.add_new(CachedDataItem::InProgress { value: in_progress });
1454                return None;
1455            };
1456            execution_reason = reason;
1457            task.add_new(CachedDataItem::InProgress {
1458                value: InProgressState::InProgress(Box::new(InProgressStateInner {
1459                    stale: false,
1460                    once_task,
1461                    done_event,
1462                    session_dependent: false,
1463                    marked_as_completed: false,
1464                    done: false,
1465                    new_children: Default::default(),
1466                })),
1467            });
1468
1469            if self.should_track_children() {
1470                // Make all current collectibles outdated (remove left-over outdated collectibles)
1471                enum Collectible {
1472                    Current(CollectibleRef, i32),
1473                    Outdated(CollectibleRef),
1474                }
1475                let collectibles = iter_many!(task, Collectible { collectible } value => Collectible::Current(collectible, *value))
1476                    .chain(iter_many!(task, OutdatedCollectible { collectible } => Collectible::Outdated(collectible)))
1477                    .collect::<Vec<_>>();
1478                for collectible in collectibles {
1479                    match collectible {
1480                        Collectible::Current(collectible, value) => {
1481                            let _ = task
1482                                .insert(CachedDataItem::OutdatedCollectible { collectible, value });
1483                        }
1484                        Collectible::Outdated(collectible) => {
1485                            if !task.has_key(&CachedDataItemKey::Collectible { collectible }) {
1486                                task.remove(&CachedDataItemKey::OutdatedCollectible {
1487                                    collectible,
1488                                });
1489                            }
1490                        }
1491                    }
1492                }
1493            }
1494
1495            if self.should_track_dependencies() {
1496                // Make all dependencies outdated
1497                enum Dep {
1498                    CurrentCell(CellRef),
1499                    CurrentOutput(TaskId),
1500                    OutdatedCell(CellRef),
1501                    OutdatedOutput(TaskId),
1502                }
1503                let dependencies = iter_many!(task, CellDependency { target } => Dep::CurrentCell(target))
1504                    .chain(iter_many!(task, OutputDependency { target } => Dep::CurrentOutput(target)))
1505                    .chain(iter_many!(task, OutdatedCellDependency { target } => Dep::OutdatedCell(target)))
1506                    .chain(iter_many!(task, OutdatedOutputDependency { target } => Dep::OutdatedOutput(target)))
1507                    .collect::<Vec<_>>();
1508                for dep in dependencies {
1509                    match dep {
1510                        Dep::CurrentCell(cell) => {
1511                            let _ = task.add(CachedDataItem::OutdatedCellDependency {
1512                                target: cell,
1513                                value: (),
1514                            });
1515                        }
1516                        Dep::CurrentOutput(output) => {
1517                            let _ = task.add(CachedDataItem::OutdatedOutputDependency {
1518                                target: output,
1519                                value: (),
1520                            });
1521                        }
1522                        Dep::OutdatedCell(cell) => {
1523                            if !task.has_key(&CachedDataItemKey::CellDependency { target: cell }) {
1524                                task.remove(&CachedDataItemKey::OutdatedCellDependency {
1525                                    target: cell,
1526                                });
1527                            }
1528                        }
1529                        Dep::OutdatedOutput(output) => {
1530                            if !task
1531                                .has_key(&CachedDataItemKey::OutputDependency { target: output })
1532                            {
1533                                task.remove(&CachedDataItemKey::OutdatedOutputDependency {
1534                                    target: output,
1535                                });
1536                            }
1537                        }
1538                    }
1539                }
1540            }
1541        }
1542
1543        let (span, future) = match task_type {
1544            TaskType::Cached(task_type) => {
1545                let CachedTaskType {
1546                    native_fn,
1547                    this,
1548                    arg,
1549                } = &*task_type;
1550                (
1551                    native_fn.span(task_id.persistence(), execution_reason),
1552                    native_fn.execute(*this, &**arg),
1553                )
1554            }
1555            TaskType::Transient(task_type) => {
1556                let task_type = task_type.clone();
1557                let span = tracing::trace_span!("turbo_tasks::root_task");
1558                let future = match &*task_type {
1559                    TransientTask::Root(f) => f(),
1560                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
1561                };
1562                (span, future)
1563            }
1564        };
1565        Some(TaskExecutionSpec { future, span })
1566    }
1567
1568    fn task_execution_result(
1569        &self,
1570        task_id: TaskId,
1571        result: Result<RawVc, TurboTasksExecutionError>,
1572        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1573    ) {
1574        operation::UpdateOutputOperation::run(task_id, result, self.execute_context(turbo_tasks));
1575    }
1576
    /// Finalizes the bookkeeping of `task_id` after its execution has finished.
    ///
    /// Parameters:
    /// - `_duration`, `_memory_usage`: not used by this implementation.
    /// - `cell_counters`: per value type counter reported by the execution. It is stored as
    ///   `CellTypeMaxIndex`, and every `CellData` item whose index is `>=` the counter of its type
    ///   (or whose type has no counter at all) is removed.
    /// - `stateful`: when `true`, a `Stateful` item is added and the task is never marked as
    ///   immutable here.
    ///
    /// Returns `true` when the task was found to be stale at one of the checkpoints; in that case
    /// its `InProgress` item has been replaced by `InProgressState::Scheduled` with
    /// `TaskExecutionReason::Stale`. Returns `false` when the completion ran through all steps.
    /// NOTE(review): presumably the caller re-executes the task on `true` -- confirm at the call
    /// site.
    ///
    /// # Panics
    ///
    /// Panics when the task has no `InProgress` item or when that item is not in the
    /// `InProgressState::InProgress` state.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        _duration: Duration,
        _memory_usage: usize,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        stateful: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        // Task completion is a 4 step process:
        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
        //    aggregation number of the task and the new children.
        // 2. Connect the new children to the task (and do the relevant aggregation updates).
        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
        // 4. Shrink the task memory to reduce footprint of the task.

        // Due to persistence it is possible that the process is cancelled after any step. This is
        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
        // in-memory representation.

        // The task might be invalidated during this process, so we need to check the stale flag
        // at the start of every step.

        let _span = tracing::trace_span!("task execution completed").entered();
        let mut ctx = self.execute_context(turbo_tasks);

        //// STEP 1 ////

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = get_mut!(task, InProgress) else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        let &mut InProgressState::InProgress(box InProgressStateInner {
            stale,
            ref mut done,
            ref done_event,
            ref mut new_children,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // If the task is stale, reschedule it
        if stale {
            let Some(InProgressState::InProgress(box InProgressStateInner {
                done_event,
                mut new_children,
                ..
            })) = remove!(task, InProgress)
            else {
                unreachable!();
            };
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled {
                    done_event,
                    reason: TaskExecutionReason::Stale,
                },
            });
            // Remove old children from new_children to leave only the children that had their
            // active count increased
            for task in iter_many!(task, Child { task } => task) {
                new_children.remove(&task);
            }
            drop(task);

            // We need to undo the active count increase for the children since we throw away the
            // new_children list now.
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_keys().collect(),
                },
                &mut ctx,
            );
            return true;
        }

        // mark the task as completed, so dependent tasks can continue working
        *done = true;
        done_event.notify(usize::MAX);

        // take the children from the task to process them
        let mut new_children = take(new_children);

        // handle stateful
        if stateful {
            let _ = task.add(CachedDataItem::Stateful { value: () });
        }

        // handle cell counters: update max index and remove cells that are no longer used
        let old_counters: FxHashMap<_, _> =
            get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, *max_index));
        // Every type that is still reported gets removed from this map; whatever remains
        // afterwards are types that no longer have a counter.
        let mut counters_to_remove = old_counters.clone();
        for (&cell_type, &max_index) in cell_counters.iter() {
            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                if old_max_index != max_index {
                    task.insert(CachedDataItem::CellTypeMaxIndex {
                        cell_type,
                        value: max_index,
                    });
                }
            } else {
                task.add_new(CachedDataItem::CellTypeMaxIndex {
                    cell_type,
                    value: max_index,
                });
            }
        }
        for (cell_type, _) in counters_to_remove {
            task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type });
        }

        let mut queue = AggregationUpdateQueue::new();

        let mut removed_data = Vec::new();
        let mut old_edges = Vec::new();

        let has_children = !new_children.is_empty();
        let has_mutable_children =
            has_children && new_children.values().any(|is_immutable| !*is_immutable);

        // If the task is not stateful and has no mutable children, it does not have a way to be
        // invalidated and we can mark it as immutable.
        if !stateful && !has_mutable_children {
            task.mark_as_immutable();
        }

        // Prepare all new children
        if has_children {
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);
        }

        // Filter actual new children
        if has_children {
            // Existing children that are also in `new_children` are dropped from that map (they
            // are already connected); existing children that are not in it become outdated edges.
            old_edges.extend(
                iter_many!(task, Child { task } => task)
                    .filter(|task| new_children.remove(task).is_none())
                    .map(OutdatedEdge::Child),
            );
        } else {
            old_edges.extend(iter_many!(task, Child { task } => task).map(OutdatedEdge::Child));
        }

        // Remove no longer existing cells and
        // find all outdated data items (removed cells, outdated edges)
        // Note: For persistent tasks we only want to call extract_if when there are actual cells to
        // remove to avoid tracking that as modification.
        if task_id.is_transient() || iter_many!(task, CellData { cell }
            if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index) => cell
        ).count() > 0 {
            removed_data.extend(task.extract_if(CachedDataItemType::CellData, |key, _| {
                matches!(key, CachedDataItemKey::CellData { cell } if cell_counters
                            .get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
            }));
        }
        if self.should_track_children() {
            old_edges.extend(
                task.iter(CachedDataItemType::OutdatedCollectible)
                    .filter_map(|(key, value)| match (key, value) {
                        (
                            CachedDataItemKey::OutdatedCollectible { collectible },
                            CachedDataItemValueRef::OutdatedCollectible { value },
                        ) => Some(OutdatedEdge::Collectible(collectible, *value)),
                        _ => None,
                    }),
            );
        }
        if !task.is_immutable() && self.should_track_dependencies() {
            old_edges.extend(iter_many!(task, OutdatedCellDependency { target } => OutdatedEdge::CellDependency(target)));
            old_edges.extend(iter_many!(task, OutdatedOutputDependency { target } => OutdatedEdge::OutputDependency(target)));
            // Dependents of cells that existed under the old counter but are outside the new
            // counter (or whose type has no counter anymore) are recorded as
            // `RemovedCellDependent` edges.
            old_edges.extend(
                iter_many!(task, CellDependent { cell, task } => (cell, task)).filter_map(
                    |(cell, task)| {
                        if cell_counters
                            .get(&cell.type_id)
                            .is_none_or(|start_index| cell.index >= *start_index)
                            && let Some(old_counter) = old_counters.get(&cell.type_id)
                            && cell.index < *old_counter
                        {
                            return Some(OutdatedEdge::RemovedCellDependent {
                                task_id: task,
                                #[cfg(feature = "trace_task_dirty")]
                                value_type_id: cell.type_id,
                            });
                        }
                        None
                    },
                ),
            );
        }

        drop(task);

        if !queue.is_empty() || !old_edges.is_empty() {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
            // Remove outdated edges first, before removing in_progress+dirty flag.
            // We need to make sure all outdated edges are removed before the task can potentially
            // be scheduled and executed again
            CleanupOldEdgesOperation::run(task_id, old_edges, queue, &mut ctx);
        }

        // When restoring from persistent caching the following might not be executed (since we can
        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
        // would be executed again.

        //// STEP 2 ////

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = get!(task, InProgress) else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        let InProgressState::InProgress(box InProgressStateInner { stale, .. }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // If the task is stale, reschedule it
        if *stale {
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                remove!(task, InProgress)
            else {
                unreachable!();
            };
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled {
                    done_event,
                    reason: TaskExecutionReason::Stale,
                },
            });
            drop(task);

            // All `new_children` are currently held active with an active count and we need to
            // undo that. (We already filtered out the old children from that list)
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_keys().collect(),
                },
                &mut ctx,
            );
            return true;
        }

        let mut queue = AggregationUpdateQueue::new();

        if has_children {
            let is_immutable = task.is_immutable();
            let has_active_count = !is_immutable
                && ctx.should_track_activeness()
                && get!(task, Activeness).map_or(false, |activeness| activeness.active_counter > 0);
            connect_children(
                task_id,
                &mut task,
                new_children,
                &mut queue,
                has_active_count,
                !is_immutable && ctx.should_track_activeness(),
            );
        }

        drop(task);

        if has_children {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("connect new children").entered();
            queue.execute(&mut ctx);
        }

        //// STEP 3 ////

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = remove!(task, InProgress) else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: _,
            stale,
            session_dependent,
            done: _,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // The children were taken out of the in-progress state in step 1.
        debug_assert!(new_children.is_empty());

        // If the task is stale, reschedule it
        if stale {
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled {
                    done_event,
                    reason: TaskExecutionReason::Stale,
                },
            });
            return true;
        }

        // Notify in progress cells
        removed_data.extend(task.extract_if(
            CachedDataItemType::InProgressCell,
            |key, value| match (key, value) {
                (
                    CachedDataItemKey::InProgressCell { .. },
                    CachedDataItemValueRef::InProgressCell { value },
                ) => {
                    value.event.notify(usize::MAX);
                    true
                }
                _ => false,
            },
        ));

        // Update the dirty state
        let old_dirty_state = get!(task, Dirty).copied();

        // A session dependent task stays dirty, but is marked as clean for the current session;
        // any other task gets its dirty state removed.
        let new_dirty_state = if session_dependent {
            Some(DirtyState {
                clean_in_session: Some(self.session_id),
            })
        } else {
            None
        };

        let data_update = if old_dirty_state != new_dirty_state {
            if let Some(new_dirty_state) = new_dirty_state {
                task.insert(CachedDataItem::Dirty {
                    value: new_dirty_state,
                });
            } else {
                task.remove(&CachedDataItemKey::Dirty {});
            }

            if self.should_track_children()
                && (old_dirty_state.is_some() || new_dirty_state.is_some())
            {
                let mut dirty_containers = get!(task, AggregatedDirtyContainerCount)
                    .cloned()
                    .unwrap_or_default();
                if let Some(old_dirty_state) = old_dirty_state {
                    dirty_containers.update_with_dirty_state(&old_dirty_state);
                }
                let aggregated_update = match (old_dirty_state, new_dirty_state) {
                    (None, None) => unreachable!(),
                    (Some(old), None) => dirty_containers.undo_update_with_dirty_state(&old),
                    (None, Some(new)) => dirty_containers.update_with_dirty_state(&new),
                    (Some(old), Some(new)) => dirty_containers.replace_dirty_state(&old, &new),
                };
                if !aggregated_update.is_zero() {
                    if aggregated_update.get(self.session_id) < 0
                        && let Some(root_state) = get_mut!(task, Activeness)
                    {
                        root_state.all_clean_event.notify(usize::MAX);
                        root_state.unset_active_until_clean();
                        if root_state.is_empty() {
                            task.remove(&CachedDataItemKey::Activeness {});
                        }
                    }
                    AggregationUpdateJob::data_update(
                        &mut task,
                        AggregatedDataUpdate::new()
                            .dirty_container_update(task_id, aggregated_update),
                    )
                } else {
                    None
                }
            } else {
                None
            }
        } else {
            None
        };

        drop(task);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, &mut ctx);
        }

        // The extracted items are only dropped here, after the task guard has been released.
        drop(removed_data);

        //// STEP 4 ////

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        task.shrink_to_fit(CachedDataItemType::CellData);
        task.shrink_to_fit(CachedDataItemType::CellTypeMaxIndex);
        task.shrink_to_fit(CachedDataItemType::CellDependency);
        task.shrink_to_fit(CachedDataItemType::OutputDependency);
        task.shrink_to_fit(CachedDataItemType::CollectiblesDependency);
        drop(task);

        false
    }
1969
1970    fn run_backend_job<'a>(
1971        self: &'a Arc<Self>,
1972        id: BackendJobId,
1973        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1974    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
1975        Box::pin(async move {
1976            if id == BACKEND_JOB_INITIAL_SNAPSHOT || id == BACKEND_JOB_FOLLOW_UP_SNAPSHOT {
1977                debug_assert!(self.should_persist());
1978
1979                let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
1980                let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
1981                loop {
1982                    const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(60);
1983                    const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(30);
1984                    const IDLE_TIMEOUT: Duration = Duration::from_secs(2);
1985
1986                    let time = if id == BACKEND_JOB_INITIAL_SNAPSHOT {
1987                        FIRST_SNAPSHOT_WAIT
1988                    } else {
1989                        SNAPSHOT_INTERVAL
1990                    };
1991
1992                    let until = last_snapshot + time;
1993                    if until > Instant::now() {
1994                        let mut stop_listener = self.stopping_event.listen();
1995                        if !self.stopping.load(Ordering::Acquire) {
1996                            let mut idle_start_listener = self.idle_start_event.listen();
1997                            let mut idle_end_listener = self.idle_end_event.listen();
1998                            let mut idle_time = if turbo_tasks.is_idle() {
1999                                Instant::now() + IDLE_TIMEOUT
2000                            } else {
2001                                far_future()
2002                            };
2003                            loop {
2004                                tokio::select! {
2005                                    _ = &mut stop_listener => {
2006                                        break;
2007                                    },
2008                                    _ = &mut idle_start_listener => {
2009                                        idle_time = Instant::now() + IDLE_TIMEOUT;
2010                                        idle_start_listener = self.idle_start_event.listen()
2011                                    },
2012                                    _ = &mut idle_end_listener => {
2013                                        idle_time = until + IDLE_TIMEOUT;
2014                                        idle_end_listener = self.idle_end_event.listen()
2015                                    },
2016                                    _ = tokio::time::sleep_until(until) => {
2017                                        break;
2018                                    },
2019                                    _ = tokio::time::sleep_until(idle_time) => {
2020                                        if turbo_tasks.is_idle() {
2021                                            break;
2022                                        }
2023                                    },
2024                                }
2025                            }
2026                        }
2027                    }
2028
2029                    let this = self.clone();
2030                    let snapshot = turbo_tasks::spawn_blocking(move || this.snapshot()).await;
2031                    if let Some((snapshot_start, new_data)) = snapshot {
2032                        last_snapshot = snapshot_start;
2033                        if new_data {
2034                            continue;
2035                        }
2036                        let last_snapshot = last_snapshot.duration_since(self.start_time);
2037                        self.last_snapshot.store(
2038                            last_snapshot.as_millis().try_into().unwrap(),
2039                            Ordering::Relaxed,
2040                        );
2041
2042                        turbo_tasks.schedule_backend_background_job(BACKEND_JOB_FOLLOW_UP_SNAPSHOT);
2043                        return;
2044                    }
2045                }
2046            }
2047        })
2048    }
2049
2050    fn try_read_own_task_cell_untracked(
2051        &self,
2052        task_id: TaskId,
2053        cell: CellId,
2054        _options: ReadCellOptions,
2055        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2056    ) -> Result<TypedCellContent> {
2057        let mut ctx = self.execute_context(turbo_tasks);
2058        let task = ctx.task(task_id, TaskDataCategory::Data);
2059        if let Some(content) = get!(task, CellData { cell }) {
2060            Ok(CellContent(Some(content.reference.clone())).into_typed(cell.type_id))
2061        } else {
2062            Ok(CellContent(None).into_typed(cell.type_id))
2063        }
2064    }
2065
2066    fn read_task_collectibles(
2067        &self,
2068        task_id: TaskId,
2069        collectible_type: TraitTypeId,
2070        reader_id: TaskId,
2071        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2072    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2073        if !self.should_track_children() {
2074            return AutoMap::default();
2075        }
2076
2077        let mut ctx = self.execute_context(turbo_tasks);
2078        let mut collectibles = AutoMap::default();
2079        {
2080            let mut task = ctx.task(task_id, TaskDataCategory::All);
2081            // Ensure it's an root node
2082            loop {
2083                let aggregation_number = get_aggregation_number(&task);
2084                if is_root_node(aggregation_number) {
2085                    break;
2086                }
2087                drop(task);
2088                AggregationUpdateQueue::run(
2089                    AggregationUpdateJob::UpdateAggregationNumber {
2090                        task_id,
2091                        base_aggregation_number: u32::MAX,
2092                        distance: None,
2093                    },
2094                    &mut ctx,
2095                );
2096                task = ctx.task(task_id, TaskDataCategory::All);
2097            }
2098            for collectible in iter_many!(
2099                task,
2100                AggregatedCollectible {
2101                    collectible
2102                } count if collectible.collectible_type == collectible_type && *count > 0 => {
2103                    collectible.cell
2104                }
2105            ) {
2106                *collectibles
2107                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
2108                    .or_insert(0) += 1;
2109            }
2110            for (collectible, count) in iter_many!(
2111                task,
2112                Collectible {
2113                    collectible
2114                } count if collectible.collectible_type == collectible_type => {
2115                    (collectible.cell, *count)
2116                }
2117            ) {
2118                *collectibles
2119                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
2120                    .or_insert(0) += count;
2121            }
2122            let _ = task.add(CachedDataItem::CollectiblesDependent {
2123                collectible_type,
2124                task: reader_id,
2125                value: (),
2126            });
2127        }
2128        {
2129            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
2130            let target = CollectiblesRef {
2131                task: task_id,
2132                collectible_type,
2133            };
2134            if reader.add(CachedDataItem::CollectiblesDependency { target, value: () }) {
2135                reader.remove(&CachedDataItemKey::OutdatedCollectiblesDependency { target });
2136            }
2137        }
2138        collectibles
2139    }
2140
2141    fn emit_collectible(
2142        &self,
2143        collectible_type: TraitTypeId,
2144        collectible: RawVc,
2145        task_id: TaskId,
2146        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2147    ) {
2148        self.assert_valid_collectible(task_id, collectible);
2149        if !self.should_track_children() {
2150            return;
2151        }
2152
2153        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2154            panic!("Collectibles need to be resolved");
2155        };
2156        let cell = CellRef {
2157            task: collectible_task,
2158            cell,
2159        };
2160        operation::UpdateCollectibleOperation::run(
2161            task_id,
2162            CollectibleRef {
2163                collectible_type,
2164                cell,
2165            },
2166            1,
2167            self.execute_context(turbo_tasks),
2168        );
2169    }
2170
2171    fn unemit_collectible(
2172        &self,
2173        collectible_type: TraitTypeId,
2174        collectible: RawVc,
2175        count: u32,
2176        task_id: TaskId,
2177        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2178    ) {
2179        self.assert_valid_collectible(task_id, collectible);
2180        if !self.should_track_children() {
2181            return;
2182        }
2183
2184        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2185            panic!("Collectibles need to be resolved");
2186        };
2187        let cell = CellRef {
2188            task: collectible_task,
2189            cell,
2190        };
2191        operation::UpdateCollectibleOperation::run(
2192            task_id,
2193            CollectibleRef {
2194                collectible_type,
2195                cell,
2196            },
2197            -(i32::try_from(count).unwrap()),
2198            self.execute_context(turbo_tasks),
2199        );
2200    }
2201
2202    fn update_task_cell(
2203        &self,
2204        task_id: TaskId,
2205        cell: CellId,
2206        content: CellContent,
2207        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2208    ) {
2209        operation::UpdateCellOperation::run(
2210            task_id,
2211            cell,
2212            content,
2213            self.execute_context(turbo_tasks),
2214        );
2215    }
2216
2217    fn mark_own_task_as_session_dependent(
2218        &self,
2219        task_id: TaskId,
2220        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2221    ) {
2222        if !self.should_track_dependencies() {
2223            // Without dependency tracking we don't need session dependent tasks
2224            return;
2225        }
2226        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
2227        let mut ctx = self.execute_context(turbo_tasks);
2228        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
2229        let aggregation_number = get_aggregation_number(&task);
2230        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
2231            drop(task);
2232            // We want to use a high aggregation number to avoid large aggregation chains for
2233            // session dependent tasks (which change on every run)
2234            AggregationUpdateQueue::run(
2235                AggregationUpdateJob::UpdateAggregationNumber {
2236                    task_id,
2237                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
2238                    distance: None,
2239                },
2240                &mut ctx,
2241            );
2242            task = ctx.task(task_id, TaskDataCategory::Meta);
2243        }
2244        if let Some(InProgressState::InProgress(box InProgressStateInner {
2245            session_dependent,
2246            ..
2247        })) = get_mut!(task, InProgress)
2248        {
2249            *session_dependent = true;
2250        }
2251    }
2252
2253    fn mark_own_task_as_finished(
2254        &self,
2255        task: TaskId,
2256        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2257    ) {
2258        let mut ctx = self.execute_context(turbo_tasks);
2259        let mut task = ctx.task(task, TaskDataCategory::Data);
2260        if let Some(InProgressState::InProgress(box InProgressStateInner {
2261            marked_as_completed,
2262            ..
2263        })) = get_mut!(task, InProgress)
2264        {
2265            *marked_as_completed = true;
2266            // TODO this should remove the dirty state (also check session_dependent)
2267            // but this would break some assumptions for strongly consistent reads.
2268            // Client tasks are not connected yet, so we wouldn't wait for them.
2269            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
2270        }
2271    }
2272
2273    fn set_own_task_aggregation_number(
2274        &self,
2275        task: TaskId,
2276        aggregation_number: u32,
2277        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2278    ) {
2279        let mut ctx = self.execute_context(turbo_tasks);
2280        AggregationUpdateQueue::run(
2281            AggregationUpdateJob::UpdateAggregationNumber {
2282                task_id: task,
2283                base_aggregation_number: aggregation_number,
2284                distance: None,
2285            },
2286            &mut ctx,
2287        );
2288    }
2289
2290    fn connect_task(
2291        &self,
2292        task: TaskId,
2293        parent_task: TaskId,
2294        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2295    ) {
2296        self.assert_not_persistent_calling_transient(parent_task, task, None);
2297        ConnectChildOperation::run(parent_task, task, false, self.execute_context(turbo_tasks));
2298    }
2299
2300    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
2301        let task_id = self.transient_task_id_factory.get();
2302        let root_type = match task_type {
2303            TransientTaskType::Root(_) => RootType::RootTask,
2304            TransientTaskType::Once(_) => RootType::OnceTask,
2305        };
2306        self.transient_tasks.insert(
2307            task_id,
2308            Arc::new(match task_type {
2309                TransientTaskType::Root(f) => TransientTask::Root(f),
2310                TransientTaskType::Once(f) => TransientTask::Once(Mutex::new(Some(f))),
2311            }),
2312        );
2313        {
2314            let mut task = self.storage.access_mut(task_id);
2315            task.add(CachedDataItem::AggregationNumber {
2316                value: AggregationNumber {
2317                    base: u32::MAX,
2318                    distance: 0,
2319                    effective: u32::MAX,
2320                },
2321            });
2322            if !task.state().is_immutable() && self.should_track_activeness() {
2323                task.add(CachedDataItem::Activeness {
2324                    value: ActivenessState::new_root(root_type, task_id),
2325                });
2326            }
2327            task.add(CachedDataItem::new_scheduled(
2328                TaskExecutionReason::Initial,
2329                move || match root_type {
2330                    RootType::RootTask => "Root Task".to_string(),
2331                    RootType::OnceTask => "Once Task".to_string(),
2332                },
2333            ));
2334        }
2335        #[cfg(feature = "verify_aggregation_graph")]
2336        self.root_tasks.lock().insert(task_id);
2337        task_id
2338    }
2339
2340    fn dispose_root_task(
2341        &self,
2342        task_id: TaskId,
2343        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2344    ) {
2345        #[cfg(feature = "verify_aggregation_graph")]
2346        self.root_tasks.lock().remove(&task_id);
2347
2348        let mut ctx = self.execute_context(turbo_tasks);
2349        let mut task = ctx.task(task_id, TaskDataCategory::All);
2350        let is_dirty = get!(task, Dirty).map_or(false, |dirty| dirty.get(self.session_id));
2351        let has_dirty_containers = get!(task, AggregatedDirtyContainerCount)
2352            .map_or(false, |dirty_containers| {
2353                dirty_containers.get(self.session_id) > 0
2354            });
2355        if is_dirty || has_dirty_containers {
2356            if let Some(root_state) = get_mut!(task, Activeness) {
2357                // We will finish the task, but it would be removed after the task is done
2358                root_state.unset_root_type();
2359                root_state.set_active_until_clean();
2360            };
2361        } else if let Some(root_state) = remove!(task, Activeness) {
2362            // Technically nobody should be listening to this event, but just in case
2363            // we notify it anyway
2364            root_state.all_clean_event.notify(usize::MAX);
2365        }
2366    }
2367
2368    #[cfg(feature = "verify_aggregation_graph")]
2369    fn verify_aggregation_graph(
2370        &self,
2371        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2372        idle: bool,
2373    ) {
2374        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
2375            return;
2376        }
2377        use std::{collections::VecDeque, env, io::stdout};
2378
2379        use crate::backend::operation::{get_uppers, is_aggregating_node};
2380
2381        let mut ctx = self.execute_context(turbo_tasks);
2382        let root_tasks = self.root_tasks.lock().clone();
2383        let len = root_tasks.len();
2384
2385        for (i, task_id) in root_tasks.into_iter().enumerate() {
2386            println!("Verifying graph from root {task_id} {i}/{len}...");
2387            let mut queue = VecDeque::new();
2388            let mut visited = FxHashSet::default();
2389            let mut aggregated_nodes = FxHashSet::default();
2390            let mut collectibles = FxHashMap::default();
2391            let root_task_id = task_id;
2392            visited.insert(task_id);
2393            aggregated_nodes.insert(task_id);
2394            queue.push_back(task_id);
2395            let mut counter = 0;
2396            while let Some(task_id) = queue.pop_front() {
2397                counter += 1;
2398                if counter % 100000 == 0 {
2399                    println!(
2400                        "queue={}, visited={}, aggregated_nodes={}",
2401                        queue.len(),
2402                        visited.len(),
2403                        aggregated_nodes.len()
2404                    );
2405                }
2406                let task = ctx.task(task_id, TaskDataCategory::All);
2407                if idle && !self.is_idle.load(Ordering::Relaxed) {
2408                    return;
2409                }
2410
2411                let uppers = get_uppers(&task);
2412                if task_id != root_task_id
2413                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
2414                {
2415                    println!(
2416                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
2417                         {:?})",
2418                        task_id,
2419                        ctx.get_task_description(task_id),
2420                        uppers
2421                    );
2422                }
2423
2424                let aggregated_collectibles: Vec<_> = get_many!(task, AggregatedCollectible { collectible } value if *value > 0 => {collectible});
2425                for collectible in aggregated_collectibles {
2426                    collectibles
2427                        .entry(collectible)
2428                        .or_insert_with(|| (false, Vec::new()))
2429                        .1
2430                        .push(task_id);
2431                }
2432
2433                let own_collectibles: Vec<_> = get_many!(task, Collectible { collectible } value if *value > 0 => {collectible});
2434                for collectible in own_collectibles {
2435                    if let Some((flag, _)) = collectibles.get_mut(&collectible) {
2436                        *flag = true
2437                    } else {
2438                        println!(
2439                            "Task {} has a collectible {:?} that is not in any upper task",
2440                            task_id, collectible
2441                        );
2442                    }
2443                }
2444
2445                let is_dirty = get!(task, Dirty).is_some_and(|dirty| dirty.get(self.session_id));
2446                let has_dirty_container = get!(task, AggregatedDirtyContainerCount)
2447                    .is_some_and(|count| count.get(self.session_id) > 0);
2448                let should_be_in_upper = is_dirty || has_dirty_container;
2449
2450                let aggregation_number = get_aggregation_number(&task);
2451                if is_aggregating_node(aggregation_number) {
2452                    aggregated_nodes.insert(task_id);
2453                }
2454                // println!(
2455                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
2456                //     ctx.get_task_description(task_id),
2457                //     uppers
2458                // );
2459
2460                for child_id in iter_many!(task, Child { task } => task) {
2461                    // println!("{task_id}: child -> {child_id}");
2462                    if visited.insert(child_id) {
2463                        queue.push_back(child_id);
2464                    }
2465                }
2466                drop(task);
2467
2468                if should_be_in_upper {
2469                    for upper_id in uppers {
2470                        let task = ctx.task(task_id, TaskDataCategory::All);
2471                        let in_upper = get!(task, AggregatedDirtyContainer { task: task_id })
2472                            .is_some_and(|dirty| dirty.get(self.session_id) > 0);
2473                        if !in_upper {
2474                            println!(
2475                                "Task {} is dirty, but is not listed in the upper task {}",
2476                                task_id, upper_id
2477                            );
2478                        }
2479                    }
2480                }
2481            }
2482
2483            for (collectible, (flag, task_ids)) in collectibles {
2484                if !flag {
2485                    use std::io::Write;
2486                    let mut stdout = stdout().lock();
2487                    writeln!(
2488                        stdout,
2489                        "{:?} that is not emitted in any child task but in these aggregated \
2490                         tasks: {:#?}",
2491                        collectible,
2492                        task_ids
2493                            .iter()
2494                            .map(|t| format!("{t} {}", ctx.get_task_description(*t)))
2495                            .collect::<Vec<_>>()
2496                    );
2497
2498                    let task_id = collectible.cell.task;
2499                    let mut queue = {
2500                        let task = ctx.task(task_id, TaskDataCategory::All);
2501                        get_uppers(&task)
2502                    };
2503                    let mut visited = FxHashSet::default();
2504                    for &upper_id in queue.iter() {
2505                        visited.insert(upper_id);
2506                        writeln!(stdout, "{task_id:?} -> {upper_id:?}");
2507                    }
2508                    while let Some(task_id) = queue.pop() {
2509                        let desc = ctx.get_task_description(task_id);
2510                        let task = ctx.task(task_id, TaskDataCategory::All);
2511                        let aggregated_collectible =
2512                            get!(task, AggregatedCollectible { collectible })
2513                                .copied()
2514                                .unwrap_or_default();
2515                        let uppers = get_uppers(&task);
2516                        drop(task);
2517                        writeln!(
2518                            stdout,
2519                            "upper {task_id} {desc} collectible={aggregated_collectible}"
2520                        );
2521                        if task_ids.contains(&task_id) {
2522                            writeln!(
2523                                stdout,
2524                                "Task has an upper connection to an aggregated task that doesn't \
2525                                 reference it. Upper connection is invalid!"
2526                            );
2527                        }
2528                        for upper_id in uppers {
2529                            writeln!(stdout, "{task_id:?} -> {upper_id:?}");
2530                            if !visited.contains(&upper_id) {
2531                                queue.push(upper_id);
2532                            }
2533                        }
2534                    }
2535                }
2536            }
2537            println!("visited {task_id} {} tasks", visited.len());
2538        }
2539    }
2540
2541    fn assert_not_persistent_calling_transient(
2542        &self,
2543        parent_id: TaskId,
2544        child_id: TaskId,
2545        cell_id: Option<CellId>,
2546    ) {
2547        if !parent_id.is_transient() && child_id.is_transient() {
2548            self.panic_persistent_calling_transient(
2549                self.lookup_task_type(parent_id).as_deref(),
2550                self.lookup_task_type(child_id).as_deref(),
2551                cell_id,
2552            );
2553        }
2554    }
2555
2556    fn panic_persistent_calling_transient(
2557        &self,
2558        parent: Option<&CachedTaskType>,
2559        child: Option<&CachedTaskType>,
2560        cell_id: Option<CellId>,
2561    ) {
2562        let transient_reason = if let Some(child) = child {
2563            Cow::Owned(format!(
2564                " The callee is transient because it depends on:\n{}",
2565                self.debug_trace_transient_task(child, cell_id),
2566            ))
2567        } else {
2568            Cow::Borrowed("")
2569        };
2570        panic!(
2571            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
2572            parent.map_or("unknown", |t| t.get_name()),
2573            child.map_or("unknown", |t| t.get_name()),
2574            transient_reason,
2575        );
2576    }
2577
2578    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
2579        // these checks occur in a potentially hot codepath, but they're cheap
2580        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
2581            // This should never happen: The collectible APIs use ResolvedVc
2582            let task_info = if let Some(col_task_ty) = collectible
2583                .try_get_task_id()
2584                .and_then(|t| self.lookup_task_type(t))
2585            {
2586                Cow::Owned(format!(" (return type of {col_task_ty})"))
2587            } else {
2588                Cow::Borrowed("")
2589            };
2590            panic!("Collectible{task_info} must be a ResolvedVc")
2591        };
2592        if col_task_id.is_transient() && !task_id.is_transient() {
2593            let transient_reason = if let Some(col_task_ty) = self.lookup_task_type(col_task_id) {
2594                Cow::Owned(format!(
2595                    ". The collectible is transient because it depends on:\n{}",
2596                    self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
2597                ))
2598            } else {
2599                Cow::Borrowed("")
2600            };
2601            // this should never happen: How would a persistent function get a transient Vc?
2602            panic!(
2603                "Collectible is transient, transient collectibles cannot be emitted from \
2604                 persistent tasks{transient_reason}",
2605            )
2606        }
2607    }
2608}
2609
/// [`Backend`] implementation for [`TurboTasksBackend`].
///
/// Every method is a thin forwarder to the inner value held in `self.0`; no
/// logic lives in this impl. Where the inner method does not take a
/// `turbo_tasks` handle, the argument is accepted for the trait signature and
/// ignored (underscore-prefixed).
impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }

    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }

    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }

    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }

    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }

    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: TaskId,
        is_immutable: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_persistent_task(task_type, parent_task, is_immutable, turbo_tasks)
    }

    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: TaskId,
        is_immutable: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_transient_task(task_type, parent_task, is_immutable, turbo_tasks)
    }

    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }

    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }

    fn get_task_description(&self, task: TaskId) -> String {
        self.0.get_task_description(task)
    }

    // This backend keeps no per-task state object on the trait level: the
    // associated type is the unit type and constructing it is a no-op.
    type TaskState = ();
    fn new_task_state(&self, _task: TaskId) -> Self::TaskState {}

    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }

    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0.try_start_task_execution(task_id, turbo_tasks)
    }

    fn task_execution_result(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.task_execution_result(task_id, result, turbo_tasks);
    }

    // NOTE(review): `_duration` and `_memory_usage` are underscore-prefixed but
    // are still forwarded to the inner implementation below.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        _duration: Duration,
        _memory_usage: usize,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        stateful: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            _duration,
            _memory_usage,
            cell_counters,
            stateful,
            turbo_tasks,
        )
    }

    fn run_backend_job<'a>(
        &'a self,
        id: BackendJobId,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(id, turbo_tasks)
    }

    // The tracked and untracked read variants share one inner method each; they
    // differ only in whether a reader is passed (`Some(reader)`) or not (`None`).
    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: TaskId,
        consistency: ReadConsistency,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, Some(reader), consistency, turbo_tasks)
    }

    fn try_read_task_output_untracked(
        &self,
        task_id: TaskId,
        consistency: ReadConsistency,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, None, consistency, turbo_tasks)
    }

    // Note the argument order: the inner method takes the reader before the cell.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: TaskId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, Some(reader), cell, options, turbo_tasks)
    }

    fn try_read_task_cell_untracked(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, None, cell, options, turbo_tasks)
    }

    fn try_read_own_task_cell_untracked(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0
            .try_read_own_task_cell_untracked(task_id, cell, options, turbo_tasks)
    }

    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }

    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }

    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }

    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        content: CellContent,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(task_id, cell, content, turbo_tasks);
    }

    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }

    fn set_own_task_aggregation_number(
        &self,
        task: TaskId,
        aggregation_number: u32,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .set_own_task_aggregation_number(task, aggregation_number, turbo_tasks);
    }

    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }

    fn connect_task(
        &self,
        task: TaskId,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }

    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }

    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }

    // Exposes the statistics field of the inner backend by reference.
    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }
}
2882
/// A node in a tree that is rendered (via `Display`) as an indented, multi-line
/// trace of transient tasks.
///
/// In every variant, a `cell_type_id` of `Some(_)` is rendered as a
/// " (read cell of type …)" suffix after the task's name.
enum DebugTraceTransientTask {
    /// A task with a known name, optionally followed by the nodes that caused it.
    Cached {
        /// Name printed for this node.
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
        /// Printed one level deeper under a "self:" heading when present.
        cause_self: Option<Box<DebugTraceTransientTask>>,
        /// Printed one level deeper under an "args:" heading when non-empty.
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// This representation is used when this task is a duplicate of one previously shown.
    /// It is rendered with a " (collapsed)" marker and without any causes.
    Collapsed {
        /// Name printed for this node.
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    /// A task without a known name; rendered as "unknown transient task".
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}
2899
2900impl DebugTraceTransientTask {
2901    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
2902        let indent = "    ".repeat(level);
2903        f.write_str(&indent)?;
2904
2905        fn fmt_cell_type_id(
2906            f: &mut fmt::Formatter<'_>,
2907            cell_type_id: Option<ValueTypeId>,
2908        ) -> fmt::Result {
2909            if let Some(ty) = cell_type_id {
2910                write!(f, " (read cell of type {})", get_value_type_global_name(ty))
2911            } else {
2912                Ok(())
2913            }
2914        }
2915
2916        // write the name and type
2917        match self {
2918            Self::Cached {
2919                task_name,
2920                cell_type_id,
2921                ..
2922            }
2923            | Self::Collapsed {
2924                task_name,
2925                cell_type_id,
2926                ..
2927            } => {
2928                f.write_str(task_name)?;
2929                fmt_cell_type_id(f, *cell_type_id)?;
2930                if matches!(self, Self::Collapsed { .. }) {
2931                    f.write_str(" (collapsed)")?;
2932                }
2933            }
2934            Self::Uncached { cell_type_id } => {
2935                f.write_str("unknown transient task")?;
2936                fmt_cell_type_id(f, *cell_type_id)?;
2937            }
2938        }
2939        f.write_char('\n')?;
2940
2941        // write any extra "cause" information we might have
2942        if let Self::Cached {
2943            cause_self,
2944            cause_args,
2945            ..
2946        } = self
2947        {
2948            if let Some(c) = cause_self {
2949                writeln!(f, "{indent}  self:")?;
2950                c.fmt_indented(f, level + 1)?;
2951            }
2952            if !cause_args.is_empty() {
2953                writeln!(f, "{indent}  args:")?;
2954                for c in cause_args {
2955                    c.fmt_indented(f, level + 1)?;
2956                }
2957            }
2958        }
2959        Ok(())
2960    }
2961}
2962
2963impl fmt::Display for DebugTraceTransientTask {
2964    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2965        self.fmt_indented(f, 0)
2966    }
2967}
2968
// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
/// Returns an `Instant` roughly 30 years after the moment of the call.
///
/// The API offers neither a maximum `Instant` nor a way to turn a calendar
/// date into one, so a large but safe offset is used instead: 1000 years
/// overflows on macOS, 100 years overflows on FreeBSD.
fn far_future() -> Instant {
    const SECONDS_PER_DAY: u64 = 86400;
    const DAYS_PER_YEAR: u64 = 365;
    const YEARS_AHEAD: u64 = 30;

    let now = Instant::now();
    now + Duration::from_secs(SECONDS_PER_DAY * DAYS_PER_YEAR * YEARS_AHEAD)
}