turbo_tasks_backend/backend/
mod.rs

1mod dynamic_storage;
2mod operation;
3mod storage;
4
5use std::{
6    borrow::Cow,
7    cmp::min,
8    fmt::{self, Write},
9    future::Future,
10    hash::BuildHasherDefault,
11    mem::take,
12    ops::Range,
13    pin::Pin,
14    sync::{
15        Arc,
16        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
17    },
18    thread::available_parallelism,
19};
20
21use anyhow::{Result, bail};
22use auto_hash_map::{AutoMap, AutoSet};
23use indexmap::IndexSet;
24use parking_lot::{Condvar, Mutex};
25use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
26use smallvec::{SmallVec, smallvec};
27use tokio::time::{Duration, Instant};
28use tracing::{field::Empty, info_span};
29use turbo_tasks::{
30    CellId, FxDashMap, FxIndexMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency,
31    SessionId, TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId, TraitTypeId, TurboTasksBackendApi,
32    ValueTypeId,
33    backend::{
34        Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
35        TransientTaskType, TurboTasksExecutionError, TypedCellContent,
36    },
37    event::{Event, EventListener},
38    message_queue::TimingEvent,
39    registry::{self, get_value_type_global_name},
40    task_statistics::TaskStatisticsApi,
41    trace::TraceRawVcs,
42    turbo_tasks,
43    util::{IdFactoryWithReuse, good_chunk_size},
44};
45
46pub use self::{operation::AnyOperation, storage::TaskDataCategory};
47#[cfg(feature = "trace_task_dirty")]
48use crate::backend::operation::TaskDirtyCause;
49use crate::{
50    backend::{
51        operation::{
52            AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
53            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
54            Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number,
55            get_uppers, is_root_node, prepare_new_children,
56        },
57        storage::{
58            InnerStorageSnapshot, Storage, count, get, get_many, get_mut, get_mut_or_insert_with,
59            iter_many, remove,
60        },
61    },
62    backing_storage::BackingStorage,
63    data::{
64        ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
65        CachedDataItemValueRef, CellRef, CollectibleRef, CollectiblesRef, DirtyState,
66        InProgressCellState, InProgressState, InProgressStateInner, OutputValue, RootType,
67    },
68    utils::{
69        bi_map::BiMap, chunked_vec::ChunkedVec, dash_map_drop_contents::drop_contents,
70        ptr_eq_arc::PtrEqArc, sharded::Sharded, swap_retain,
71    },
72};
73
74const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
75
76struct SnapshotRequest {
77    snapshot_requested: bool,
78    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
79}
80
81impl SnapshotRequest {
82    fn new() -> Self {
83        Self {
84            snapshot_requested: false,
85            suspended_operations: FxHashSet::default(),
86        }
87    }
88}
89
90type TransientTaskOnce =
91    Mutex<Option<Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>>>;
92
93pub enum TransientTask {
94    /// A root task that will track dependencies and re-execute when
95    /// dependencies change. Task will eventually settle to the correct
96    /// execution.
97    ///
98    /// Always active. Automatically scheduled.
99    Root(TransientTaskRoot),
100
101    // TODO implement these strongly consistency
102    /// A single root task execution. It won't track dependencies.
103    /// Task will definitely include all invalidations that happened before the
104    /// start of the task. It may or may not include invalidations that
105    /// happened after that. It may see these invalidations partially
106    /// applied.
107    ///
108    /// Active until done. Automatically scheduled.
109    Once(TransientTaskOnce),
110}
111
112pub enum StorageMode {
113    /// Queries the storage for cache entries that don't exist locally.
114    ReadOnly,
115    /// Queries the storage for cache entries that don't exist locally.
116    /// Keeps a log of all changes and regularly push them to the backing storage.
117    ReadWrite,
118}
119
120pub struct BackendOptions {
121    /// Enables dependency tracking.
122    ///
123    /// When disabled: No state changes are allowed. Tasks will never reexecute and stay cached
124    /// forever.
125    pub dependency_tracking: bool,
126
127    /// Enables children tracking.
128    ///
129    /// When disabled: Strongly consistent reads are only eventually consistent. All tasks are
130    /// considered as active. Collectibles are disabled.
131    pub children_tracking: bool,
132
133    /// Enables active tracking.
134    ///
135    /// Automatically disabled when `dependency_tracking` is disabled.
136    ///
137    /// When disabled: All tasks are considered as active.
138    pub active_tracking: bool,
139
140    /// Enables the backing storage.
141    pub storage_mode: Option<StorageMode>,
142
143    /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
144    pub small_preallocation: bool,
145}
146
147impl Default for BackendOptions {
148    fn default() -> Self {
149        Self {
150            dependency_tracking: true,
151            children_tracking: true,
152            active_tracking: true,
153            storage_mode: Some(StorageMode::ReadWrite),
154            small_preallocation: false,
155        }
156    }
157}
158
159pub enum TurboTasksBackendJob {
160    InitialSnapshot,
161    FollowUpSnapshot,
162    Prefetch {
163        data: Arc<FxIndexMap<TaskId, bool>>,
164        range: Option<Range<usize>>,
165    },
166}
167
168pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
169
170type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
171
172struct TurboTasksBackendInner<B: BackingStorage> {
173    options: BackendOptions,
174
175    start_time: Instant,
176    session_id: SessionId,
177
178    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
179    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
180
181    persisted_task_cache_log: Option<TaskCacheLog>,
182    task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
183    transient_tasks: FxDashMap<TaskId, Arc<TransientTask>>,
184
185    storage: Storage,
186
187    /// When true, the backing_storage has data that is not in the local storage.
188    local_is_partial: AtomicBool,
189
190    /// Number of executing operations + Highest bit is set when snapshot is
191    /// requested. When that bit is set, operations should pause until the
192    /// snapshot is completed. When the bit is set and in progress counter
193    /// reaches zero, `operations_completed_when_snapshot_requested` is
194    /// triggered.
195    in_progress_operations: AtomicUsize,
196
197    snapshot_request: Mutex<SnapshotRequest>,
198    /// Condition Variable that is triggered when `in_progress_operations`
199    /// reaches zero while snapshot is requested. All operations are either
200    /// completed or suspended.
201    operations_suspended: Condvar,
202    /// Condition Variable that is triggered when a snapshot is completed and
203    /// operations can continue.
204    snapshot_completed: Condvar,
205    /// The timestamp of the last started snapshot since [`Self::start_time`].
206    last_snapshot: AtomicU64,
207
208    stopping: AtomicBool,
209    stopping_event: Event,
210    idle_start_event: Event,
211    idle_end_event: Event,
212    #[cfg(feature = "verify_aggregation_graph")]
213    is_idle: AtomicBool,
214
215    task_statistics: TaskStatisticsApi,
216
217    backing_storage: B,
218
219    #[cfg(feature = "verify_aggregation_graph")]
220    root_tasks: Mutex<FxHashSet<TaskId>>,
221}
222
223impl<B: BackingStorage> TurboTasksBackend<B> {
224    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
225        Self(Arc::new(TurboTasksBackendInner::new(
226            options,
227            backing_storage,
228        )))
229    }
230
231    pub fn backing_storage(&self) -> &B {
232        &self.0.backing_storage
233    }
234}
235
236impl<B: BackingStorage> TurboTasksBackendInner<B> {
237    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
238        let shard_amount =
239            (available_parallelism().map_or(4, |v| v.get()) * 64).next_power_of_two();
240        let need_log = matches!(options.storage_mode, Some(StorageMode::ReadWrite));
241        if !options.dependency_tracking {
242            options.active_tracking = false;
243        }
244        let small_preallocation = options.small_preallocation;
245        let next_task_id = backing_storage
246            .next_free_task_id()
247            .expect("Failed to get task id");
248        Self {
249            options,
250            start_time: Instant::now(),
251            session_id: backing_storage
252                .next_session_id()
253                .expect("Failed get session id"),
254            persisted_task_id_factory: IdFactoryWithReuse::new(
255                next_task_id,
256                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
257            ),
258            transient_task_id_factory: IdFactoryWithReuse::new(
259                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
260                TaskId::MAX,
261            ),
262            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
263            task_cache: BiMap::new(),
264            transient_tasks: FxDashMap::default(),
265            local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
266            storage: Storage::new(small_preallocation),
267            in_progress_operations: AtomicUsize::new(0),
268            snapshot_request: Mutex::new(SnapshotRequest::new()),
269            operations_suspended: Condvar::new(),
270            snapshot_completed: Condvar::new(),
271            last_snapshot: AtomicU64::new(0),
272            stopping: AtomicBool::new(false),
273            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
274            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
275            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
276            #[cfg(feature = "verify_aggregation_graph")]
277            is_idle: AtomicBool::new(false),
278            task_statistics: TaskStatisticsApi::default(),
279            backing_storage,
280            #[cfg(feature = "verify_aggregation_graph")]
281            root_tasks: Default::default(),
282        }
283    }
284
285    fn execute_context<'a>(
286        &'a self,
287        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
288    ) -> impl ExecuteContext<'a> {
289        ExecuteContextImpl::new(self, turbo_tasks)
290    }
291
292    fn session_id(&self) -> SessionId {
293        self.session_id
294    }
295
296    /// # Safety
297    ///
298    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
299    unsafe fn execute_context_with_tx<'e, 'tx>(
300        &'e self,
301        tx: Option<&'e B::ReadTransaction<'tx>>,
302        turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
303    ) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
304    where
305        'tx: 'e,
306    {
307        // Safety: `tx` is from `self`.
308        unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
309    }
310
311    fn suspending_requested(&self) -> bool {
312        self.should_persist()
313            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
314    }
315
316    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
317        #[cold]
318        fn operation_suspend_point_cold<B: BackingStorage>(
319            this: &TurboTasksBackendInner<B>,
320            suspend: impl FnOnce() -> AnyOperation,
321        ) {
322            let operation = Arc::new(suspend());
323            let mut snapshot_request = this.snapshot_request.lock();
324            if snapshot_request.snapshot_requested {
325                snapshot_request
326                    .suspended_operations
327                    .insert(operation.clone().into());
328                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
329                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
330                if value == SNAPSHOT_REQUESTED_BIT {
331                    this.operations_suspended.notify_all();
332                }
333                this.snapshot_completed
334                    .wait_while(&mut snapshot_request, |snapshot_request| {
335                        snapshot_request.snapshot_requested
336                    });
337                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
338                snapshot_request
339                    .suspended_operations
340                    .remove(&operation.into());
341            }
342        }
343
344        if self.suspending_requested() {
345            operation_suspend_point_cold(self, suspend);
346        }
347    }
348
349    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
350        if !self.should_persist() {
351            return OperationGuard { backend: None };
352        }
353        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
354        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
355            let mut snapshot_request = self.snapshot_request.lock();
356            if snapshot_request.snapshot_requested {
357                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
358                if value == SNAPSHOT_REQUESTED_BIT {
359                    self.operations_suspended.notify_all();
360                }
361                self.snapshot_completed
362                    .wait_while(&mut snapshot_request, |snapshot_request| {
363                        snapshot_request.snapshot_requested
364                    });
365                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
366            }
367        }
368        OperationGuard {
369            backend: Some(self),
370        }
371    }
372
373    fn should_persist(&self) -> bool {
374        matches!(self.options.storage_mode, Some(StorageMode::ReadWrite))
375    }
376
377    fn should_restore(&self) -> bool {
378        self.options.storage_mode.is_some()
379    }
380
381    fn should_track_dependencies(&self) -> bool {
382        self.options.dependency_tracking
383    }
384
385    fn should_track_activeness(&self) -> bool {
386        self.options.active_tracking
387    }
388
389    fn should_track_children(&self) -> bool {
390        self.options.children_tracking
391    }
392
393    fn track_cache_hit(&self, task_type: &CachedTaskType) {
394        self.task_statistics
395            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
396    }
397
398    fn track_cache_miss(&self, task_type: &CachedTaskType) {
399        self.task_statistics
400            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
401    }
402}
403
404pub(crate) struct OperationGuard<'a, B: BackingStorage> {
405    backend: Option<&'a TurboTasksBackendInner<B>>,
406}
407
408impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
409    fn drop(&mut self) {
410        if let Some(backend) = self.backend {
411            let fetch_sub = backend
412                .in_progress_operations
413                .fetch_sub(1, Ordering::AcqRel);
414            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
415                backend.operations_suspended.notify_all();
416            }
417        }
418    }
419}
420
421// Operations
422impl<B: BackingStorage> TurboTasksBackendInner<B> {
423    /// # Safety
424    ///
425    /// `tx` must be a transaction from this TurboTasksBackendInner instance.
426    unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
427        &'l self,
428        tx: Option<&'l B::ReadTransaction<'tx>>,
429        parent_task: TaskId,
430        child_task: TaskId,
431        turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
432    ) {
433        operation::ConnectChildOperation::run(parent_task, child_task, unsafe {
434            self.execute_context_with_tx(tx, turbo_tasks)
435        });
436    }
437
438    fn connect_child(
439        &self,
440        parent_task: TaskId,
441        child_task: TaskId,
442        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
443    ) {
444        operation::ConnectChildOperation::run(
445            parent_task,
446            child_task,
447            self.execute_context(turbo_tasks),
448        );
449    }
450
451    fn try_read_task_output(
452        self: &Arc<Self>,
453        task_id: TaskId,
454        reader: Option<TaskId>,
455        consistency: ReadConsistency,
456        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
457    ) -> Result<Result<RawVc, EventListener>> {
458        if let Some(reader) = reader {
459            self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
460        }
461
462        let mut ctx = self.execute_context(turbo_tasks);
463        let mut task = ctx.task(task_id, TaskDataCategory::All);
464
465        fn listen_to_done_event<B: BackingStorage>(
466            this: &TurboTasksBackendInner<B>,
467            reader: Option<TaskId>,
468            done_event: &Event,
469        ) -> EventListener {
470            done_event.listen_with_note(move || {
471                let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
472                move || {
473                    if let Some(reader_desc) = reader_desc.as_ref() {
474                        format!("try_read_task_output from {}", reader_desc())
475                    } else {
476                        "try_read_task_output (untracked)".to_string()
477                    }
478                }
479            })
480        }
481
482        fn check_in_progress<B: BackingStorage>(
483            this: &TurboTasksBackendInner<B>,
484            task: &impl TaskGuard,
485            reader: Option<TaskId>,
486            ctx: &impl ExecuteContext<'_>,
487        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
488        {
489            match get!(task, InProgress) {
490                Some(InProgressState::Scheduled { done_event, .. }) => {
491                    Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
492                }
493                Some(InProgressState::InProgress(box InProgressStateInner {
494                    done,
495                    done_event,
496                    ..
497                })) => {
498                    if !*done {
499                        Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
500                    } else {
501                        None
502                    }
503                }
504                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
505                    "{} was canceled",
506                    ctx.get_task_description(task.id())
507                ))),
508                None => None,
509            }
510        }
511
512        if self.should_track_children() && matches!(consistency, ReadConsistency::Strong) {
513            // Ensure it's an root node
514            loop {
515                let aggregation_number = get_aggregation_number(&task);
516                if is_root_node(aggregation_number) {
517                    break;
518                }
519                drop(task);
520                {
521                    let _span = tracing::trace_span!(
522                        "make root node for strongly consistent read",
523                        %task_id
524                    )
525                    .entered();
526                    AggregationUpdateQueue::run(
527                        AggregationUpdateJob::UpdateAggregationNumber {
528                            task_id,
529                            base_aggregation_number: u32::MAX,
530                            distance: None,
531                        },
532                        &mut ctx,
533                    );
534                }
535                task = ctx.task(task_id, TaskDataCategory::All);
536            }
537
538            let is_dirty =
539                get!(task, Dirty).map_or(false, |dirty_state| dirty_state.get(self.session_id));
540
541            // Check the dirty count of the root node
542            let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
543                .cloned()
544                .unwrap_or_default()
545                .get(self.session_id);
546            if dirty_tasks > 0 || is_dirty {
547                let activeness = get_mut!(task, Activeness);
548                let mut task_ids_to_schedule: Vec<_> = Vec::new();
549                // When there are dirty task, subscribe to the all_clean_event
550                let activeness = if let Some(activeness) = activeness {
551                    // This makes sure all tasks stay active and this task won't stale.
552                    // active_until_clean is automatically removed when this
553                    // task is clean.
554                    activeness.set_active_until_clean();
555                    activeness
556                } else {
557                    // If we don't have a root state, add one. This also makes sure all tasks stay
558                    // active and this task won't stale. active_until_clean
559                    // is automatically removed when this task is clean.
560                    get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id))
561                        .set_active_until_clean();
562                    if ctx.should_track_activeness() {
563                        // A newly added Activeness need to make sure to schedule the tasks
564                        task_ids_to_schedule = get_many!(
565                            task,
566                            AggregatedDirtyContainer {
567                                task
568                            } count if count.get(self.session_id) > 0 => {
569                                task
570                            }
571                        );
572                        task_ids_to_schedule.push(task_id);
573                    }
574                    get!(task, Activeness).unwrap()
575                };
576                let listener = activeness.all_clean_event.listen_with_note(move || {
577                    let this = self.clone();
578                    let tt = turbo_tasks.pin();
579                    move || {
580                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
581                        let mut ctx = this.execute_context(tt);
582                        let mut visited = FxHashSet::default();
583                        fn indent(s: &str) -> String {
584                            s.split_inclusive('\n')
585                                .flat_map(|line: &str| ["  ", line].into_iter())
586                                .collect::<String>()
587                        }
588                        fn get_info(
589                            ctx: &mut impl ExecuteContext<'_>,
590                            task_id: TaskId,
591                            parent_and_count: Option<(TaskId, i32)>,
592                            visited: &mut FxHashSet<TaskId>,
593                        ) -> String {
594                            let task = ctx.task(task_id, TaskDataCategory::Data);
595                            let is_dirty = get!(task, Dirty)
596                                .map_or(false, |dirty_state| dirty_state.get(ctx.session_id()));
597                            let in_progress =
598                                get!(task, InProgress).map_or("not in progress", |p| match p {
599                                    InProgressState::InProgress(_) => "in progress",
600                                    InProgressState::Scheduled { .. } => "scheduled",
601                                    InProgressState::Canceled => "canceled",
602                                });
603                            let activeness = get!(task, Activeness).map_or_else(
604                                || "not active".to_string(),
605                                |activeness| format!("{activeness:?}"),
606                            );
607                            let aggregation_number = get_aggregation_number(&task);
608                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
609                            {
610                                let uppers = get_uppers(&task);
611                                !uppers.contains(&parent_task_id)
612                            } else {
613                                false
614                            };
615
616                            // Check the dirty count of the root node
617                            let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
618                                .cloned()
619                                .unwrap_or_default()
620                                .get(ctx.session_id());
621
622                            let task_description = ctx.get_task_description(task_id);
623                            let is_dirty = if is_dirty { ", dirty" } else { "" };
624                            let count = if let Some((_, count)) = parent_and_count {
625                                format!(" {count}")
626                            } else {
627                                String::new()
628                            };
629                            let mut info = format!(
630                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
631                                 {in_progress}, {activeness}{is_dirty})",
632                            );
633                            let children: Vec<_> = iter_many!(
634                                task,
635                                AggregatedDirtyContainer {
636                                    task
637                                } count => {
638                                    (task, count.get(ctx.session_id()))
639                                }
640                            )
641                            .filter(|(_, count)| *count > 0)
642                            .collect();
643                            drop(task);
644
645                            if missing_upper {
646                                info.push_str("\n  ERROR: missing upper connection");
647                            }
648
649                            if dirty_tasks > 0 || !children.is_empty() {
650                                writeln!(info, "\n  {dirty_tasks} dirty tasks:").unwrap();
651
652                                for (child_task_id, count) in children {
653                                    let task_description = ctx.get_task_description(child_task_id);
654                                    if visited.insert(child_task_id) {
655                                        let child_info = get_info(
656                                            ctx,
657                                            child_task_id,
658                                            Some((task_id, count)),
659                                            visited,
660                                        );
661                                        info.push_str(&indent(&child_info));
662                                        if !info.ends_with('\n') {
663                                            info.push('\n');
664                                        }
665                                    } else {
666                                        writeln!(
667                                            info,
668                                            "  {child_task_id} {task_description} {count} \
669                                             (already visited)"
670                                        )
671                                        .unwrap();
672                                    }
673                                }
674                            }
675                            info
676                        }
677                        let info = get_info(&mut ctx, task_id, None, &mut visited);
678                        format!(
679                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
680                        )
681                    }
682                });
683                drop(task);
684                if !task_ids_to_schedule.is_empty() {
685                    let mut queue = AggregationUpdateQueue::new();
686                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
687                    queue.execute(&mut ctx);
688                }
689
690                return Ok(Err(listener));
691            }
692        }
693
694        if let Some(value) = check_in_progress(self, &task, reader, &ctx) {
695            return value;
696        }
697
698        if let Some(output) = get!(task, Output) {
699            let result = match output {
700                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
701                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
702                OutputValue::Error(error) => {
703                    let err: anyhow::Error = error.clone().into();
704                    Err(err.context(format!(
705                        "Execution of {} failed",
706                        ctx.get_task_description(task_id)
707                    )))
708                }
709            };
710            if self.should_track_dependencies()
711                && let Some(reader) = reader
712                && (!task.is_immutable() || cfg!(feature = "verify_immutable"))
713            {
714                let _ = task.add(CachedDataItem::OutputDependent {
715                    task: reader,
716                    value: (),
717                });
718                drop(task);
719
720                let mut reader_task = ctx.task(reader, TaskDataCategory::Data);
721                if reader_task
722                    .remove(&CachedDataItemKey::OutdatedOutputDependency { target: task_id })
723                    .is_none()
724                {
725                    let _ = reader_task.add(CachedDataItem::OutputDependency {
726                        target: task_id,
727                        value: (),
728                    });
729                }
730            }
731
732            return result;
733        }
734
735        let note = move || {
736            let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
737            move || {
738                if let Some(reader_desc) = reader_desc.as_ref() {
739                    format!("try_read_task_output (recompute) from {}", (reader_desc)())
740                } else {
741                    "try_read_task_output (recompute, untracked)".to_string()
742                }
743            }
744        };
745
746        // Output doesn't exist. We need to schedule the task to compute it.
747        let (item, listener) = CachedDataItem::new_scheduled_with_listener(
748            TaskExecutionReason::OutputNotAvailable,
749            || self.get_task_desc_fn(task_id),
750            note,
751        );
752        // It's not possible that the task is InProgress at this point. If it is InProgress {
753        // done: true } it must have Output and would early return.
754        task.add_new(item);
755        ctx.schedule_task(task);
756
757        Ok(Err(listener))
758    }
759
760    fn try_read_task_cell(
761        &self,
762        task_id: TaskId,
763        reader: Option<TaskId>,
764        cell: CellId,
765        options: ReadCellOptions,
766        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
767    ) -> Result<Result<TypedCellContent, EventListener>> {
768        if let Some(reader) = reader {
769            self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
770        }
771
772        fn add_cell_dependency<B: BackingStorage>(
773            backend: &TurboTasksBackendInner<B>,
774            mut task: impl TaskGuard,
775            reader: Option<TaskId>,
776            cell: CellId,
777            task_id: TaskId,
778            ctx: &mut impl ExecuteContext<'_>,
779        ) {
780            if backend.should_track_dependencies()
781                && let Some(reader) = reader
782                // We never want to have a dependency on ourselves, otherwise we end up in a
783                // loop of re-executing the same task.
784                && reader != task_id
785                && (!task.is_immutable() || cfg!(feature = "verify_immutable"))
786            {
787                let _ = task.add(CachedDataItem::CellDependent {
788                    cell,
789                    task: reader,
790                    value: (),
791                });
792                drop(task);
793
794                let mut reader_task = ctx.task(reader, TaskDataCategory::Data);
795                let target = CellRef {
796                    task: task_id,
797                    cell,
798                };
799                if reader_task
800                    .remove(&CachedDataItemKey::OutdatedCellDependency { target })
801                    .is_none()
802                {
803                    let _ = reader_task.add(CachedDataItem::CellDependency { target, value: () });
804                }
805            }
806        }
807
808        let mut ctx = self.execute_context(turbo_tasks);
809        let mut task = ctx.task(task_id, TaskDataCategory::Data);
810        let content = if options.final_read_hint {
811            remove!(task, CellData { cell })
812        } else if let Some(content) = get!(task, CellData { cell }) {
813            let content = content.clone();
814            Some(content)
815        } else {
816            None
817        };
818        if let Some(content) = content {
819            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
820            return Ok(Ok(TypedCellContent(
821                cell.type_id,
822                CellContent(Some(content.reference)),
823            )));
824        }
825
826        let in_progress = get!(task, InProgress);
827        if matches!(
828            in_progress,
829            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
830        ) {
831            return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0));
832        }
833        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
834        let is_scheduled = matches!(in_progress, Some(InProgressState::Scheduled { .. }));
835
836        // Check cell index range (cell might not exist at all)
837        let max_id = get!(
838            task,
839            CellTypeMaxIndex {
840                cell_type: cell.type_id
841            }
842        )
843        .copied();
844        let Some(max_id) = max_id else {
845            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
846            bail!(
847                "Cell {cell:?} no longer exists in task {} (no cell of this type exists)",
848                ctx.get_task_description(task_id)
849            );
850        };
851        if cell.index >= max_id {
852            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
853            bail!(
854                "Cell {cell:?} no longer exists in task {} (index out of bounds)",
855                ctx.get_task_description(task_id)
856            );
857        }
858
859        // Cell should exist, but data was dropped or is not serializable. We need to recompute the
860        // task the get the cell content.
861
862        // Listen to the cell and potentially schedule the task
863        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, reader, cell);
864        if !new_listener {
865            return Ok(Err(listener));
866        }
867
868        let _span = tracing::trace_span!(
869            "recomputation",
870            cell_type = registry::get_value_type_global_name(cell.type_id),
871            cell_index = cell.index
872        )
873        .entered();
874
875        // Schedule the task, if not already scheduled
876        if is_cancelled {
877            bail!("{} was canceled", ctx.get_task_description(task_id));
878        } else if !is_scheduled
879            && task.add(CachedDataItem::new_scheduled(
880                TaskExecutionReason::CellNotAvailable,
881                || self.get_task_desc_fn(task_id),
882            ))
883        {
884            ctx.schedule_task(task);
885        }
886
887        Ok(Err(listener))
888    }
889
890    fn listen_to_cell(
891        &self,
892        task: &mut impl TaskGuard,
893        task_id: TaskId,
894        reader: Option<TaskId>,
895        cell: CellId,
896    ) -> (EventListener, bool) {
897        let note = move || {
898            let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
899            move || {
900                if let Some(reader_desc) = reader_desc.as_ref() {
901                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
902                } else {
903                    "try_read_task_cell (in progress, untracked)".to_string()
904                }
905            }
906        };
907        if let Some(in_progress) = get!(task, InProgressCell { cell }) {
908            // Someone else is already computing the cell
909            let listener = in_progress.event.listen_with_note(note);
910            return (listener, false);
911        }
912        let in_progress = InProgressCellState::new(task_id, cell);
913        let listener = in_progress.event.listen_with_note(note);
914        task.add_new(CachedDataItem::InProgressCell {
915            cell,
916            value: in_progress,
917        });
918        (listener, true)
919    }
920
921    fn lookup_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
922        if let Some(task_type) = self.task_cache.lookup_reverse(&task_id) {
923            return Some(task_type);
924        }
925        if self.should_restore()
926            && self.local_is_partial.load(Ordering::Acquire)
927            && !task_id.is_transient()
928            && let Some(task_type) = unsafe {
929                self.backing_storage
930                    .reverse_lookup_task_cache(None, task_id)
931                    .expect("Failed to lookup task type")
932            }
933        {
934            let _ = self.task_cache.try_insert(task_type.clone(), task_id);
935            return Some(task_type);
936        }
937        None
938    }
939
940    fn get_task_desc_fn(&self, task_id: TaskId) -> impl Fn() -> String + Send + Sync + 'static {
941        let task_type = self.lookup_task_type(task_id);
942        move || {
943            task_type.as_ref().map_or_else(
944                || format!("{task_id:?} transient"),
945                |task_type| format!("{task_id:?} {task_type}"),
946            )
947        }
948    }
949
950    fn snapshot(&self) -> Option<(Instant, bool)> {
951        let start = Instant::now();
952        debug_assert!(self.should_persist());
953        let mut snapshot_request = self.snapshot_request.lock();
954        snapshot_request.snapshot_requested = true;
955        let active_operations = self
956            .in_progress_operations
957            .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
958        if active_operations != 0 {
959            self.operations_suspended
960                .wait_while(&mut snapshot_request, |_| {
961                    self.in_progress_operations.load(Ordering::Relaxed) != SNAPSHOT_REQUESTED_BIT
962                });
963        }
964        let suspended_operations = snapshot_request
965            .suspended_operations
966            .iter()
967            .map(|op| op.arc().clone())
968            .collect::<Vec<_>>();
969        drop(snapshot_request);
970        let mut persisted_task_cache_log = self
971            .persisted_task_cache_log
972            .as_ref()
973            .map(|l| l.take(|i| i))
974            .unwrap_or_default();
975        self.storage.start_snapshot();
976        let mut snapshot_request = self.snapshot_request.lock();
977        snapshot_request.snapshot_requested = false;
978        self.in_progress_operations
979            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
980        self.snapshot_completed.notify_all();
981        let snapshot_time = Instant::now();
982        drop(snapshot_request);
983
984        let preprocess = |task_id: TaskId, inner: &storage::InnerStorage| {
985            if task_id.is_transient() {
986                return (None, None);
987            }
988            let len = inner.len();
989
990            let meta_restored = inner.state().meta_restored();
991            let data_restored = inner.state().data_restored();
992
993            let mut meta = meta_restored.then(|| Vec::with_capacity(len));
994            let mut data = data_restored.then(|| Vec::with_capacity(len));
995            for (key, value) in inner.iter_all() {
996                if key.is_persistent() && value.is_persistent() {
997                    match key.category() {
998                        TaskDataCategory::Meta => {
999                            if let Some(meta) = &mut meta {
1000                                meta.push(CachedDataItem::from_key_and_value_ref(key, value))
1001                            }
1002                        }
1003                        TaskDataCategory::Data => {
1004                            if let Some(data) = &mut data {
1005                                data.push(CachedDataItem::from_key_and_value_ref(key, value))
1006                            }
1007                        }
1008                        _ => {}
1009                    }
1010                }
1011            }
1012
1013            (meta, data)
1014        };
1015        let process = |task_id: TaskId, (meta, data): (Option<Vec<_>>, Option<Vec<_>>)| {
1016            (
1017                task_id,
1018                meta.map(|d| self.backing_storage.serialize(task_id, &d)),
1019                data.map(|d| self.backing_storage.serialize(task_id, &d)),
1020            )
1021        };
1022        let process_snapshot = |task_id: TaskId, inner: Box<InnerStorageSnapshot>| {
1023            if task_id.is_transient() {
1024                return (task_id, None, None);
1025            }
1026            let len = inner.len();
1027            let mut meta = inner.meta_modified.then(|| Vec::with_capacity(len));
1028            let mut data = inner.data_modified.then(|| Vec::with_capacity(len));
1029            for (key, value) in inner.iter_all() {
1030                if key.is_persistent() && value.is_persistent() {
1031                    match key.category() {
1032                        TaskDataCategory::Meta => {
1033                            if let Some(meta) = &mut meta {
1034                                meta.push(CachedDataItem::from_key_and_value_ref(key, value));
1035                            }
1036                        }
1037                        TaskDataCategory::Data => {
1038                            if let Some(data) = &mut data {
1039                                data.push(CachedDataItem::from_key_and_value_ref(key, value));
1040                            }
1041                        }
1042                        _ => {}
1043                    }
1044                }
1045            }
1046            (
1047                task_id,
1048                meta.map(|meta| self.backing_storage.serialize(task_id, &meta)),
1049                data.map(|data| self.backing_storage.serialize(task_id, &data)),
1050            )
1051        };
1052
1053        let snapshot = {
1054            let _span = tracing::trace_span!("take snapshot");
1055            self.storage
1056                .take_snapshot(&preprocess, &process, &process_snapshot)
1057        };
1058
1059        #[cfg(feature = "print_cache_item_size")]
1060        #[derive(Default)]
1061        struct TaskCacheStats {
1062            data: usize,
1063            data_count: usize,
1064            meta: usize,
1065            meta_count: usize,
1066        }
1067        #[cfg(feature = "print_cache_item_size")]
1068        impl TaskCacheStats {
1069            fn add_data(&mut self, len: usize) {
1070                self.data += len;
1071                self.data_count += 1;
1072            }
1073
1074            fn add_meta(&mut self, len: usize) {
1075                self.meta += len;
1076                self.meta_count += 1;
1077            }
1078        }
1079        #[cfg(feature = "print_cache_item_size")]
1080        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1081            Mutex::new(FxHashMap::default());
1082
1083        let task_snapshots = snapshot
1084            .into_iter()
1085            .filter_map(|iter| {
1086                let mut iter = iter
1087                    .filter_map(
1088                        |(task_id, meta, data): (
1089                            _,
1090                            Option<Result<SmallVec<_>>>,
1091                            Option<Result<SmallVec<_>>>,
1092                        )| {
1093                            let meta = match meta {
1094                                Some(Ok(meta)) => {
1095                                    #[cfg(feature = "print_cache_item_size")]
1096                                    task_cache_stats
1097                                        .lock()
1098                                        .entry(self.get_task_description(task_id))
1099                                        .or_default()
1100                                        .add_meta(meta.len());
1101                                    Some(meta)
1102                                }
1103                                None => None,
1104                                Some(Err(err)) => {
1105                                    println!(
1106                                        "Serializing task {} failed (meta): {:?}",
1107                                        self.get_task_description(task_id),
1108                                        err
1109                                    );
1110                                    None
1111                                }
1112                            };
1113                            let data = match data {
1114                                Some(Ok(data)) => {
1115                                    #[cfg(feature = "print_cache_item_size")]
1116                                    task_cache_stats
1117                                        .lock()
1118                                        .entry(self.get_task_description(task_id))
1119                                        .or_default()
1120                                        .add_data(data.len());
1121                                    Some(data)
1122                                }
1123                                None => None,
1124                                Some(Err(err)) => {
1125                                    println!(
1126                                        "Serializing task {} failed (data): {:?}",
1127                                        self.get_task_description(task_id),
1128                                        err
1129                                    );
1130                                    None
1131                                }
1132                            };
1133                            (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
1134                        },
1135                    )
1136                    .peekable();
1137                iter.peek().is_some().then_some(iter)
1138            })
1139            .collect::<Vec<_>>();
1140
1141        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
1142
1143        let mut new_items = false;
1144
1145        if !persisted_task_cache_log.is_empty() || !task_snapshots.is_empty() {
1146            new_items = true;
1147            if let Err(err) = self.backing_storage.save_snapshot(
1148                self.session_id,
1149                suspended_operations,
1150                persisted_task_cache_log,
1151                task_snapshots,
1152            ) {
1153                println!("Persisting failed: {err:?}");
1154                return None;
1155            }
1156            #[cfg(feature = "print_cache_item_size")]
1157            {
1158                let mut task_cache_stats = task_cache_stats
1159                    .into_inner()
1160                    .into_iter()
1161                    .collect::<Vec<_>>();
1162                if !task_cache_stats.is_empty() {
1163                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1164                        (stats_b.data + stats_b.meta, key_b)
1165                            .cmp(&(stats_a.data + stats_a.meta, key_a))
1166                    });
1167                    println!("Task cache stats:");
1168                    for (task_desc, stats) in task_cache_stats {
1169                        use std::ops::Div;
1170
1171                        use turbo_tasks::util::FormatBytes;
1172
1173                        println!(
1174                            "  {} {task_desc} = {} meta ({} x {}), {} data ({} x {})",
1175                            FormatBytes(stats.data + stats.meta),
1176                            FormatBytes(stats.meta),
1177                            stats.meta_count,
1178                            FormatBytes(stats.meta.checked_div(stats.meta_count).unwrap_or(0)),
1179                            FormatBytes(stats.data),
1180                            stats.data_count,
1181                            FormatBytes(stats.data.checked_div(stats.data_count).unwrap_or(0)),
1182                        );
1183                    }
1184                }
1185            }
1186        }
1187
1188        if new_items {
1189            let elapsed = start.elapsed();
1190            turbo_tasks().send_compilation_event(Arc::new(TimingEvent::new(
1191                "Finished writing to persistent cache".to_string(),
1192                elapsed,
1193            )));
1194        }
1195
1196        Some((snapshot_time, new_items))
1197    }
1198
1199    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1200        if self.should_restore() {
1201            // Continue all uncompleted operations
1202            // They can't be interrupted by a snapshot since the snapshotting job has not been
1203            // scheduled yet.
1204            let uncompleted_operations = self
1205                .backing_storage
1206                .uncompleted_operations()
1207                .expect("Failed to get uncompleted operations");
1208            if !uncompleted_operations.is_empty() {
1209                let mut ctx = self.execute_context(turbo_tasks);
1210                for op in uncompleted_operations {
1211                    op.execute(&mut ctx);
1212                }
1213            }
1214        }
1215
1216        if self.should_persist() {
1217            // Schedule the snapshot job
1218            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1219        }
1220    }
1221
1222    fn stopping(&self) {
1223        self.stopping.store(true, Ordering::Release);
1224        self.stopping_event.notify(usize::MAX);
1225    }
1226
1227    #[allow(unused_variables)]
1228    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1229        #[cfg(feature = "verify_aggregation_graph")]
1230        {
1231            self.is_idle.store(false, Ordering::Release);
1232            self.verify_aggregation_graph(turbo_tasks, false);
1233        }
1234        if self.should_persist() {
1235            self.snapshot();
1236        }
1237        self.task_cache.drop_contents();
1238        drop_contents(&self.transient_tasks);
1239        self.storage.drop_contents();
1240        if let Err(err) = self.backing_storage.shutdown() {
1241            println!("Shutting down failed: {err}");
1242        }
1243    }
1244
1245    #[allow(unused_variables)]
1246    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1247        self.idle_start_event.notify(usize::MAX);
1248
1249        #[cfg(feature = "verify_aggregation_graph")]
1250        {
1251            use tokio::select;
1252
1253            self.is_idle.store(true, Ordering::Release);
1254            let this = self.clone();
1255            let turbo_tasks = turbo_tasks.pin();
1256            tokio::task::spawn(async move {
1257                select! {
1258                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
1259                        // do nothing
1260                    }
1261                    _ = this.idle_end_event.listen() => {
1262                        return;
1263                    }
1264                }
1265                if !this.is_idle.load(Ordering::Relaxed) {
1266                    return;
1267                }
1268                this.verify_aggregation_graph(&*turbo_tasks, true);
1269            });
1270        }
1271    }
1272
1273    fn idle_end(&self) {
1274        #[cfg(feature = "verify_aggregation_graph")]
1275        self.is_idle.store(false, Ordering::Release);
1276        self.idle_end_event.notify(usize::MAX);
1277    }
1278
1279    fn get_or_create_persistent_task(
1280        &self,
1281        task_type: CachedTaskType,
1282        parent_task: TaskId,
1283        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1284    ) -> TaskId {
1285        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
1286            self.track_cache_hit(&task_type);
1287            self.connect_child(parent_task, task_id, turbo_tasks);
1288            return task_id;
1289        }
1290
1291        let check_backing_storage =
1292            self.should_restore() && self.local_is_partial.load(Ordering::Acquire);
1293        let tx = check_backing_storage
1294            .then(|| self.backing_storage.start_read_transaction())
1295            .flatten();
1296        let task_id = {
1297            // Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
1298            if let Some(task_id) = unsafe {
1299                check_backing_storage
1300                    .then(|| {
1301                        self.backing_storage
1302                            .forward_lookup_task_cache(tx.as_ref(), &task_type)
1303                            .expect("Failed to lookup task id")
1304                    })
1305                    .flatten()
1306            } {
1307                self.track_cache_hit(&task_type);
1308                let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
1309                task_id
1310            } else {
1311                let task_type = Arc::new(task_type);
1312                let task_id = self.persisted_task_id_factory.get();
1313                let task_id = if let Err(existing_task_id) =
1314                    self.task_cache.try_insert(task_type.clone(), task_id)
1315                {
1316                    self.track_cache_hit(&task_type);
1317                    // Safety: We just created the id and failed to insert it.
1318                    unsafe {
1319                        self.persisted_task_id_factory.reuse(task_id);
1320                    }
1321                    existing_task_id
1322                } else {
1323                    self.track_cache_miss(&task_type);
1324                    task_id
1325                };
1326                if let Some(log) = &self.persisted_task_cache_log {
1327                    log.lock(task_id).push((task_type, task_id));
1328                }
1329                task_id
1330            }
1331        };
1332
1333        // Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
1334        unsafe { self.connect_child_with_tx(tx.as_ref(), parent_task, task_id, turbo_tasks) };
1335
1336        task_id
1337    }
1338
1339    fn get_or_create_transient_task(
1340        &self,
1341        task_type: CachedTaskType,
1342        parent_task: TaskId,
1343        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1344    ) -> TaskId {
1345        if !parent_task.is_transient() {
1346            self.panic_persistent_calling_transient(
1347                self.lookup_task_type(parent_task).as_deref(),
1348                Some(&task_type),
1349                /* cell_id */ None,
1350            );
1351        }
1352        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
1353            self.track_cache_hit(&task_type);
1354            self.connect_child(parent_task, task_id, turbo_tasks);
1355            return task_id;
1356        }
1357
1358        let task_type = Arc::new(task_type);
1359        let task_id = self.transient_task_id_factory.get();
1360        if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) {
1361            self.track_cache_hit(&task_type);
1362            // Safety: We just created the id and failed to insert it.
1363            unsafe {
1364                self.transient_task_id_factory.reuse(task_id);
1365            }
1366            self.connect_child(parent_task, existing_task_id, turbo_tasks);
1367            return existing_task_id;
1368        }
1369
1370        self.track_cache_miss(&task_type);
1371        self.connect_child(parent_task, task_id, turbo_tasks);
1372
1373        task_id
1374    }
1375
1376    /// Generate an object that implements [`fmt::Display`] explaining why the given
1377    /// [`CachedTaskType`] is transient.
1378    fn debug_trace_transient_task(
1379        &self,
1380        task_type: &CachedTaskType,
1381        cell_id: Option<CellId>,
1382    ) -> DebugTraceTransientTask {
1383        // it shouldn't be possible to have cycles in tasks, but we could have an exponential blowup
1384        // from tracing the same task many times, so use a visited_set
1385        fn inner_id(
1386            backend: &TurboTasksBackendInner<impl BackingStorage>,
1387            task_id: TaskId,
1388            cell_type_id: Option<ValueTypeId>,
1389            visited_set: &mut FxHashSet<TaskId>,
1390        ) -> DebugTraceTransientTask {
1391            if let Some(task_type) = backend.lookup_task_type(task_id) {
1392                if visited_set.contains(&task_id) {
1393                    let task_name = task_type.get_name();
1394                    DebugTraceTransientTask::Collapsed {
1395                        task_name,
1396                        cell_type_id,
1397                    }
1398                } else {
1399                    inner_cached(backend, &task_type, cell_type_id, visited_set)
1400                }
1401            } else {
1402                DebugTraceTransientTask::Uncached { cell_type_id }
1403            }
1404        }
1405        fn inner_cached(
1406            backend: &TurboTasksBackendInner<impl BackingStorage>,
1407            task_type: &CachedTaskType,
1408            cell_type_id: Option<ValueTypeId>,
1409            visited_set: &mut FxHashSet<TaskId>,
1410        ) -> DebugTraceTransientTask {
1411            let task_name = task_type.get_name();
1412
1413            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1414                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1415                    // `task_id` should never be `None` at this point, as that would imply a
1416                    // non-local task is returning a local `Vc`...
1417                    // Just ignore if it happens, as we're likely already panicking.
1418                    return None;
1419                };
1420                if task_id.is_transient() {
1421                    Some(Box::new(inner_id(
1422                        backend,
1423                        task_id,
1424                        cause_self_raw_vc.try_get_type_id(),
1425                        visited_set,
1426                    )))
1427                } else {
1428                    None
1429                }
1430            });
1431            let cause_args = task_type
1432                .arg
1433                .get_raw_vcs()
1434                .into_iter()
1435                .filter_map(|raw_vc| {
1436                    let Some(task_id) = raw_vc.try_get_task_id() else {
1437                        // `task_id` should never be `None` (see comment above)
1438                        return None;
1439                    };
1440                    if !task_id.is_transient() {
1441                        return None;
1442                    }
1443                    Some((task_id, raw_vc.try_get_type_id()))
1444                })
1445                .collect::<IndexSet<_>>() // dedupe
1446                .into_iter()
1447                .map(|(task_id, cell_type_id)| {
1448                    inner_id(backend, task_id, cell_type_id, visited_set)
1449                })
1450                .collect();
1451
1452            DebugTraceTransientTask::Cached {
1453                task_name,
1454                cell_type_id,
1455                cause_self,
1456                cause_args,
1457            }
1458        }
1459        inner_cached(
1460            self,
1461            task_type,
1462            cell_id.map(|c| c.type_id),
1463            &mut FxHashSet::default(),
1464        )
1465    }
1466
1467    fn invalidate_task(
1468        &self,
1469        task_id: TaskId,
1470        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1471    ) {
1472        if !self.should_track_dependencies() {
1473            panic!("Dependency tracking is disabled so invalidation is not allowed");
1474        }
1475        operation::InvalidateOperation::run(
1476            smallvec![task_id],
1477            #[cfg(feature = "trace_task_dirty")]
1478            TaskDirtyCause::Invalidator,
1479            self.execute_context(turbo_tasks),
1480        );
1481    }
1482
1483    fn invalidate_tasks(
1484        &self,
1485        tasks: &[TaskId],
1486        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1487    ) {
1488        if !self.should_track_dependencies() {
1489            panic!("Dependency tracking is disabled so invalidation is not allowed");
1490        }
1491        operation::InvalidateOperation::run(
1492            tasks.iter().copied().collect(),
1493            #[cfg(feature = "trace_task_dirty")]
1494            TaskDirtyCause::Unknown,
1495            self.execute_context(turbo_tasks),
1496        );
1497    }
1498
1499    fn invalidate_tasks_set(
1500        &self,
1501        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1502        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1503    ) {
1504        if !self.should_track_dependencies() {
1505            panic!("Dependency tracking is disabled so invalidation is not allowed");
1506        }
1507        operation::InvalidateOperation::run(
1508            tasks.iter().copied().collect(),
1509            #[cfg(feature = "trace_task_dirty")]
1510            TaskDirtyCause::Unknown,
1511            self.execute_context(turbo_tasks),
1512        );
1513    }
1514
1515    fn invalidate_serialization(
1516        &self,
1517        task_id: TaskId,
1518        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1519    ) {
1520        if task_id.is_transient() {
1521            return;
1522        }
1523        let mut ctx = self.execute_context(turbo_tasks);
1524        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1525        task.invalidate_serialization();
1526    }
1527
1528    fn get_task_description(&self, task_id: TaskId) -> String {
1529        self.lookup_task_type(task_id).map_or_else(
1530            || format!("{task_id:?} transient"),
1531            |task_type| task_type.to_string(),
1532        )
1533    }
1534
1535    fn task_execution_canceled(
1536        &self,
1537        task_id: TaskId,
1538        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1539    ) {
1540        let mut ctx = self.execute_context(turbo_tasks);
1541        let mut task = ctx.task(task_id, TaskDataCategory::Data);
1542        if let Some(in_progress) = remove!(task, InProgress) {
1543            match in_progress {
1544                InProgressState::Scheduled {
1545                    done_event,
1546                    reason: _,
1547                } => done_event.notify(usize::MAX),
1548                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1549                    done_event.notify(usize::MAX)
1550                }
1551                InProgressState::Canceled => {}
1552            }
1553        }
1554        task.add_new(CachedDataItem::InProgress {
1555            value: InProgressState::Canceled,
1556        });
1557    }
1558
1559    fn try_start_task_execution(
1560        &self,
1561        task_id: TaskId,
1562        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1563    ) -> Option<TaskExecutionSpec<'_>> {
1564        enum TaskType {
1565            Cached(Arc<CachedTaskType>),
1566            Transient(Arc<TransientTask>),
1567        }
1568        let (task_type, once_task) = if let Some(task_type) = self.lookup_task_type(task_id) {
1569            (TaskType::Cached(task_type), false)
1570        } else if let Some(task_type) = self.transient_tasks.get(&task_id) {
1571            (
1572                TaskType::Transient(task_type.clone()),
1573                matches!(**task_type, TransientTask::Once(_)),
1574            )
1575        } else {
1576            return None;
1577        };
1578        let execution_reason;
1579        {
1580            let mut ctx = self.execute_context(turbo_tasks);
1581            let mut task = ctx.task(task_id, TaskDataCategory::All);
1582            let in_progress = remove!(task, InProgress)?;
1583            let InProgressState::Scheduled { done_event, reason } = in_progress else {
1584                task.add_new(CachedDataItem::InProgress { value: in_progress });
1585                return None;
1586            };
1587            execution_reason = reason;
1588            task.add_new(CachedDataItem::InProgress {
1589                value: InProgressState::InProgress(Box::new(InProgressStateInner {
1590                    stale: false,
1591                    once_task,
1592                    done_event,
1593                    session_dependent: false,
1594                    marked_as_completed: false,
1595                    done: false,
1596                    new_children: Default::default(),
1597                })),
1598            });
1599
1600            if self.should_track_children() {
1601                // Make all current collectibles outdated (remove left-over outdated collectibles)
1602                enum Collectible {
1603                    Current(CollectibleRef, i32),
1604                    Outdated(CollectibleRef),
1605                }
1606                let collectibles = iter_many!(task, Collectible { collectible } value => Collectible::Current(collectible, *value))
1607                    .chain(iter_many!(task, OutdatedCollectible { collectible } => Collectible::Outdated(collectible)))
1608                    .collect::<Vec<_>>();
1609                for collectible in collectibles {
1610                    match collectible {
1611                        Collectible::Current(collectible, value) => {
1612                            let _ = task
1613                                .insert(CachedDataItem::OutdatedCollectible { collectible, value });
1614                        }
1615                        Collectible::Outdated(collectible) => {
1616                            if !task.has_key(&CachedDataItemKey::Collectible { collectible }) {
1617                                task.remove(&CachedDataItemKey::OutdatedCollectible {
1618                                    collectible,
1619                                });
1620                            }
1621                        }
1622                    }
1623                }
1624            }
1625
1626            if self.should_track_dependencies() {
1627                // Make all dependencies outdated
1628                enum Dep {
1629                    CurrentCell(CellRef),
1630                    CurrentOutput(TaskId),
1631                    OutdatedCell(CellRef),
1632                    OutdatedOutput(TaskId),
1633                }
1634                let dependencies = iter_many!(task, CellDependency { target } => Dep::CurrentCell(target))
1635                    .chain(iter_many!(task, OutputDependency { target } => Dep::CurrentOutput(target)))
1636                    .chain(iter_many!(task, OutdatedCellDependency { target } => Dep::OutdatedCell(target)))
1637                    .chain(iter_many!(task, OutdatedOutputDependency { target } => Dep::OutdatedOutput(target)))
1638                    .collect::<Vec<_>>();
1639                for dep in dependencies {
1640                    match dep {
1641                        Dep::CurrentCell(cell) => {
1642                            let _ = task.add(CachedDataItem::OutdatedCellDependency {
1643                                target: cell,
1644                                value: (),
1645                            });
1646                        }
1647                        Dep::CurrentOutput(output) => {
1648                            let _ = task.add(CachedDataItem::OutdatedOutputDependency {
1649                                target: output,
1650                                value: (),
1651                            });
1652                        }
1653                        Dep::OutdatedCell(cell) => {
1654                            if !task.has_key(&CachedDataItemKey::CellDependency { target: cell }) {
1655                                task.remove(&CachedDataItemKey::OutdatedCellDependency {
1656                                    target: cell,
1657                                });
1658                            }
1659                        }
1660                        Dep::OutdatedOutput(output) => {
1661                            if !task
1662                                .has_key(&CachedDataItemKey::OutputDependency { target: output })
1663                            {
1664                                task.remove(&CachedDataItemKey::OutdatedOutputDependency {
1665                                    target: output,
1666                                });
1667                            }
1668                        }
1669                    }
1670                }
1671            }
1672        }
1673
1674        let (span, future) = match task_type {
1675            TaskType::Cached(task_type) => {
1676                let CachedTaskType {
1677                    native_fn,
1678                    this,
1679                    arg,
1680                } = &*task_type;
1681                (
1682                    native_fn.span(task_id.persistence(), execution_reason),
1683                    native_fn.execute(*this, &**arg),
1684                )
1685            }
1686            TaskType::Transient(task_type) => {
1687                let span = tracing::trace_span!("turbo_tasks::root_task");
1688                let future = match &*task_type {
1689                    TransientTask::Root(f) => f(),
1690                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
1691                };
1692                (span, future)
1693            }
1694        };
1695        Some(TaskExecutionSpec { future, span })
1696    }
1697
1698    fn task_execution_result(
1699        &self,
1700        task_id: TaskId,
1701        result: Result<RawVc, TurboTasksExecutionError>,
1702        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1703    ) {
1704        operation::UpdateOutputOperation::run(task_id, result, self.execute_context(turbo_tasks));
1705    }
1706
1707    fn task_execution_completed(
1708        &self,
1709        task_id: TaskId,
1710        _duration: Duration,
1711        _memory_usage: usize,
1712        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1713        stateful: bool,
1714        has_invalidator: bool,
1715        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1716    ) -> bool {
1717        // Task completion is a 4 step process:
1718        // 1. Remove old edges (dependencies, collectibles, children, cells) and update the
1719        //    aggregation number of the task and the new children.
1720        // 2. Connect the new children to the task (and do the relevant aggregation updates).
1721        // 3. Remove dirty flag (and propagate that to uppers) and remove the in-progress state.
1722        // 4. Shrink the task memory to reduce footprint of the task.
1723
1724        // Due to persistence it is possible that the process is cancelled after any step. This is
1725        // ok, since the dirty flag won't be removed until step 3 and step 4 is only affecting the
1726        // in-memory representation.
1727
1728        // The task might be invalidated during this process, so we need to change the stale flag
1729        // at the start of every step.
1730
1731        let span = tracing::trace_span!("task execution completed", immutable = Empty).entered();
1732        let mut ctx = self.execute_context(turbo_tasks);
1733
1734        //// STEP 1 ////
1735
1736        let mut task = ctx.task(task_id, TaskDataCategory::All);
1737        let Some(in_progress) = get_mut!(task, InProgress) else {
1738            panic!("Task execution completed, but task is not in progress: {task:#?}");
1739        };
1740        let &mut InProgressState::InProgress(box InProgressStateInner {
1741            stale,
1742            ref mut done,
1743            ref done_event,
1744            ref mut new_children,
1745            session_dependent,
1746            ..
1747        }) = in_progress
1748        else {
1749            panic!("Task execution completed, but task is not in progress: {task:#?}");
1750        };
1751
1752        // If the task is stale, reschedule it
1753        #[cfg(not(feature = "no_fast_stale"))]
1754        if stale {
1755            let Some(InProgressState::InProgress(box InProgressStateInner {
1756                done_event,
1757                mut new_children,
1758                ..
1759            })) = remove!(task, InProgress)
1760            else {
1761                unreachable!();
1762            };
1763            task.add_new(CachedDataItem::InProgress {
1764                value: InProgressState::Scheduled {
1765                    done_event,
1766                    reason: TaskExecutionReason::Stale,
1767                },
1768            });
1769            // Remove old children from new_children to leave only the children that had their
1770            // active count increased
1771            for task in iter_many!(task, Child { task } => task) {
1772                new_children.remove(&task);
1773            }
1774            drop(task);
1775
1776            // We need to undo the active count increase for the children since we throw away the
1777            // new_children list now.
1778            AggregationUpdateQueue::run(
1779                AggregationUpdateJob::DecreaseActiveCounts {
1780                    task_ids: new_children.into_iter().collect(),
1781                },
1782                &mut ctx,
1783            );
1784            return true;
1785        }
1786
1787        if cfg!(not(feature = "no_fast_stale")) || !stale {
1788            // mark the task as completed, so dependent tasks can continue working
1789            *done = true;
1790            done_event.notify(usize::MAX);
1791        }
1792
1793        // take the children from the task to process them
1794        let mut new_children = take(new_children);
1795
1796        // handle stateful
1797        if stateful {
1798            let _ = task.add(CachedDataItem::Stateful { value: () });
1799        }
1800
1801        // handle has_invalidator
1802        if has_invalidator {
1803            let _ = task.add(CachedDataItem::HasInvalidator { value: () });
1804        }
1805
1806        // handle cell counters: update max index and remove cells that are no longer used
1807        let old_counters: FxHashMap<_, _> =
1808            get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, *max_index));
1809        let mut counters_to_remove = old_counters.clone();
1810        for (&cell_type, &max_index) in cell_counters.iter() {
1811            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
1812                if old_max_index != max_index {
1813                    task.insert(CachedDataItem::CellTypeMaxIndex {
1814                        cell_type,
1815                        value: max_index,
1816                    });
1817                }
1818            } else {
1819                task.add_new(CachedDataItem::CellTypeMaxIndex {
1820                    cell_type,
1821                    value: max_index,
1822                });
1823            }
1824        }
1825        for (cell_type, _) in counters_to_remove {
1826            task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type });
1827        }
1828
1829        let mut queue = AggregationUpdateQueue::new();
1830
1831        let mut removed_data = Vec::new();
1832        let mut old_edges = Vec::new();
1833
1834        let has_children = !new_children.is_empty();
1835        let is_immutable = task.is_immutable();
1836        let task_dependencies_for_immutable =
1837            // Task was previously marked as immutable
1838            if !is_immutable
1839            // Task is not session dependent (session dependent tasks can change between sessions)
1840            && !session_dependent
1841            // Task has no invalidator
1842            && !task.has_key(&CachedDataItemKey::HasInvalidator {})
1843            // This is a hack for the streaming hack.
1844            && !task.has_key(&CachedDataItemKey::Stateful {})
1845            // Task has no dependencies on collectibles
1846            && count!(task, CollectiblesDependency) == 0
1847        {
1848            Some(
1849                // Collect all dependencies on tasks to check if all dependencies are immutable
1850                iter_many!(task, OutputDependency { target } => target)
1851                    .chain(iter_many!(task, CellDependency { target } => target.task))
1852                    .collect::<FxHashSet<_>>(),
1853            )
1854        } else {
1855            None
1856        };
1857
1858        // Prepare all new children
1859        if has_children {
1860            prepare_new_children(task_id, &mut task, &new_children, &mut queue);
1861        }
1862
1863        // Filter actual new children
1864        if has_children {
1865            old_edges.extend(
1866                iter_many!(task, Child { task } => task)
1867                    .filter(|task| !new_children.remove(task))
1868                    .map(OutdatedEdge::Child),
1869            );
1870        } else {
1871            old_edges.extend(iter_many!(task, Child { task } => task).map(OutdatedEdge::Child));
1872        }
1873
1874        // Remove no longer existing cells and
1875        // find all outdated data items (removed cells, outdated edges)
1876        // Note: For persistent tasks we only want to call extract_if when there are actual cells to
1877        // remove to avoid tracking that as modification.
1878        if task_id.is_transient() || iter_many!(task, CellData { cell }
1879            if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index) => cell
1880        ).count() > 0 {
1881            removed_data.extend(task.extract_if(CachedDataItemType::CellData, |key, _| {
1882                matches!(key, CachedDataItemKey::CellData { cell } if cell_counters
1883                            .get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
1884            }));
1885        }
1886        if self.should_track_children() {
1887            old_edges.extend(
1888                task.iter(CachedDataItemType::OutdatedCollectible)
1889                    .filter_map(|(key, value)| match (key, value) {
1890                        (
1891                            CachedDataItemKey::OutdatedCollectible { collectible },
1892                            CachedDataItemValueRef::OutdatedCollectible { value },
1893                        ) => Some(OutdatedEdge::Collectible(collectible, *value)),
1894                        _ => None,
1895                    }),
1896            );
1897        }
1898        if self.should_track_dependencies() {
1899            old_edges.extend(iter_many!(task, OutdatedCellDependency { target } => OutdatedEdge::CellDependency(target)));
1900            old_edges.extend(iter_many!(task, OutdatedOutputDependency { target } => OutdatedEdge::OutputDependency(target)));
1901            old_edges.extend(
1902                iter_many!(task, CellDependent { cell, task } => (cell, task)).filter_map(
1903                    |(cell, task)| {
1904                        if cell_counters
1905                            .get(&cell.type_id)
1906                            .is_none_or(|start_index| cell.index >= *start_index)
1907                            && let Some(old_counter) = old_counters.get(&cell.type_id)
1908                            && cell.index < *old_counter
1909                        {
1910                            return Some(OutdatedEdge::RemovedCellDependent {
1911                                task_id: task,
1912                                #[cfg(feature = "trace_task_dirty")]
1913                                value_type_id: cell.type_id,
1914                            });
1915                        }
1916                        None
1917                    },
1918                ),
1919            );
1920        }
1921
1922        drop(task);
1923
1924        // Check if the task can be marked as immutable
1925        let mut is_now_immutable = false;
1926        if let Some(dependencies) = task_dependencies_for_immutable
1927            && dependencies
1928                .iter()
1929                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).is_immutable())
1930        {
1931            is_now_immutable = true;
1932        }
1933        span.record("immutable", is_immutable || is_now_immutable);
1934
1935        if !queue.is_empty() || !old_edges.is_empty() {
1936            #[cfg(feature = "trace_task_completion")]
1937            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
1938            // Remove outdated edges first, before removing in_progress+dirty flag.
1939            // We need to make sure all outdated edges are removed before the task can potentially
1940            // be scheduled and executed again
1941            CleanupOldEdgesOperation::run(task_id, old_edges, queue, &mut ctx);
1942        }
1943
1944        // When restoring from persistent caching the following might not be executed (since we can
1945        // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
1946        // would be executed again.
1947
1948        //// STEP 2 ////
1949
1950        let mut task = ctx.task(task_id, TaskDataCategory::All);
1951        let Some(in_progress) = get!(task, InProgress) else {
1952            panic!("Task execution completed, but task is not in progress: {task:#?}");
1953        };
1954        let InProgressState::InProgress(box InProgressStateInner {
1955            #[cfg(not(feature = "no_fast_stale"))]
1956            stale,
1957            ..
1958        }) = in_progress
1959        else {
1960            panic!("Task execution completed, but task is not in progress: {task:#?}");
1961        };
1962
1963        // If the task is stale, reschedule it
1964        #[cfg(not(feature = "no_fast_stale"))]
1965        if *stale {
1966            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
1967                remove!(task, InProgress)
1968            else {
1969                unreachable!();
1970            };
1971            task.add_new(CachedDataItem::InProgress {
1972                value: InProgressState::Scheduled {
1973                    done_event,
1974                    reason: TaskExecutionReason::Stale,
1975                },
1976            });
1977            drop(task);
1978
1979            // All `new_children` are currently hold active with an active count and we need to undo
1980            // that. (We already filtered out the old children from that list)
1981            AggregationUpdateQueue::run(
1982                AggregationUpdateJob::DecreaseActiveCounts {
1983                    task_ids: new_children.into_iter().collect(),
1984                },
1985                &mut ctx,
1986            );
1987            return true;
1988        }
1989
1990        // If the task is not stateful and has no mutable children, it does not have a way to be
1991        // invalidated and we can mark it as immutable.
1992        if is_now_immutable {
1993            let _ = task.add(CachedDataItem::Immutable { value: () });
1994        }
1995
1996        let mut queue = AggregationUpdateQueue::new();
1997
1998        if has_children {
1999            let has_active_count = ctx.should_track_activeness()
2000                && get!(task, Activeness).map_or(false, |activeness| activeness.active_counter > 0);
2001            connect_children(
2002                task_id,
2003                &mut task,
2004                new_children,
2005                &mut queue,
2006                has_active_count,
2007                ctx.should_track_activeness(),
2008            );
2009        }
2010
2011        drop(task);
2012
2013        if has_children {
2014            #[cfg(feature = "trace_task_completion")]
2015            let _span = tracing::trace_span!("connect new children").entered();
2016            queue.execute(&mut ctx);
2017        }
2018
2019        let mut task = ctx.task(task_id, TaskDataCategory::All);
2020        let Some(in_progress) = remove!(task, InProgress) else {
2021            panic!("Task execution completed, but task is not in progress: {task:#?}");
2022        };
2023        let InProgressState::InProgress(box InProgressStateInner {
2024            done_event,
2025            once_task: _,
2026            stale,
2027            session_dependent,
2028            done: _,
2029            marked_as_completed: _,
2030            new_children,
2031        }) = in_progress
2032        else {
2033            panic!("Task execution completed, but task is not in progress: {task:#?}");
2034        };
2035        debug_assert!(new_children.is_empty());
2036
2037        // If the task is stale, reschedule it
2038        if stale {
2039            task.add_new(CachedDataItem::InProgress {
2040                value: InProgressState::Scheduled {
2041                    done_event,
2042                    reason: TaskExecutionReason::Stale,
2043                },
2044            });
2045            return true;
2046        }
2047
2048        // Notify in progress cells
2049        removed_data.extend(task.extract_if(
2050            CachedDataItemType::InProgressCell,
2051            |key, value| match (key, value) {
2052                (
2053                    CachedDataItemKey::InProgressCell { .. },
2054                    CachedDataItemValueRef::InProgressCell { value },
2055                ) => {
2056                    value.event.notify(usize::MAX);
2057                    true
2058                }
2059                _ => false,
2060            },
2061        ));
2062
2063        // Update the dirty state
2064        let old_dirty_state = get!(task, Dirty).copied();
2065
2066        let new_dirty_state = if session_dependent {
2067            Some(DirtyState {
2068                clean_in_session: Some(self.session_id),
2069            })
2070        } else {
2071            None
2072        };
2073
2074        let data_update = if old_dirty_state != new_dirty_state {
2075            if let Some(new_dirty_state) = new_dirty_state {
2076                task.insert(CachedDataItem::Dirty {
2077                    value: new_dirty_state,
2078                });
2079            } else {
2080                task.remove(&CachedDataItemKey::Dirty {});
2081            }
2082
2083            if self.should_track_children()
2084                && (old_dirty_state.is_some() || new_dirty_state.is_some())
2085            {
2086                let mut dirty_containers = get!(task, AggregatedDirtyContainerCount)
2087                    .cloned()
2088                    .unwrap_or_default();
2089                if let Some(old_dirty_state) = old_dirty_state {
2090                    dirty_containers.update_with_dirty_state(&old_dirty_state);
2091                }
2092                let aggregated_update = match (old_dirty_state, new_dirty_state) {
2093                    (None, None) => unreachable!(),
2094                    (Some(old), None) => dirty_containers.undo_update_with_dirty_state(&old),
2095                    (None, Some(new)) => dirty_containers.update_with_dirty_state(&new),
2096                    (Some(old), Some(new)) => dirty_containers.replace_dirty_state(&old, &new),
2097                };
2098                if !aggregated_update.is_zero() {
2099                    if aggregated_update.get(self.session_id) < 0
2100                        && let Some(activeness_state) = get_mut!(task, Activeness)
2101                    {
2102                        activeness_state.all_clean_event.notify(usize::MAX);
2103                        activeness_state.unset_active_until_clean();
2104                        if activeness_state.is_empty() {
2105                            task.remove(&CachedDataItemKey::Activeness {});
2106                        }
2107                    }
2108                    AggregationUpdateJob::data_update(
2109                        &mut task,
2110                        AggregatedDataUpdate::new()
2111                            .dirty_container_update(task_id, aggregated_update),
2112                    )
2113                } else {
2114                    None
2115                }
2116            } else {
2117                None
2118            }
2119        } else {
2120            None
2121        };
2122
2123        drop(task);
2124
2125        if let Some(data_update) = data_update {
2126            AggregationUpdateQueue::run(data_update, &mut ctx);
2127        }
2128
2129        drop(removed_data);
2130
2131        //// STEP 4 ////
2132
2133        let mut task = ctx.task(task_id, TaskDataCategory::All);
2134        task.shrink_to_fit(CachedDataItemType::CellData);
2135        task.shrink_to_fit(CachedDataItemType::CellTypeMaxIndex);
2136        task.shrink_to_fit(CachedDataItemType::CellDependency);
2137        task.shrink_to_fit(CachedDataItemType::OutputDependency);
2138        task.shrink_to_fit(CachedDataItemType::CollectiblesDependency);
2139        drop(task);
2140
2141        false
2142    }
2143
2144    fn run_backend_job<'a>(
2145        self: &'a Arc<Self>,
2146        job: TurboTasksBackendJob,
2147        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2148    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2149        Box::pin(async move {
2150            match job {
2151                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
2152                    debug_assert!(self.should_persist());
2153
2154                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
2155                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
2156                    loop {
2157                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2158                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2159                        const IDLE_TIMEOUT: Duration = Duration::from_secs(2);
2160
2161                        let time = if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
2162                            FIRST_SNAPSHOT_WAIT
2163                        } else {
2164                            SNAPSHOT_INTERVAL
2165                        };
2166
2167                        let until = last_snapshot + time;
2168                        if until > Instant::now() {
2169                            let mut stop_listener = self.stopping_event.listen();
2170                            if self.stopping.load(Ordering::Acquire) {
2171                                return;
2172                            }
2173                            let mut idle_start_listener = self.idle_start_event.listen();
2174                            let mut idle_end_listener = self.idle_end_event.listen();
2175                            let mut idle_time = if turbo_tasks.is_idle() {
2176                                Instant::now() + IDLE_TIMEOUT
2177                            } else {
2178                                far_future()
2179                            };
2180                            loop {
2181                                tokio::select! {
2182                                    _ = &mut stop_listener => {
2183                                        return;
2184                                    },
2185                                    _ = &mut idle_start_listener => {
2186                                        idle_time = Instant::now() + IDLE_TIMEOUT;
2187                                        idle_start_listener = self.idle_start_event.listen()
2188                                    },
2189                                    _ = &mut idle_end_listener => {
2190                                        idle_time = until + IDLE_TIMEOUT;
2191                                        idle_end_listener = self.idle_end_event.listen()
2192                                    },
2193                                    _ = tokio::time::sleep_until(until) => {
2194                                        break;
2195                                    },
2196                                    _ = tokio::time::sleep_until(idle_time) => {
2197                                        if turbo_tasks.is_idle() {
2198                                            break;
2199                                        }
2200                                    },
2201                                }
2202                            }
2203                        }
2204
2205                        let this = self.clone();
2206                        let snapshot = this.snapshot();
2207                        if let Some((snapshot_start, new_data)) = snapshot {
2208                            last_snapshot = snapshot_start;
2209                            if new_data {
2210                                continue;
2211                            }
2212                            let last_snapshot = last_snapshot.duration_since(self.start_time);
2213                            self.last_snapshot.store(
2214                                last_snapshot.as_millis().try_into().unwrap(),
2215                                Ordering::Relaxed,
2216                            );
2217
2218                            turbo_tasks.schedule_backend_background_job(
2219                                TurboTasksBackendJob::FollowUpSnapshot,
2220                            );
2221                            return;
2222                        }
2223                    }
2224                }
2225                TurboTasksBackendJob::Prefetch { data, range } => {
2226                    let range = if let Some(range) = range {
2227                        range
2228                    } else {
2229                        if data.len() > 128 {
2230                            let chunk_size = good_chunk_size(data.len());
2231                            let chunks = data.len().div_ceil(chunk_size);
2232                            for i in 0..chunks {
2233                                turbo_tasks.schedule_backend_foreground_job(
2234                                    TurboTasksBackendJob::Prefetch {
2235                                        data: data.clone(),
2236                                        range: Some(
2237                                            (i * chunk_size)..min(data.len(), (i + 1) * chunk_size),
2238                                        ),
2239                                    },
2240                                );
2241                            }
2242                            return;
2243                        }
2244                        0..data.len()
2245                    };
2246
2247                    let _span = info_span!("prefetching").entered();
2248                    let mut ctx = self.execute_context(turbo_tasks);
2249                    for i in range {
2250                        let (&task, &with_data) = data.get_index(i).unwrap();
2251                        let category = if with_data {
2252                            TaskDataCategory::All
2253                        } else {
2254                            TaskDataCategory::Meta
2255                        };
2256                        // Prefetch the task
2257                        drop(ctx.task(task, category));
2258                    }
2259                }
2260            }
2261        })
2262    }
2263
2264    fn try_read_own_task_cell_untracked(
2265        &self,
2266        task_id: TaskId,
2267        cell: CellId,
2268        _options: ReadCellOptions,
2269        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2270    ) -> Result<TypedCellContent> {
2271        let mut ctx = self.execute_context(turbo_tasks);
2272        let task = ctx.task(task_id, TaskDataCategory::Data);
2273        if let Some(content) = get!(task, CellData { cell }) {
2274            Ok(CellContent(Some(content.reference.clone())).into_typed(cell.type_id))
2275        } else {
2276            Ok(CellContent(None).into_typed(cell.type_id))
2277        }
2278    }
2279
2280    fn read_task_collectibles(
2281        &self,
2282        task_id: TaskId,
2283        collectible_type: TraitTypeId,
2284        reader_id: TaskId,
2285        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2286    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2287        if !self.should_track_children() {
2288            return AutoMap::default();
2289        }
2290
2291        let mut ctx = self.execute_context(turbo_tasks);
2292        let mut collectibles = AutoMap::default();
2293        {
2294            let mut task = ctx.task(task_id, TaskDataCategory::All);
2295            // Ensure it's an root node
2296            loop {
2297                let aggregation_number = get_aggregation_number(&task);
2298                if is_root_node(aggregation_number) {
2299                    break;
2300                }
2301                drop(task);
2302                AggregationUpdateQueue::run(
2303                    AggregationUpdateJob::UpdateAggregationNumber {
2304                        task_id,
2305                        base_aggregation_number: u32::MAX,
2306                        distance: None,
2307                    },
2308                    &mut ctx,
2309                );
2310                task = ctx.task(task_id, TaskDataCategory::All);
2311            }
2312            for collectible in iter_many!(
2313                task,
2314                AggregatedCollectible {
2315                    collectible
2316                } count if collectible.collectible_type == collectible_type && *count > 0 => {
2317                    collectible.cell
2318                }
2319            ) {
2320                *collectibles
2321                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
2322                    .or_insert(0) += 1;
2323            }
2324            for (collectible, count) in iter_many!(
2325                task,
2326                Collectible {
2327                    collectible
2328                } count if collectible.collectible_type == collectible_type => {
2329                    (collectible.cell, *count)
2330                }
2331            ) {
2332                *collectibles
2333                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
2334                    .or_insert(0) += count;
2335            }
2336            let _ = task.add(CachedDataItem::CollectiblesDependent {
2337                collectible_type,
2338                task: reader_id,
2339                value: (),
2340            });
2341        }
2342        {
2343            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
2344            let target = CollectiblesRef {
2345                task: task_id,
2346                collectible_type,
2347            };
2348            if reader
2349                .remove(&CachedDataItemKey::OutdatedCollectiblesDependency { target })
2350                .is_none()
2351            {
2352                let _ = reader.add(CachedDataItem::CollectiblesDependency { target, value: () });
2353            }
2354        }
2355        collectibles
2356    }
2357
2358    fn emit_collectible(
2359        &self,
2360        collectible_type: TraitTypeId,
2361        collectible: RawVc,
2362        task_id: TaskId,
2363        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2364    ) {
2365        self.assert_valid_collectible(task_id, collectible);
2366        if !self.should_track_children() {
2367            return;
2368        }
2369
2370        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2371            panic!("Collectibles need to be resolved");
2372        };
2373        let cell = CellRef {
2374            task: collectible_task,
2375            cell,
2376        };
2377        operation::UpdateCollectibleOperation::run(
2378            task_id,
2379            CollectibleRef {
2380                collectible_type,
2381                cell,
2382            },
2383            1,
2384            self.execute_context(turbo_tasks),
2385        );
2386    }
2387
2388    fn unemit_collectible(
2389        &self,
2390        collectible_type: TraitTypeId,
2391        collectible: RawVc,
2392        count: u32,
2393        task_id: TaskId,
2394        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2395    ) {
2396        self.assert_valid_collectible(task_id, collectible);
2397        if !self.should_track_children() {
2398            return;
2399        }
2400
2401        let RawVc::TaskCell(collectible_task, cell) = collectible else {
2402            panic!("Collectibles need to be resolved");
2403        };
2404        let cell = CellRef {
2405            task: collectible_task,
2406            cell,
2407        };
2408        operation::UpdateCollectibleOperation::run(
2409            task_id,
2410            CollectibleRef {
2411                collectible_type,
2412                cell,
2413            },
2414            -(i32::try_from(count).unwrap()),
2415            self.execute_context(turbo_tasks),
2416        );
2417    }
2418
2419    fn update_task_cell(
2420        &self,
2421        task_id: TaskId,
2422        cell: CellId,
2423        content: CellContent,
2424        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2425    ) {
2426        operation::UpdateCellOperation::run(
2427            task_id,
2428            cell,
2429            content,
2430            self.execute_context(turbo_tasks),
2431        );
2432    }
2433
2434    fn mark_own_task_as_session_dependent(
2435        &self,
2436        task_id: TaskId,
2437        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2438    ) {
2439        if !self.should_track_dependencies() {
2440            // Without dependency tracking we don't need session dependent tasks
2441            return;
2442        }
2443        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
2444        let mut ctx = self.execute_context(turbo_tasks);
2445        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
2446        let aggregation_number = get_aggregation_number(&task);
2447        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
2448            drop(task);
2449            // We want to use a high aggregation number to avoid large aggregation chains for
2450            // session dependent tasks (which change on every run)
2451            AggregationUpdateQueue::run(
2452                AggregationUpdateJob::UpdateAggregationNumber {
2453                    task_id,
2454                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
2455                    distance: None,
2456                },
2457                &mut ctx,
2458            );
2459            task = ctx.task(task_id, TaskDataCategory::Meta);
2460        }
2461        if let Some(InProgressState::InProgress(box InProgressStateInner {
2462            session_dependent,
2463            ..
2464        })) = get_mut!(task, InProgress)
2465        {
2466            *session_dependent = true;
2467        }
2468    }
2469
2470    fn mark_own_task_as_finished(
2471        &self,
2472        task: TaskId,
2473        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2474    ) {
2475        let mut ctx = self.execute_context(turbo_tasks);
2476        let mut task = ctx.task(task, TaskDataCategory::Data);
2477        if let Some(InProgressState::InProgress(box InProgressStateInner {
2478            marked_as_completed,
2479            ..
2480        })) = get_mut!(task, InProgress)
2481        {
2482            *marked_as_completed = true;
2483            // TODO this should remove the dirty state (also check session_dependent)
2484            // but this would break some assumptions for strongly consistent reads.
2485            // Client tasks are not connected yet, so we wouldn't wait for them.
2486            // Maybe that's ok in cases where mark_finished() is used? Seems like it?
2487        }
2488    }
2489
2490    fn set_own_task_aggregation_number(
2491        &self,
2492        task: TaskId,
2493        aggregation_number: u32,
2494        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2495    ) {
2496        let mut ctx = self.execute_context(turbo_tasks);
2497        AggregationUpdateQueue::run(
2498            AggregationUpdateJob::UpdateAggregationNumber {
2499                task_id: task,
2500                base_aggregation_number: aggregation_number,
2501                distance: None,
2502            },
2503            &mut ctx,
2504        );
2505    }
2506
2507    fn connect_task(
2508        &self,
2509        task: TaskId,
2510        parent_task: TaskId,
2511        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2512    ) {
2513        self.assert_not_persistent_calling_transient(parent_task, task, None);
2514        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
2515    }
2516
2517    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
2518        let task_id = self.transient_task_id_factory.get();
2519        let root_type = match task_type {
2520            TransientTaskType::Root(_) => RootType::RootTask,
2521            TransientTaskType::Once(_) => RootType::OnceTask,
2522        };
2523        self.transient_tasks.insert(
2524            task_id,
2525            Arc::new(match task_type {
2526                TransientTaskType::Root(f) => TransientTask::Root(f),
2527                TransientTaskType::Once(f) => TransientTask::Once(Mutex::new(Some(f))),
2528            }),
2529        );
2530        {
2531            let mut task = self.storage.access_mut(task_id);
2532            task.add(CachedDataItem::AggregationNumber {
2533                value: AggregationNumber {
2534                    base: u32::MAX,
2535                    distance: 0,
2536                    effective: u32::MAX,
2537                },
2538            });
2539            if self.should_track_activeness() {
2540                task.add(CachedDataItem::Activeness {
2541                    value: ActivenessState::new_root(root_type, task_id),
2542                });
2543            }
2544            task.add(CachedDataItem::new_scheduled(
2545                TaskExecutionReason::Initial,
2546                move || {
2547                    move || match root_type {
2548                        RootType::RootTask => "Root Task".to_string(),
2549                        RootType::OnceTask => "Once Task".to_string(),
2550                    }
2551                },
2552            ));
2553        }
2554        #[cfg(feature = "verify_aggregation_graph")]
2555        self.root_tasks.lock().insert(task_id);
2556        task_id
2557    }
2558
2559    fn dispose_root_task(
2560        &self,
2561        task_id: TaskId,
2562        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2563    ) {
2564        #[cfg(feature = "verify_aggregation_graph")]
2565        self.root_tasks.lock().remove(&task_id);
2566
2567        let mut ctx = self.execute_context(turbo_tasks);
2568        let mut task = ctx.task(task_id, TaskDataCategory::All);
2569        let is_dirty = get!(task, Dirty).map_or(false, |dirty| dirty.get(self.session_id));
2570        let has_dirty_containers = get!(task, AggregatedDirtyContainerCount)
2571            .map_or(false, |dirty_containers| {
2572                dirty_containers.get(self.session_id) > 0
2573            });
2574        if is_dirty || has_dirty_containers {
2575            if let Some(activeness_state) = get_mut!(task, Activeness) {
2576                // We will finish the task, but it would be removed after the task is done
2577                activeness_state.unset_root_type();
2578                activeness_state.set_active_until_clean();
2579            };
2580        } else if let Some(activeness_state) = remove!(task, Activeness) {
2581            // Technically nobody should be listening to this event, but just in case
2582            // we notify it anyway
2583            activeness_state.all_clean_event.notify(usize::MAX);
2584        }
2585    }
2586
2587    #[cfg(feature = "verify_aggregation_graph")]
2588    fn verify_aggregation_graph(
2589        &self,
2590        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2591        idle: bool,
2592    ) {
2593        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
2594            return;
2595        }
2596        use std::{collections::VecDeque, env, io::stdout};
2597
2598        use crate::backend::operation::{get_uppers, is_aggregating_node};
2599
2600        let mut ctx = self.execute_context(turbo_tasks);
2601        let root_tasks = self.root_tasks.lock().clone();
2602
2603        for task_id in root_tasks.into_iter() {
2604            let mut queue = VecDeque::new();
2605            let mut visited = FxHashSet::default();
2606            let mut aggregated_nodes = FxHashSet::default();
2607            let mut collectibles = FxHashMap::default();
2608            let root_task_id = task_id;
2609            visited.insert(task_id);
2610            aggregated_nodes.insert(task_id);
2611            queue.push_back(task_id);
2612            let mut counter = 0;
2613            while let Some(task_id) = queue.pop_front() {
2614                counter += 1;
2615                if counter % 100000 == 0 {
2616                    println!(
2617                        "queue={}, visited={}, aggregated_nodes={}",
2618                        queue.len(),
2619                        visited.len(),
2620                        aggregated_nodes.len()
2621                    );
2622                }
2623                let task = ctx.task(task_id, TaskDataCategory::All);
2624                if idle && !self.is_idle.load(Ordering::Relaxed) {
2625                    return;
2626                }
2627
2628                let uppers = get_uppers(&task);
2629                if task_id != root_task_id
2630                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
2631                {
2632                    panic!(
2633                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
2634                         {:?})",
2635                        task_id,
2636                        ctx.get_task_description(task_id),
2637                        uppers
2638                    );
2639                }
2640
2641                let aggregated_collectibles: Vec<_> = get_many!(task, AggregatedCollectible { collectible } value if *value > 0 => {collectible});
2642                for collectible in aggregated_collectibles {
2643                    collectibles
2644                        .entry(collectible)
2645                        .or_insert_with(|| (false, Vec::new()))
2646                        .1
2647                        .push(task_id);
2648                }
2649
2650                let own_collectibles: Vec<_> = get_many!(task, Collectible { collectible } value if *value > 0 => {collectible});
2651                for collectible in own_collectibles {
2652                    if let Some((flag, _)) = collectibles.get_mut(&collectible) {
2653                        *flag = true
2654                    } else {
2655                        panic!(
2656                            "Task {} has a collectible {:?} that is not in any upper task",
2657                            task_id, collectible
2658                        );
2659                    }
2660                }
2661
2662                let is_dirty = get!(task, Dirty).is_some_and(|dirty| dirty.get(self.session_id));
2663                let has_dirty_container = get!(task, AggregatedDirtyContainerCount)
2664                    .is_some_and(|count| count.get(self.session_id) > 0);
2665                let should_be_in_upper = is_dirty || has_dirty_container;
2666
2667                let aggregation_number = get_aggregation_number(&task);
2668                if is_aggregating_node(aggregation_number) {
2669                    aggregated_nodes.insert(task_id);
2670                }
2671                // println!(
2672                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
2673                //     ctx.get_task_description(task_id),
2674                //     uppers
2675                // );
2676
2677                for child_id in iter_many!(task, Child { task } => task) {
2678                    // println!("{task_id}: child -> {child_id}");
2679                    if visited.insert(child_id) {
2680                        queue.push_back(child_id);
2681                    }
2682                }
2683                drop(task);
2684
2685                if should_be_in_upper {
2686                    for upper_id in uppers {
2687                        let task = ctx.task(task_id, TaskDataCategory::All);
2688                        let in_upper = get!(task, AggregatedDirtyContainer { task: task_id })
2689                            .is_some_and(|dirty| dirty.get(self.session_id) > 0);
2690                        if !in_upper {
2691                            panic!(
2692                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
2693                                 ({})",
2694                                task_id,
2695                                ctx.get_task_description(task_id),
2696                                upper_id,
2697                                ctx.get_task_description(upper_id)
2698                            );
2699                        }
2700                    }
2701                }
2702            }
2703
2704            for (collectible, (flag, task_ids)) in collectibles {
2705                if !flag {
2706                    use std::io::Write;
2707                    let mut stdout = stdout().lock();
2708                    writeln!(
2709                        stdout,
2710                        "{:?} that is not emitted in any child task but in these aggregated \
2711                         tasks: {:#?}",
2712                        collectible,
2713                        task_ids
2714                            .iter()
2715                            .map(|t| format!("{t} {}", ctx.get_task_description(*t)))
2716                            .collect::<Vec<_>>()
2717                    )
2718                    .unwrap();
2719
2720                    let task_id = collectible.cell.task;
2721                    let mut queue = {
2722                        let task = ctx.task(task_id, TaskDataCategory::All);
2723                        get_uppers(&task)
2724                    };
2725                    let mut visited = FxHashSet::default();
2726                    for &upper_id in queue.iter() {
2727                        visited.insert(upper_id);
2728                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
2729                    }
2730                    while let Some(task_id) = queue.pop() {
2731                        let desc = ctx.get_task_description(task_id);
2732                        let task = ctx.task(task_id, TaskDataCategory::All);
2733                        let aggregated_collectible =
2734                            get!(task, AggregatedCollectible { collectible })
2735                                .copied()
2736                                .unwrap_or_default();
2737                        let uppers = get_uppers(&task);
2738                        drop(task);
2739                        writeln!(
2740                            stdout,
2741                            "upper {task_id} {desc} collectible={aggregated_collectible}"
2742                        )
2743                        .unwrap();
2744                        if task_ids.contains(&task_id) {
2745                            writeln!(
2746                                stdout,
2747                                "Task has an upper connection to an aggregated task that doesn't \
2748                                 reference it. Upper connection is invalid!"
2749                            )
2750                            .unwrap();
2751                        }
2752                        for upper_id in uppers {
2753                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
2754                            if !visited.contains(&upper_id) {
2755                                queue.push(upper_id);
2756                            }
2757                        }
2758                    }
2759                    panic!("See stdout for more details");
2760                }
2761            }
2762        }
2763    }
2764
2765    fn assert_not_persistent_calling_transient(
2766        &self,
2767        parent_id: TaskId,
2768        child_id: TaskId,
2769        cell_id: Option<CellId>,
2770    ) {
2771        if !parent_id.is_transient() && child_id.is_transient() {
2772            self.panic_persistent_calling_transient(
2773                self.lookup_task_type(parent_id).as_deref(),
2774                self.lookup_task_type(child_id).as_deref(),
2775                cell_id,
2776            );
2777        }
2778    }
2779
2780    fn panic_persistent_calling_transient(
2781        &self,
2782        parent: Option<&CachedTaskType>,
2783        child: Option<&CachedTaskType>,
2784        cell_id: Option<CellId>,
2785    ) {
2786        let transient_reason = if let Some(child) = child {
2787            Cow::Owned(format!(
2788                " The callee is transient because it depends on:\n{}",
2789                self.debug_trace_transient_task(child, cell_id),
2790            ))
2791        } else {
2792            Cow::Borrowed("")
2793        };
2794        panic!(
2795            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
2796            parent.map_or("unknown", |t| t.get_name()),
2797            child.map_or("unknown", |t| t.get_name()),
2798            transient_reason,
2799        );
2800    }
2801
2802    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
2803        // these checks occur in a potentially hot codepath, but they're cheap
2804        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
2805            // This should never happen: The collectible APIs use ResolvedVc
2806            let task_info = if let Some(col_task_ty) = collectible
2807                .try_get_task_id()
2808                .and_then(|t| self.lookup_task_type(t))
2809            {
2810                Cow::Owned(format!(" (return type of {col_task_ty})"))
2811            } else {
2812                Cow::Borrowed("")
2813            };
2814            panic!("Collectible{task_info} must be a ResolvedVc")
2815        };
2816        if col_task_id.is_transient() && !task_id.is_transient() {
2817            let transient_reason = if let Some(col_task_ty) = self.lookup_task_type(col_task_id) {
2818                Cow::Owned(format!(
2819                    ". The collectible is transient because it depends on:\n{}",
2820                    self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
2821                ))
2822            } else {
2823                Cow::Borrowed("")
2824            };
2825            // this should never happen: How would a persistent function get a transient Vc?
2826            panic!(
2827                "Collectible is transient, transient collectibles cannot be emitted from \
2828                 persistent tasks{transient_reason}",
2829            )
2830        }
2831    }
2832}
2833
2834impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
2835    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2836        self.0.startup(turbo_tasks);
2837    }
2838
2839    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2840        self.0.stopping();
2841    }
2842
2843    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2844        self.0.stop(turbo_tasks);
2845    }
2846
2847    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2848        self.0.idle_start(turbo_tasks);
2849    }
2850
2851    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2852        self.0.idle_end();
2853    }
2854
2855    fn get_or_create_persistent_task(
2856        &self,
2857        task_type: CachedTaskType,
2858        parent_task: TaskId,
2859        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2860    ) -> TaskId {
2861        self.0
2862            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
2863    }
2864
2865    fn get_or_create_transient_task(
2866        &self,
2867        task_type: CachedTaskType,
2868        parent_task: TaskId,
2869        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2870    ) -> TaskId {
2871        self.0
2872            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
2873    }
2874
2875    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2876        self.0.invalidate_task(task_id, turbo_tasks);
2877    }
2878
2879    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2880        self.0.invalidate_tasks(tasks, turbo_tasks);
2881    }
2882
2883    fn invalidate_tasks_set(
2884        &self,
2885        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
2886        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2887    ) {
2888        self.0.invalidate_tasks_set(tasks, turbo_tasks);
2889    }
2890
2891    fn invalidate_serialization(
2892        &self,
2893        task_id: TaskId,
2894        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2895    ) {
2896        self.0.invalidate_serialization(task_id, turbo_tasks);
2897    }
2898
2899    fn get_task_description(&self, task: TaskId) -> String {
2900        self.0.get_task_description(task)
2901    }
2902
2903    type TaskState = ();
2904    fn new_task_state(&self, _task: TaskId) -> Self::TaskState {}
2905
2906    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2907        self.0.task_execution_canceled(task, turbo_tasks)
2908    }
2909
2910    fn try_start_task_execution(
2911        &self,
2912        task_id: TaskId,
2913        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2914    ) -> Option<TaskExecutionSpec<'_>> {
2915        self.0.try_start_task_execution(task_id, turbo_tasks)
2916    }
2917
2918    fn task_execution_result(
2919        &self,
2920        task_id: TaskId,
2921        result: Result<RawVc, TurboTasksExecutionError>,
2922        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2923    ) {
2924        self.0.task_execution_result(task_id, result, turbo_tasks);
2925    }
2926
2927    fn task_execution_completed(
2928        &self,
2929        task_id: TaskId,
2930        _duration: Duration,
2931        _memory_usage: usize,
2932        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2933        stateful: bool,
2934        has_invalidator: bool,
2935        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2936    ) -> bool {
2937        self.0.task_execution_completed(
2938            task_id,
2939            _duration,
2940            _memory_usage,
2941            cell_counters,
2942            stateful,
2943            has_invalidator,
2944            turbo_tasks,
2945        )
2946    }
2947
2948    type BackendJob = TurboTasksBackendJob;
2949
2950    fn run_backend_job<'a>(
2951        &'a self,
2952        job: Self::BackendJob,
2953        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
2954    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2955        self.0.run_backend_job(job, turbo_tasks)
2956    }
2957
2958    fn try_read_task_output(
2959        &self,
2960        task_id: TaskId,
2961        reader: TaskId,
2962        consistency: ReadConsistency,
2963        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2964    ) -> Result<Result<RawVc, EventListener>> {
2965        self.0
2966            .try_read_task_output(task_id, Some(reader), consistency, turbo_tasks)
2967    }
2968
2969    fn try_read_task_output_untracked(
2970        &self,
2971        task_id: TaskId,
2972        consistency: ReadConsistency,
2973        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2974    ) -> Result<Result<RawVc, EventListener>> {
2975        self.0
2976            .try_read_task_output(task_id, None, consistency, turbo_tasks)
2977    }
2978
2979    fn try_read_task_cell(
2980        &self,
2981        task_id: TaskId,
2982        cell: CellId,
2983        reader: TaskId,
2984        options: ReadCellOptions,
2985        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2986    ) -> Result<Result<TypedCellContent, EventListener>> {
2987        self.0
2988            .try_read_task_cell(task_id, Some(reader), cell, options, turbo_tasks)
2989    }
2990
2991    fn try_read_task_cell_untracked(
2992        &self,
2993        task_id: TaskId,
2994        cell: CellId,
2995        options: ReadCellOptions,
2996        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2997    ) -> Result<Result<TypedCellContent, EventListener>> {
2998        self.0
2999            .try_read_task_cell(task_id, None, cell, options, turbo_tasks)
3000    }
3001
3002    fn try_read_own_task_cell_untracked(
3003        &self,
3004        task_id: TaskId,
3005        cell: CellId,
3006        options: ReadCellOptions,
3007        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3008    ) -> Result<TypedCellContent> {
3009        self.0
3010            .try_read_own_task_cell_untracked(task_id, cell, options, turbo_tasks)
3011    }
3012
3013    fn read_task_collectibles(
3014        &self,
3015        task_id: TaskId,
3016        collectible_type: TraitTypeId,
3017        reader: TaskId,
3018        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3019    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3020        self.0
3021            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3022    }
3023
3024    fn emit_collectible(
3025        &self,
3026        collectible_type: TraitTypeId,
3027        collectible: RawVc,
3028        task_id: TaskId,
3029        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3030    ) {
3031        self.0
3032            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3033    }
3034
3035    fn unemit_collectible(
3036        &self,
3037        collectible_type: TraitTypeId,
3038        collectible: RawVc,
3039        count: u32,
3040        task_id: TaskId,
3041        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3042    ) {
3043        self.0
3044            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3045    }
3046
3047    fn update_task_cell(
3048        &self,
3049        task_id: TaskId,
3050        cell: CellId,
3051        content: CellContent,
3052        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3053    ) {
3054        self.0.update_task_cell(task_id, cell, content, turbo_tasks);
3055    }
3056
3057    fn mark_own_task_as_finished(
3058        &self,
3059        task_id: TaskId,
3060        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3061    ) {
3062        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3063    }
3064
3065    fn set_own_task_aggregation_number(
3066        &self,
3067        task: TaskId,
3068        aggregation_number: u32,
3069        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3070    ) {
3071        self.0
3072            .set_own_task_aggregation_number(task, aggregation_number, turbo_tasks);
3073    }
3074
3075    fn mark_own_task_as_session_dependent(
3076        &self,
3077        task: TaskId,
3078        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3079    ) {
3080        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
3081    }
3082
3083    fn connect_task(
3084        &self,
3085        task: TaskId,
3086        parent_task: TaskId,
3087        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3088    ) {
3089        self.0.connect_task(task, parent_task, turbo_tasks);
3090    }
3091
3092    fn create_transient_task(
3093        &self,
3094        task_type: TransientTaskType,
3095        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3096    ) -> TaskId {
3097        self.0.create_transient_task(task_type)
3098    }
3099
3100    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3101        self.0.dispose_root_task(task_id, turbo_tasks);
3102    }
3103
3104    fn task_statistics(&self) -> &TaskStatisticsApi {
3105        &self.0.task_statistics
3106    }
3107}
3108
3109enum DebugTraceTransientTask {
3110    Cached {
3111        task_name: &'static str,
3112        cell_type_id: Option<ValueTypeId>,
3113        cause_self: Option<Box<DebugTraceTransientTask>>,
3114        cause_args: Vec<DebugTraceTransientTask>,
3115    },
3116    /// This representation is used when this task is a duplicate of one previously shown
3117    Collapsed {
3118        task_name: &'static str,
3119        cell_type_id: Option<ValueTypeId>,
3120    },
3121    Uncached {
3122        cell_type_id: Option<ValueTypeId>,
3123    },
3124}
3125
3126impl DebugTraceTransientTask {
3127    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3128        let indent = "    ".repeat(level);
3129        f.write_str(&indent)?;
3130
3131        fn fmt_cell_type_id(
3132            f: &mut fmt::Formatter<'_>,
3133            cell_type_id: Option<ValueTypeId>,
3134        ) -> fmt::Result {
3135            if let Some(ty) = cell_type_id {
3136                write!(f, " (read cell of type {})", get_value_type_global_name(ty))
3137            } else {
3138                Ok(())
3139            }
3140        }
3141
3142        // write the name and type
3143        match self {
3144            Self::Cached {
3145                task_name,
3146                cell_type_id,
3147                ..
3148            }
3149            | Self::Collapsed {
3150                task_name,
3151                cell_type_id,
3152                ..
3153            } => {
3154                f.write_str(task_name)?;
3155                fmt_cell_type_id(f, *cell_type_id)?;
3156                if matches!(self, Self::Collapsed { .. }) {
3157                    f.write_str(" (collapsed)")?;
3158                }
3159            }
3160            Self::Uncached { cell_type_id } => {
3161                f.write_str("unknown transient task")?;
3162                fmt_cell_type_id(f, *cell_type_id)?;
3163            }
3164        }
3165        f.write_char('\n')?;
3166
3167        // write any extra "cause" information we might have
3168        if let Self::Cached {
3169            cause_self,
3170            cause_args,
3171            ..
3172        } = self
3173        {
3174            if let Some(c) = cause_self {
3175                writeln!(f, "{indent}  self:")?;
3176                c.fmt_indented(f, level + 1)?;
3177            }
3178            if !cause_args.is_empty() {
3179                writeln!(f, "{indent}  args:")?;
3180                for c in cause_args {
3181                    c.fmt_indented(f, level + 1)?;
3182                }
3183            }
3184        }
3185        Ok(())
3186    }
3187}
3188
3189impl fmt::Display for DebugTraceTransientTask {
3190    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3191        self.fmt_indented(f, 0)
3192    }
3193}
3194
3195// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
3196fn far_future() -> Instant {
3197    // Roughly 30 years from now.
3198    // API does not provide a way to obtain max `Instant`
3199    // or convert specific date in the future to instant.
3200    // 1000 years overflows on macOS, 100 years overflows on FreeBSD.
3201    Instant::now() + Duration::from_secs(86400 * 365 * 30)
3202}