mod dynamic_storage;
mod operation;
mod storage;

use std::{
    borrow::Cow,
    fmt::{self, Write},
    future::Future,
    hash::BuildHasherDefault,
    mem::take,
    pin::Pin,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
    },
    thread::available_parallelism,
};

use anyhow::{Result, bail};
use auto_hash_map::{AutoMap, AutoSet};
use indexmap::IndexSet;
use parking_lot::{Condvar, Mutex};
use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
use smallvec::{SmallVec, smallvec};
use tokio::time::{Duration, Instant};
use turbo_tasks::{
    CellId, FunctionId, FxDashMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency,
    SessionId, TRANSIENT_TASK_BIT, TaskId, TraitTypeId, TurboTasksBackendApi, ValueTypeId,
    backend::{
        Backend, BackendJobId, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
        TransientTaskType, TurboTasksExecutionError, TypedCellContent,
    },
    event::{Event, EventListener},
    registry::{self, get_value_type_global_name},
    task_statistics::TaskStatisticsApi,
    trace::TraceRawVcs,
    util::IdFactoryWithReuse,
};

pub use self::{operation::AnyOperation, storage::TaskDataCategory};
#[cfg(feature = "trace_task_dirty")]
use crate::backend::operation::TaskDirtyCause;
use crate::{
    backend::{
        operation::{
            AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
            Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number,
            is_root_node, prepare_new_children,
        },
        storage::{
            InnerStorageSnapshot, Storage, get, get_many, get_mut, get_mut_or_insert_with,
            iter_many, remove,
        },
    },
    backing_storage::BackingStorage,
    data::{
        ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
        CachedDataItemValueRef, CellRef, CollectibleRef, CollectiblesRef, DirtyState,
        InProgressCellState, InProgressState, InProgressStateInner, OutputValue, RootType,
    },
    utils::{
        bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc, sharded::Sharded, swap_retain,
    },
};

const BACKEND_JOB_INITIAL_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(1) };
const BACKEND_JOB_FOLLOW_UP_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(2) };

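/// Bit in [`TurboTasksBackendInner::in_progress_operations`] that is set while a snapshot has
/// been requested. While it is set, newly started operations suspend until the snapshot
/// completes.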
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);

struct SnapshotRequest {
    snapshot_requested: bool,
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}

impl SnapshotRequest {
    fn new() -> Self {
        Self {
            snapshot_requested: false,
            suspended_operations: FxHashSet::default(),
        }
    }
}

type TransientTaskOnce =
    Mutex<Option<Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>>>;

pub enum TransientTask {
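    /// A root task that is not connected to a parent task. Dependencies are tracked, so it
    /// re-executes when invalidated.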
    Root(TransientTaskRoot),

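    /// A one-shot task execution. The future is taken out of the mutex on first execution, so
    /// the task can only run once and does not re-execute on invalidation.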
    Once(TransientTaskOnce),
}

pub enum StorageMode {
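    /// Data is restored from the backing storage, but no modifications are persisted back.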
    ReadOnly,
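    /// Data is restored from the backing storage and modifications are persisted back via
    /// periodic snapshots.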
    ReadWrite,
}

pub struct BackendOptions {
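    /// Enables dependency tracking.
    ///
    /// When disabled, tasks are never invalidated and calling an invalidator panics.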
    pub dependency_tracking: bool,

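    /// Enables children tracking.
    ///
    /// When disabled, collectibles are unsupported and strongly consistent reads behave like
    /// eventually consistent reads.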
    pub children_tracking: bool,

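    /// Enables activeness tracking.
    ///
    /// Automatically disabled when `dependency_tracking` is disabled.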
    pub active_tracking: bool,

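    /// Controls how the backing storage is used. `None` disables restoring and persisting
    /// entirely.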
    pub storage_mode: Option<StorageMode>,
}

impl Default for BackendOptions {
    fn default() -> Self {
        Self {
            dependency_tracking: true,
            children_tracking: true,
            active_tracking: true,
            storage_mode: Some(StorageMode::ReadWrite),
        }
    }
}

pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);

type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;

struct TurboTasksBackendInner<B: BackingStorage> {
    options: BackendOptions,

    start_time: Instant,
    session_id: SessionId,

    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    persisted_task_cache_log: Option<TaskCacheLog>,
    task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
    transient_tasks: FxDashMap<TaskId, Arc<TransientTask>>,

    storage: Storage,

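    /// Number of currently running operations. The highest bit ([`SNAPSHOT_REQUESTED_BIT`]) is
    /// set while a snapshot is requested; operations suspend while it is set, and when the
    /// counter reaches zero with the bit set, `operations_suspended` is notified.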
    in_progress_operations: AtomicUsize,

    snapshot_request: Mutex<SnapshotRequest>,
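    /// Notified when `in_progress_operations` reaches zero while a snapshot is requested, i.e.
    /// all running operations have completed or suspended.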
    operations_suspended: Condvar,
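    /// Notified when a snapshot is completed and suspended operations can resume.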
    snapshot_completed: Condvar,
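    /// Timestamp of the last started snapshot, in milliseconds since `start_time`.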
    last_snapshot: AtomicU64,

    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    task_statistics: TaskStatisticsApi,

    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}

impl<B: BackingStorage> TurboTasksBackend<B> {
    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
        Self(Arc::new(TurboTasksBackendInner::new(
            options,
            backing_storage,
        )))
    }
}

impl<B: BackingStorage> TurboTasksBackendInner<B> {
    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
        let shard_amount =
            (available_parallelism().map_or(4, |v| v.get()) * 64).next_power_of_two();
        let need_log = matches!(options.storage_mode, Some(StorageMode::ReadWrite));
        if !options.dependency_tracking {
            options.active_tracking = false;
        }
        Self {
            options,
            start_time: Instant::now(),
            session_id: backing_storage.next_session_id(),
            persisted_task_id_factory: IdFactoryWithReuse::new(
                backing_storage.next_free_task_id(),
                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
            ),
            transient_task_id_factory: IdFactoryWithReuse::new(
                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                TaskId::MAX,
            ),
            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
            task_cache: BiMap::new(),
            transient_tasks: FxDashMap::default(),
            storage: Storage::new(),
            in_progress_operations: AtomicUsize::new(0),
            snapshot_request: Mutex::new(SnapshotRequest::new()),
            operations_suspended: Condvar::new(),
            snapshot_completed: Condvar::new(),
            last_snapshot: AtomicU64::new(0),
            stopping: AtomicBool::new(false),
            stopping_event: Event::new(|| "TurboTasksBackend::stopping_event".to_string()),
            idle_start_event: Event::new(|| "TurboTasksBackend::idle_start_event".to_string()),
            idle_end_event: Event::new(|| "TurboTasksBackend::idle_end_event".to_string()),
            #[cfg(feature = "verify_aggregation_graph")]
            is_idle: AtomicBool::new(false),
            task_statistics: TaskStatisticsApi::default(),
            backing_storage,
            #[cfg(feature = "verify_aggregation_graph")]
            root_tasks: Default::default(),
        }
    }

    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }

    fn session_id(&self) -> SessionId {
        self.session_id
    }

    /// # Safety
    ///
    /// `tx` must be a transaction from the backing storage of this backend.
    unsafe fn execute_context_with_tx<'e, 'tx>(
        &'e self,
        tx: Option<&'e B::ReadTransaction<'tx>>,
        turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
    where
        'tx: 'e,
    {
        unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
    }

    fn suspending_requested(&self) -> bool {
        self.should_persist()
            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
    }

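    /// Suspend point called by operations. If a snapshot has been requested, the current
    /// operation state is captured (so it can be persisted and later resumed) and the calling
    /// thread blocks until the snapshot is completed.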
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            if snapshot_request.snapshot_requested {
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                if value == SNAPSHOT_REQUESTED_BIT {
                    this.operations_suspended.notify_all();
                }
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }

    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        if !self.should_persist() {
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            let mut snapshot_request = self.snapshot_request.lock();
            if snapshot_request.snapshot_requested {
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }

    fn should_persist(&self) -> bool {
        matches!(self.options.storage_mode, Some(StorageMode::ReadWrite))
    }

    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }

    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }

    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }

    fn should_track_children(&self) -> bool {
        self.options.children_tracking
    }

    fn track_cache_hit(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(task_type.fn_type));
    }

    fn track_cache_miss(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(task_type.fn_type));
    }
}

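/// RAII guard for a running operation. Dropping it decrements `in_progress_operations` and
/// notifies `operations_suspended` when a requested snapshot is waiting for the last running
/// operation.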
pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    backend: Option<&'a TurboTasksBackendInner<B>>,
}

impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
    fn drop(&mut self) {
        if let Some(backend) = self.backend {
            let fetch_sub = backend
                .in_progress_operations
                .fetch_sub(1, Ordering::AcqRel);
            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
                backend.operations_suspended.notify_all();
            }
        }
    }
}

impl<B: BackingStorage> TurboTasksBackendInner<B> {
    /// # Safety
    ///
    /// `tx` must be a transaction from the backing storage of this backend.
    unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
        &'l self,
        tx: Option<&'l B::ReadTransaction<'tx>>,
        parent_task: TaskId,
        child_task: TaskId,
        turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::ConnectChildOperation::run(parent_task, child_task, unsafe {
            self.execute_context_with_tx(tx, turbo_tasks)
        });
    }

    fn connect_child(
        &self,
        parent_task: TaskId,
        child_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::ConnectChildOperation::run(
            parent_task,
            child_task,
            self.execute_context(turbo_tasks),
        );
    }

    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        consistency: ReadConsistency,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<RawVc, EventListener>> {
        if let Some(reader) = reader {
            self.assert_not_persistent_calling_transient(reader, task_id, None);
        }

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);

        fn listen_to_done_event<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            reader: Option<TaskId>,
            done_event: &Event,
        ) -> EventListener {
            let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
            done_event.listen_with_note(move || {
                if let Some(reader_desc) = reader_desc.as_ref() {
                    format!("try_read_task_output from {}", reader_desc())
                } else {
                    "try_read_task_output (untracked)".to_string()
                }
            })
        }

        fn check_in_progress<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            task: &impl TaskGuard,
            reader: Option<TaskId>,
            ctx: &impl ExecuteContext<'_>,
        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
        {
            match get!(task, InProgress) {
                Some(InProgressState::Scheduled { done_event, .. }) => {
                    Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
                }
                Some(InProgressState::InProgress(box InProgressStateInner {
                    done,
                    done_event,
                    ..
                })) => {
                    if !*done {
                        Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
                    } else {
                        None
                    }
                }
                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
                    "{} was canceled",
                    ctx.get_task_description(task.id())
                ))),
                None => None,
            }
        }

        if self.should_track_children() && matches!(consistency, ReadConsistency::Strong) {
            // Ensure the task is a root node of the aggregation graph so that aggregated
            // dirtiness information is available for it.
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                {
                    let _span = tracing::trace_span!(
                        "make root node for strongly consistent read",
                        %task_id
                    )
                    .entered();
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: u32::MAX,
                            distance: None,
                        },
                        &mut ctx,
                    );
                }
                task = ctx.task(task_id, TaskDataCategory::All);
            }

            let is_dirty =
                get!(task, Dirty).map_or(false, |dirty_state| dirty_state.get(self.session_id));

            let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
                .cloned()
                .unwrap_or_default()
                .get(self.session_id);
            if dirty_tasks > 0 || is_dirty {
                let root = get_mut!(task, Activeness);
                let mut task_ids_to_schedule: Vec<_> = Vec::new();
                let root = if let Some(root) = root {
                    root.set_active_until_clean();
                    root
                } else {
                    get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id))
                        .set_active_until_clean();
                    if ctx.should_track_activeness() {
                        task_ids_to_schedule = get_many!(
                            task,
                            AggregatedDirtyContainer {
                                task
                            } count if count.get(self.session_id) > 0 => {
                                task
                            }
                        );
                        task_ids_to_schedule.push(task_id);
                    }
                    get!(task, Activeness).unwrap()
                };
                let listener = root.all_clean_event.listen_with_note(move || {
                    format!("try_read_task_output (strongly consistent) from {reader:?}")
                });
                drop(task);
                if !task_ids_to_schedule.is_empty() {
                    let mut queue = AggregationUpdateQueue::new();
                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
                    queue.execute(&mut ctx);
                }

                return Ok(Err(listener));
            }
        }

        if let Some(value) = check_in_progress(self, &task, reader, &ctx) {
            return value;
        }

        if let Some(output) = get!(task, Output) {
            let result = match output {
                OutputValue::Cell(cell) => Some(Ok(Ok(RawVc::TaskCell(cell.task, cell.cell)))),
                OutputValue::Output(task) => Some(Ok(Ok(RawVc::TaskOutput(*task)))),
                OutputValue::Error => get!(task, Error).map(|error| Err(error.clone().into())),
            };
            if let Some(result) = result {
                if self.should_track_dependencies() {
                    if let Some(reader) = reader {
                        let _ = task.add(CachedDataItem::OutputDependent {
                            task: reader,
                            value: (),
                        });
                        drop(task);

                        let mut reader_task = ctx.task(reader, TaskDataCategory::Data);
                        if reader_task
                            .remove(&CachedDataItemKey::OutdatedOutputDependency {
                                target: task_id,
                            })
                            .is_none()
                        {
                            let _ = reader_task.add(CachedDataItem::OutputDependency {
                                target: task_id,
                                value: (),
                            });
                        }
                    }
                }

                return result;
            }
        }

        let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
        let note = move || {
            if let Some(reader_desc) = reader_desc.as_ref() {
                format!("try_read_task_output (recompute) from {}", reader_desc())
            } else {
                "try_read_task_output (recompute, untracked)".to_string()
            }
        };

        // Output doesn't exist. We need to schedule the task to compute it.
        let (item, listener) =
            CachedDataItem::new_scheduled_with_listener(self.get_task_desc_fn(task_id), note);
        task.add_new(item);
        turbo_tasks.schedule(task_id);

        Ok(Err(listener))
    }

    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        if let Some(reader) = reader {
            self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
        }

        fn add_cell_dependency<B: BackingStorage>(
            backend: &TurboTasksBackendInner<B>,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            cell: CellId,
            task_id: TaskId,
            ctx: &mut impl ExecuteContext<'_>,
        ) {
            if !backend.should_track_dependencies() {
                return;
            }
            if let Some(reader) = reader {
                if reader == task_id {
                    // A task never depends on its own cells.
                    return;
                }
                let _ = task.add(CachedDataItem::CellDependent {
                    cell,
                    task: reader,
                    value: (),
                });
                drop(task);

                let mut reader_task = ctx.task(reader, TaskDataCategory::Data);
                let target = CellRef {
                    task: task_id,
                    cell,
                };
                if reader_task
                    .remove(&CachedDataItemKey::OutdatedCellDependency { target })
                    .is_none()
                {
                    let _ = reader_task.add(CachedDataItem::CellDependency { target, value: () });
                }
            }
        }

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        let content = if options.final_read_hint {
            remove!(task, CellData { cell })
        } else if let Some(content) = get!(task, CellData { cell }) {
            let content = content.clone();
            Some(content)
        } else {
            None
        };
        if let Some(content) = content {
            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.1)),
            )));
        }

        let in_progress = get!(task, InProgress);
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
        let is_scheduled = matches!(in_progress, Some(InProgressState::Scheduled { .. }));

        // Check the cell index range (the cell might not exist at all)
        let max_id = get!(
            task,
            CellTypeMaxIndex {
                cell_type: cell.type_id
            }
        )
        .copied();
        let Some(max_id) = max_id else {
            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
            bail!(
                "Cell {cell:?} no longer exists in task {} (no cell of this type exists)",
                ctx.get_task_description(task_id)
            );
        };
        if cell.index >= max_id {
            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
            bail!(
                "Cell {cell:?} no longer exists in task {} (index out of bounds)",
                ctx.get_task_description(task_id)
            );
        }

        // The cell should exist, but its data was dropped or is not serializable. We need to
        // recompute the task to get the cell content.
        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, reader, cell);
        if !new_listener {
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = registry::get_value_type_global_name(cell.type_id),
            cell_index = cell.index
        )
        .entered();

        // Schedule the task, if not already scheduled
        if is_cancelled {
            bail!("{} was canceled", ctx.get_task_description(task_id));
        } else if !is_scheduled
            && task.add(CachedDataItem::new_scheduled(
                self.get_task_desc_fn(task_id),
            ))
        {
            turbo_tasks.schedule(task_id);
        }

        Ok(Err(listener))
    }

    fn listen_to_cell(
        &self,
        task: &mut impl TaskGuard,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
    ) -> (EventListener, bool) {
        let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
        let note = move || {
            if let Some(reader_desc) = reader_desc.as_ref() {
                format!("try_read_task_cell (in progress) from {}", reader_desc())
            } else {
                "try_read_task_cell (in progress, untracked)".to_string()
            }
        };
        if let Some(in_progress) = get!(task, InProgressCell { cell }) {
            // Someone else is already computing the cell
            let listener = in_progress.event.listen_with_note(note);
            return (listener, false);
        }
        let in_progress = InProgressCellState::new(task_id, cell);
        let listener = in_progress.event.listen_with_note(note);
        task.add_new(CachedDataItem::InProgressCell {
            cell,
            value: in_progress,
        });
        (listener, true)
    }

    fn lookup_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
        if let Some(task_type) = self.task_cache.lookup_reverse(&task_id) {
            return Some(task_type);
        }
        if self.should_restore() && !task_id.is_transient() {
            if let Some(task_type) = unsafe {
                self.backing_storage
                    .reverse_lookup_task_cache(None, task_id)
            } {
                let _ = self.task_cache.try_insert(task_type.clone(), task_id);
                return Some(task_type);
            }
        }
        None
    }

    fn get_task_desc_fn(&self, task_id: TaskId) -> impl Fn() -> String + Send + Sync + 'static {
        let task_type = self.lookup_task_type(task_id);
        move || {
            task_type.as_ref().map_or_else(
                || format!("{task_id:?} transient"),
                |task_type| format!("{task_id:?} {task_type}"),
            )
        }
    }

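    /// Persists a snapshot of all modified tasks.
    ///
    /// Sets the snapshot-requested bit and waits until all in-progress operations have
    /// completed or suspended, captures the modified task storage and the task cache log, then
    /// clears the bit and wakes suspended operations. Returns the snapshot start time and
    /// whether new data was persisted, or `None` when persisting failed.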
    fn snapshot(&self) -> Option<(Instant, bool)> {
        debug_assert!(self.should_persist());
        let mut snapshot_request = self.snapshot_request.lock();
        snapshot_request.snapshot_requested = true;
        let active_operations = self
            .in_progress_operations
            .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        if active_operations != 0 {
            self.operations_suspended
                .wait_while(&mut snapshot_request, |_| {
                    self.in_progress_operations.load(Ordering::Relaxed) != SNAPSHOT_REQUESTED_BIT
                });
        }
        let suspended_operations = snapshot_request
            .suspended_operations
            .iter()
            .map(|op| op.arc().clone())
            .collect::<Vec<_>>();
        drop(snapshot_request);
        let mut persisted_task_cache_log = self
            .persisted_task_cache_log
            .as_ref()
            .map(|l| l.take(|i| i))
            .unwrap_or_default();
        self.storage.start_snapshot();
        let mut snapshot_request = self.snapshot_request.lock();
        snapshot_request.snapshot_requested = false;
        self.in_progress_operations
            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        self.snapshot_completed.notify_all();
        let snapshot_time = Instant::now();
        drop(snapshot_request);

        let preprocess = |task_id: TaskId, inner: &storage::InnerStorage| {
            if task_id.is_transient() {
                return (None, None);
            }
            let len = inner.len();
            let mut meta = Vec::with_capacity(len);
            let mut data = Vec::with_capacity(len);
            for (key, value) in inner.iter_all() {
                if key.is_persistent() && value.is_persistent() {
                    match key.category() {
                        TaskDataCategory::Meta => {
                            meta.push(CachedDataItem::from_key_and_value_ref(key, value))
                        }
                        TaskDataCategory::Data => {
                            data.push(CachedDataItem::from_key_and_value_ref(key, value))
                        }
                        _ => {}
                    }
                }
            }

            (
                inner.state().meta_restored().then_some(meta),
                inner.state().data_restored().then_some(data),
            )
        };
        let process = |task_id: TaskId, (meta, data): (Option<Vec<_>>, Option<Vec<_>>)| {
            (
                task_id,
                meta.map(|d| B::serialize(task_id, &d)),
                data.map(|d| B::serialize(task_id, &d)),
            )
        };
        let process_snapshot = |task_id: TaskId, inner: Box<InnerStorageSnapshot>| {
            if task_id.is_transient() {
                return (task_id, None, None);
            }
            let len = inner.len();
            let mut meta = inner.meta_modified.then(|| Vec::with_capacity(len));
            let mut data = inner.data_modified.then(|| Vec::with_capacity(len));
            for (key, value) in inner.iter_all() {
                if key.is_persistent() && value.is_persistent() {
                    match key.category() {
                        TaskDataCategory::Meta => {
                            if let Some(meta) = &mut meta {
                                meta.push(CachedDataItem::from_key_and_value_ref(key, value));
                            }
                        }
                        TaskDataCategory::Data => {
                            if let Some(data) = &mut data {
                                data.push(CachedDataItem::from_key_and_value_ref(key, value));
                            }
                        }
                        _ => {}
                    }
                }
            }
            (
                task_id,
                meta.map(|meta| B::serialize(task_id, &meta)),
                data.map(|data| B::serialize(task_id, &data)),
            )
        };

        let snapshot = {
            let _span = tracing::trace_span!("take snapshot");
            self.storage
                .take_snapshot(&preprocess, &process, &process_snapshot)
        };

        let task_snapshots = snapshot
            .into_iter()
            .filter_map(|iter| {
                let mut iter = iter
                    .filter_map(
                        |(task_id, meta, data): (
                            _,
                            Option<Result<SmallVec<_>>>,
                            Option<Result<SmallVec<_>>>,
                        )| {
                            let meta = match meta {
                                Some(Ok(meta)) => Some(meta),
                                None => None,
                                Some(Err(err)) => {
                                    println!(
                                        "Serializing task {} failed (meta): {:?}",
                                        self.get_task_description(task_id),
                                        err
                                    );
                                    None
                                }
                            };
                            let data = match data {
                                Some(Ok(data)) => Some(data),
                                None => None,
                                Some(Err(err)) => {
                                    println!(
                                        "Serializing task {} failed (data): {:?}",
                                        self.get_task_description(task_id),
                                        err
                                    );
                                    None
                                }
                            };
                            (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
                        },
                    )
                    .peekable();
                iter.peek().is_some().then_some(iter)
            })
            .collect::<Vec<_>>();

        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());

        let mut new_items = false;

        if !persisted_task_cache_log.is_empty() || !task_snapshots.is_empty() {
            new_items = true;
            if let Err(err) = self.backing_storage.save_snapshot(
                self.session_id,
                suspended_operations,
                persisted_task_cache_log,
                task_snapshots,
            ) {
                println!("Persisting failed: {err:?}");
                return None;
            }
        }

        Some((snapshot_time, new_items))
    }

    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        if self.should_restore() {
            // Continue all uncompleted operations. They can't be interrupted by a snapshot
            // since the snapshotting job has not been scheduled yet.
            let uncompleted_operations = self.backing_storage.uncompleted_operations();
            if !uncompleted_operations.is_empty() {
                let mut ctx = self.execute_context(turbo_tasks);
                for op in uncompleted_operations {
                    op.execute(&mut ctx);
                }
            }
        }

        if self.should_persist() {
            // Schedule the snapshot job
            turbo_tasks.schedule_backend_background_job(BACKEND_JOB_INITIAL_SNAPSHOT);
        }
    }

    fn stopping(&self) {
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }

    #[allow(unused_variables)]
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        #[cfg(feature = "verify_aggregation_graph")]
        {
            self.is_idle.store(false, Ordering::Release);
            self.verify_aggregation_graph(turbo_tasks, false);
        }
        if let Err(err) = self.backing_storage.shutdown() {
            println!("Shutting down failed: {err}");
        }
    }

    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                select! {
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                    }
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }

    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }

    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
            self.track_cache_hit(&task_type);
            self.connect_child(parent_task, task_id, turbo_tasks);
            return task_id;
        }

        self.track_cache_miss(&task_type);
        let tx = self
            .should_restore()
            .then(|| self.backing_storage.start_read_transaction())
            .flatten();
        let task_id = {
            // Safety: `tx` is a valid transaction from `self.backing_storage`.
            if let Some(task_id) = unsafe {
                self.backing_storage
                    .forward_lookup_task_cache(tx.as_ref(), &task_type)
            } {
                let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
                task_id
            } else {
                let task_type = Arc::new(task_type);
                let task_id = self.persisted_task_id_factory.get();
                let task_id = if let Err(existing_task_id) =
                    self.task_cache.try_insert(task_type.clone(), task_id)
                {
                    // Safety: We just created the id and failed to insert it.
                    unsafe {
                        self.persisted_task_id_factory.reuse(task_id);
                    }
                    existing_task_id
                } else {
                    task_id
                };
                if let Some(log) = &self.persisted_task_cache_log {
                    log.lock(task_id).push((task_type, task_id));
                }
                task_id
            }
        };

        // Safety: `tx` is a valid transaction from `self.backing_storage`.
        unsafe { self.connect_child_with_tx(tx.as_ref(), parent_task, task_id, turbo_tasks) };

        task_id
    }

    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        if !parent_task.is_transient() {
            self.panic_persistent_calling_transient(
                self.lookup_task_type(parent_task).as_deref(),
                Some(&task_type),
                None,
            );
        }
        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
            self.track_cache_hit(&task_type);
            self.connect_child(parent_task, task_id, turbo_tasks);
            return task_id;
        }

        self.track_cache_miss(&task_type);
        let task_type = Arc::new(task_type);
        let task_id = self.transient_task_id_factory.get();
        if let Err(existing_task_id) = self.task_cache.try_insert(task_type, task_id) {
            // Safety: We just created the id and failed to insert it.
            unsafe {
                self.transient_task_id_factory.reuse(task_id);
            }
            self.connect_child(parent_task, existing_task_id, turbo_tasks);
            return existing_task_id;
        }

        self.connect_child(parent_task, task_id, turbo_tasks);

        task_id
    }

    /// Builds a debug trace of the transient tasks that caused a transient task to be created.
    /// Used in error messages when a persistent task calls a transient one.
    fn debug_trace_transient_task(
        &self,
        task_type: &CachedTaskType,
        cell_id: Option<CellId>,
    ) -> DebugTraceTransientTask {
        fn inner_id(
            backend: &TurboTasksBackendInner<impl BackingStorage>,
            task_id: TaskId,
            cell_type_id: Option<ValueTypeId>,
            visited_set: &mut FxHashSet<TaskId>,
        ) -> DebugTraceTransientTask {
            if let Some(task_type) = backend.lookup_task_type(task_id) {
                if visited_set.contains(&task_id) {
                    let task_name = task_type.get_name();
                    DebugTraceTransientTask::Collapsed {
                        task_name,
                        cell_type_id,
                    }
                } else {
                    inner_cached(backend, &task_type, cell_type_id, visited_set)
                }
            } else {
                DebugTraceTransientTask::Uncached { cell_type_id }
            }
        }
        fn inner_cached(
            backend: &TurboTasksBackendInner<impl BackingStorage>,
            task_type: &CachedTaskType,
            cell_type_id: Option<ValueTypeId>,
            visited_set: &mut FxHashSet<TaskId>,
        ) -> DebugTraceTransientTask {
            let task_name = task_type.get_name();

            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
                    return None;
                };
                if task_id.is_transient() {
                    Some(Box::new(inner_id(
                        backend,
                        task_id,
                        cause_self_raw_vc.try_get_type_id(),
                        visited_set,
                    )))
                } else {
                    None
                }
            });
            let cause_args = task_type
                .arg
                .get_raw_vcs()
                .into_iter()
                .filter_map(|raw_vc| {
                    let Some(task_id) = raw_vc.try_get_task_id() else {
                        return None;
                    };
                    if !task_id.is_transient() {
                        return None;
                    }
                    Some((task_id, raw_vc.try_get_type_id()))
                })
                .collect::<IndexSet<_>>() // dedupe
                .into_iter()
                .map(|(task_id, cell_type_id)| {
                    inner_id(backend, task_id, cell_type_id, visited_set)
                })
                .collect();

            DebugTraceTransientTask::Cached {
                task_name,
                cell_type_id,
                cause_self,
                cause_args,
            }
        }
        inner_cached(
            self,
            task_type,
            cell_id.map(|c| c.type_id),
            &mut FxHashSet::default(),
        )
    }

    fn invalidate_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            panic!("Dependency tracking is disabled so invalidation is not allowed");
        }
        operation::InvalidateOperation::run(
            smallvec![task_id],
            #[cfg(feature = "trace_task_dirty")]
            TaskDirtyCause::Invalidator,
            self.execute_context(turbo_tasks),
        );
    }

    fn invalidate_tasks(
        &self,
        tasks: &[TaskId],
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            panic!("Dependency tracking is disabled so invalidation is not allowed");
        }
        operation::InvalidateOperation::run(
            tasks.iter().copied().collect(),
            #[cfg(feature = "trace_task_dirty")]
            TaskDirtyCause::Unknown,
            self.execute_context(turbo_tasks),
        );
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            panic!("Dependency tracking is disabled so invalidation is not allowed");
        }
        operation::InvalidateOperation::run(
            tasks.iter().copied().collect(),
            #[cfg(feature = "trace_task_dirty")]
            TaskDirtyCause::Unknown,
            self.execute_context(turbo_tasks),
        );
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if task_id.is_transient() {
            return;
        }
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        task.invalidate_serialization();
    }

    fn get_task_description(&self, task_id: TaskId) -> String {
        self.lookup_task_type(task_id).map_or_else(
            || format!("{task_id:?} transient"),
            |task_type| task_type.to_string(),
        )
    }

    fn try_get_function_id(&self, task_id: TaskId) -> Option<FunctionId> {
        self.lookup_task_type(task_id)
            .map(|task_type| task_type.fn_type)
    }

    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        if let Some(in_progress) = remove!(task, InProgress) {
            match in_progress {
                InProgressState::Scheduled { done_event } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        task.add_new(CachedDataItem::InProgress {
            value: InProgressState::Canceled,
        });
    }

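    /// Transitions a scheduled task into the `InProgress` state and prepares its execution.
    /// Existing collectibles and dependencies are marked as outdated, so that entries not
    /// re-added by the new execution can be cleaned up afterwards. Returns `None` if the task
    /// is not in the `Scheduled` state.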
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        enum TaskType {
            Cached(Arc<CachedTaskType>),
            Transient(Arc<TransientTask>),
        }
        let (task_type, once_task) = if let Some(task_type) = self.lookup_task_type(task_id) {
            (TaskType::Cached(task_type), false)
        } else if let Some(task_type) = self.transient_tasks.get(&task_id) {
            (
                TaskType::Transient(task_type.clone()),
                matches!(**task_type, TransientTask::Once(_)),
            )
        } else {
            return None;
        };
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            let in_progress = remove!(task, InProgress)?;
            let InProgressState::Scheduled { done_event } = in_progress else {
                task.add_new(CachedDataItem::InProgress { value: in_progress });
                return None;
            };
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::InProgress(Box::new(InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    done: false,
                    new_children: Default::default(),
                })),
            });

            if self.should_track_children() {
                // Make all current collectibles outdated (and remove left-over outdated
                // collectibles that are no longer emitted)
                enum Collectible {
                    Current(CollectibleRef, i32),
                    Outdated(CollectibleRef),
                }
                let collectibles = iter_many!(task, Collectible { collectible } value => Collectible::Current(collectible, *value))
                    .chain(iter_many!(task, OutdatedCollectible { collectible } => Collectible::Outdated(collectible)))
                    .collect::<Vec<_>>();
                for collectible in collectibles {
                    match collectible {
                        Collectible::Current(collectible, value) => {
                            let _ = task
                                .insert(CachedDataItem::OutdatedCollectible { collectible, value });
                        }
                        Collectible::Outdated(collectible) => {
                            if !task.has_key(&CachedDataItemKey::Collectible { collectible }) {
                                task.remove(&CachedDataItemKey::OutdatedCollectible {
                                    collectible,
                                });
                            }
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Make all current dependencies outdated
                enum Dep {
                    CurrentCell(CellRef),
                    CurrentOutput(TaskId),
                    OutdatedCell(CellRef),
                    OutdatedOutput(TaskId),
                }
                let dependencies = iter_many!(task, CellDependency { target } => Dep::CurrentCell(target))
                    .chain(iter_many!(task, OutputDependency { target } => Dep::CurrentOutput(target)))
                    .chain(iter_many!(task, OutdatedCellDependency { target } => Dep::OutdatedCell(target)))
                    .chain(iter_many!(task, OutdatedOutputDependency { target } => Dep::OutdatedOutput(target)))
                    .collect::<Vec<_>>();
                for dep in dependencies {
                    match dep {
                        Dep::CurrentCell(cell) => {
                            let _ = task.add(CachedDataItem::OutdatedCellDependency {
                                target: cell,
                                value: (),
                            });
                        }
                        Dep::CurrentOutput(output) => {
                            let _ = task.add(CachedDataItem::OutdatedOutputDependency {
                                target: output,
                                value: (),
                            });
                        }
                        Dep::OutdatedCell(cell) => {
                            if !task.has_key(&CachedDataItemKey::CellDependency { target: cell }) {
                                task.remove(&CachedDataItemKey::OutdatedCellDependency {
                                    target: cell,
                                });
                            }
                        }
                        Dep::OutdatedOutput(output) => {
                            if !task
                                .has_key(&CachedDataItemKey::OutputDependency { target: output })
                            {
                                task.remove(&CachedDataItemKey::OutdatedOutputDependency {
                                    target: output,
                                });
                            }
                        }
                    }
                }
            }
        }

        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType { fn_type, this, arg } = &*task_type;
                (
                    registry::get_function(*fn_type).span(task_id.persistence()),
                    registry::get_function(*fn_type).execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let task_type = task_type.clone();
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }

    fn task_execution_result(
        &self,
        task_id: TaskId,
        result: Result<RawVc, Arc<TurboTasksExecutionError>>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateOutputOperation::run(task_id, result, self.execute_context(turbo_tasks));
    }

    fn task_execution_completed(
        &self,
        task_id: TaskId,
        _duration: Duration,
        _memory_usage: usize,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        stateful: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        let _span = tracing::trace_span!("task execution completed").entered();
        let mut ctx = self.execute_context(turbo_tasks);

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = get_mut!(task, InProgress) else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        let &mut InProgressState::InProgress(box InProgressStateInner {
            stale,
            ref mut done,
            ref done_event,
            ref mut new_children,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // If the task is stale, reschedule it
        if stale {
            let Some(InProgressState::InProgress(box InProgressStateInner {
                done_event,
                mut new_children,
                ..
            })) = remove!(task, InProgress)
            else {
                unreachable!();
            };
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled { done_event },
            });
            // `new_children` that are already connected as children don't need to be
            // disconnected again.
            for task in iter_many!(task, Child { task } => task) {
                new_children.remove(&task);
            }
            drop(task);

            // Undo the active counts of the remaining new children
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                &mut ctx,
            );
            return true;
        }

        // Mark the task as completed, so dependent tasks can continue working
        *done = true;
        done_event.notify(usize::MAX);

        let mut new_children = take(new_children);

        if stateful {
            let _ = task.add(CachedDataItem::Stateful { value: () });
        }

        // Handle cell counters: update the max indices and remove counters that are no longer
        // used
        let old_counters: FxHashMap<_, _> =
            get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, *max_index));
        let mut counters_to_remove = old_counters.clone();
        for (&cell_type, &max_index) in cell_counters.iter() {
            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                if old_max_index != max_index {
                    task.insert(CachedDataItem::CellTypeMaxIndex {
                        cell_type,
                        value: max_index,
                    });
                }
            } else {
                task.add_new(CachedDataItem::CellTypeMaxIndex {
                    cell_type,
                    value: max_index,
                });
            }
        }
        for (cell_type, _) in counters_to_remove {
            task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type });
        }

        let mut queue = AggregationUpdateQueue::new();

        let mut removed_data = Vec::new();
        let mut old_edges = Vec::new();

        let has_children = !new_children.is_empty();

        if has_children {
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);
        }

        // Filter out the children that are still children; the rest are outdated edges
        if has_children {
            old_edges.extend(
                iter_many!(task, Child { task } => task)
                    .filter(|task| !new_children.remove(task))
                    .map(OutdatedEdge::Child),
            );
        } else {
            old_edges.extend(iter_many!(task, Child { task } => task).map(OutdatedEdge::Child));
        }

        // Remove no longer existing cells and notify in-progress cells
        if task_id.is_transient() || iter_many!(task, CellData { cell }
            if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index) => cell
        ).count() > 0 {
            removed_data.extend(task.extract_if(CachedDataItemType::CellData, |key, _| {
                matches!(key, CachedDataItemKey::CellData { cell } if cell_counters
                    .get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
            }));
        }
        if self.should_track_children() {
            old_edges.extend(
                task.iter(CachedDataItemType::OutdatedCollectible)
                    .filter_map(|(key, value)| match (key, value) {
                        (
                            CachedDataItemKey::OutdatedCollectible { collectible },
                            CachedDataItemValueRef::OutdatedCollectible { value },
                        ) => Some(OutdatedEdge::Collectible(collectible, *value)),
                        _ => None,
                    }),
            );
        }
        if self.should_track_dependencies() {
            old_edges.extend(iter_many!(task, OutdatedCellDependency { target } => OutdatedEdge::CellDependency(target)));
            old_edges.extend(iter_many!(task, OutdatedOutputDependency { target } => OutdatedEdge::OutputDependency(target)));
            old_edges.extend(
                iter_many!(task, CellDependent { cell, task } => (cell, task)).filter_map(
                    |(cell, task)| {
                        // If the cell was removed, the dependent task needs to be notified
                        // about the removal of the cell data.
                        if cell_counters
                            .get(&cell.type_id)
                            .is_none_or(|start_index| cell.index >= *start_index)
                        {
                            if let Some(old_counter) = old_counters.get(&cell.type_id) {
                                if cell.index < *old_counter {
                                    return Some(OutdatedEdge::RemovedCellDependent {
                                        task_id: task,
                                        #[cfg(feature = "trace_task_dirty")]
                                        value_type_id: cell.type_id,
                                    });
                                }
                            }
                        }
                        None
                    },
                ),
            );
        }

        drop(task);

        if !queue.is_empty() || !old_edges.is_empty() {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
            // Remove outdated edges first, before removing in_progress+dirty
            CleanupOldEdgesOperation::run(task_id, old_edges, queue, &mut ctx);
        }

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = get!(task, InProgress) else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        let InProgressState::InProgress(box InProgressStateInner { stale, .. }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // If the task is stale, reschedule it
        if *stale {
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                remove!(task, InProgress)
            else {
                unreachable!();
            };
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled { done_event },
            });
            drop(task);

            // All `new_children` are currently held active with an active count; we need to
            // undo that.
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                &mut ctx,
            );
            return true;
        }

        let mut queue = AggregationUpdateQueue::new();

        if has_children {
            let has_active_count = ctx.should_track_activeness()
                && get!(task, Activeness).map_or(false, |activeness| activeness.active_counter > 0);
            connect_children(
                task_id,
                &mut task,
                new_children,
                &mut queue,
                has_active_count,
                ctx.should_track_activeness(),
            );
        }

        drop(task);

        if has_children {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("connect new children").entered();
            queue.execute(&mut ctx);
        }

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = remove!(task, InProgress) else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: _,
            stale,
            session_dependent,
            done: _,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        debug_assert!(new_children.is_empty());

        // If the task is stale, reschedule it
        if stale {
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled { done_event },
            });
            return true;
        }

        // Notify in-progress cells
        removed_data.extend(task.extract_if(
            CachedDataItemType::InProgressCell,
            |key, value| match (key, value) {
                (
                    CachedDataItemKey::InProgressCell { .. },
                    CachedDataItemValueRef::InProgressCell { value },
                ) => {
                    value.event.notify(usize::MAX);
                    true
                }
                _ => false,
            },
        ));

        // Update the dirty state
        let old_dirty_state = get!(task, Dirty).copied();

        let new_dirty_state = if session_dependent {
            Some(DirtyState {
                clean_in_session: Some(self.session_id),
            })
        } else {
            None
        };

        let data_update = if old_dirty_state != new_dirty_state {
            if let Some(new_dirty_state) = new_dirty_state {
                task.insert(CachedDataItem::Dirty {
                    value: new_dirty_state,
                });
            } else {
                task.remove(&CachedDataItemKey::Dirty {});
            }

            if self.should_track_children()
                && (old_dirty_state.is_some() || new_dirty_state.is_some())
            {
                let mut dirty_containers = get!(task, AggregatedDirtyContainerCount)
                    .cloned()
                    .unwrap_or_default();
                if let Some(old_dirty_state) = old_dirty_state {
                    dirty_containers.update_with_dirty_state(&old_dirty_state);
                }
                let aggregated_update = match (old_dirty_state, new_dirty_state) {
                    (None, None) => unreachable!(),
                    (Some(old), None) => dirty_containers.undo_update_with_dirty_state(&old),
                    (None, Some(new)) => dirty_containers.update_with_dirty_state(&new),
                    (Some(old), Some(new)) => dirty_containers.replace_dirty_state(&old, &new),
                };
                if !aggregated_update.is_zero() {
                    if aggregated_update.get(self.session_id) < 0 {
                        if let Some(root_state) = get_mut!(task, Activeness) {
                            root_state.all_clean_event.notify(usize::MAX);
                            root_state.unset_active_until_clean();
                            if root_state.is_empty() {
                                task.remove(&CachedDataItemKey::Activeness {});
                            }
                        }
                    }
                    AggregationUpdateJob::data_update(
                        &mut task,
                        AggregatedDataUpdate::new()
                            .dirty_container_update(task_id, aggregated_update),
                    )
                } else {
                    None
                }
            } else {
                None
            }
        } else {
            None
        };

        drop(task);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, &mut ctx);
        }

        drop(removed_data);

        // Shrink memory usage of the task after execution
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        task.shrink_to_fit(CachedDataItemType::CellData);
        task.shrink_to_fit(CachedDataItemType::CellTypeMaxIndex);
        task.shrink_to_fit(CachedDataItemType::CellDependency);
        task.shrink_to_fit(CachedDataItemType::OutputDependency);
        task.shrink_to_fit(CachedDataItemType::CollectiblesDependency);
        drop(task);

        false
    }

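    /// Runs a backend background job. Currently these are the snapshot jobs: the initial
    /// snapshot is taken `FIRST_SNAPSHOT_WAIT` after startup, follow-up snapshots every
    /// `SNAPSHOT_INTERVAL`, and when the system turns idle the wait is shortened to
    /// `IDLE_TIMEOUT`.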
1830 fn run_backend_job<'a>(
1831 self: &'a Arc<Self>,
1832 id: BackendJobId,
1833 turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1834 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
1835 Box::pin(async move {
1836 if id == BACKEND_JOB_INITIAL_SNAPSHOT || id == BACKEND_JOB_FOLLOW_UP_SNAPSHOT {
1837 debug_assert!(self.should_persist());
1838
1839 let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
1840 let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
1841 loop {
1842 const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(60);
1843 const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(30);
1844 const IDLE_TIMEOUT: Duration = Duration::from_secs(2);
1845
1846 let time = if id == BACKEND_JOB_INITIAL_SNAPSHOT {
1847 FIRST_SNAPSHOT_WAIT
1848 } else {
1849 SNAPSHOT_INTERVAL
1850 };
1851
1852 let until = last_snapshot + time;
1853 if until > Instant::now() {
1854 let mut stop_listener = self.stopping_event.listen();
1855 if !self.stopping.load(Ordering::Acquire) {
1856 let mut idle_start_listener = self.idle_start_event.listen();
1857 let mut idle_end_listener = self.idle_end_event.listen();
1858 let mut idle_time = if turbo_tasks.is_idle() {
1859 Instant::now() + IDLE_TIMEOUT
1860 } else {
1861 far_future()
1862 };
1863 loop {
1864 tokio::select! {
1865 _ = &mut stop_listener => {
1866 break;
1867 },
1868 _ = &mut idle_start_listener => {
1869 idle_time = Instant::now() + IDLE_TIMEOUT;
1870 idle_start_listener = self.idle_start_event.listen()
1871 },
1872 _ = &mut idle_end_listener => {
1873 idle_time = until + IDLE_TIMEOUT;
1874 idle_end_listener = self.idle_end_event.listen()
1875 },
1876 _ = tokio::time::sleep_until(until) => {
1877 break;
1878 },
1879 _ = tokio::time::sleep_until(idle_time) => {
1880 if turbo_tasks.is_idle() {
1881 break;
1882 }
1883 },
1884 }
1885 }
1886 }
1887 }
1888
1889 let this = self.clone();
1890 let snapshot = turbo_tasks::spawn_blocking(move || this.snapshot()).await;
1891 if let Some((snapshot_start, new_data)) = snapshot {
1892 last_snapshot = snapshot_start;
1893 if new_data {
1894 continue;
1895 }
1896 let last_snapshot = last_snapshot.duration_since(self.start_time);
1897 self.last_snapshot.store(
1898 last_snapshot.as_millis().try_into().unwrap(),
1899 Ordering::Relaxed,
1900 );
1901
1902 turbo_tasks.schedule_backend_background_job(BACKEND_JOB_FOLLOW_UP_SNAPSHOT);
1903 return;
1904 }
1905 }
1906 }
1907 })
1908 }
1909
1910 fn try_read_own_task_cell_untracked(
1911 &self,
1912 task_id: TaskId,
1913 cell: CellId,
1914 _options: ReadCellOptions,
1915 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1916 ) -> Result<TypedCellContent> {
1917 let mut ctx = self.execute_context(turbo_tasks);
1918 let task = ctx.task(task_id, TaskDataCategory::Data);
1919 if let Some(content) = get!(task, CellData { cell }) {
1920 Ok(CellContent(Some(content.1.clone())).into_typed(cell.type_id))
1921 } else {
1922 Ok(CellContent(None).into_typed(cell.type_id))
1923 }
1924 }
1925
1926 fn read_task_collectibles(
1927 &self,
1928 task_id: TaskId,
1929 collectible_type: TraitTypeId,
1930 reader_id: TaskId,
1931 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1932 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
1933 if !self.should_track_children() {
1934 return AutoMap::default();
1935 }
1936
1937 let mut ctx = self.execute_context(turbo_tasks);
1938 let mut collectibles = AutoMap::default();
1939 {
1940 let mut task = ctx.task(task_id, TaskDataCategory::All);
1941 loop {
1943 let aggregation_number = get_aggregation_number(&task);
1944 if is_root_node(aggregation_number) {
1945 break;
1946 }
1947 drop(task);
1948 AggregationUpdateQueue::run(
1949 AggregationUpdateJob::UpdateAggregationNumber {
1950 task_id,
1951 base_aggregation_number: u32::MAX,
1952 distance: None,
1953 },
1954 &mut ctx,
1955 );
1956 task = ctx.task(task_id, TaskDataCategory::All);
1957 }
1958 for collectible in iter_many!(
1959 task,
1960 AggregatedCollectible {
1961 collectible
1962 } count if collectible.collectible_type == collectible_type && *count > 0 => {
1963 collectible.cell
1964 }
1965 ) {
1966 *collectibles
1967 .entry(RawVc::TaskCell(collectible.task, collectible.cell))
1968 .or_insert(0) += 1;
1969 }
1970 for (collectible, count) in iter_many!(
1971 task,
1972 Collectible {
1973 collectible
1974 } count if collectible.collectible_type == collectible_type => {
1975 (collectible.cell, *count)
1976 }
1977 ) {
1978 *collectibles
1979 .entry(RawVc::TaskCell(collectible.task, collectible.cell))
1980 .or_insert(0) += count;
1981 }
1982 let _ = task.add(CachedDataItem::CollectiblesDependent {
1983 collectible_type,
1984 task: reader_id,
1985 value: (),
1986 });
1987 }
1988 {
1989 let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
1990 let target = CollectiblesRef {
1991 task: task_id,
1992 collectible_type,
1993 };
1994 if reader.add(CachedDataItem::CollectiblesDependency { target, value: () }) {
1995 reader.remove(&CachedDataItemKey::OutdatedCollectiblesDependency { target });
1996 }
1997 }
1998 collectibles
1999 }
2000
2001 fn emit_collectible(
2002 &self,
2003 collectible_type: TraitTypeId,
2004 collectible: RawVc,
2005 task_id: TaskId,
2006 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2007 ) {
2008 self.assert_valid_collectible(task_id, collectible);
2009 if !self.should_track_children() {
2010 return;
2011 }
2012
2013 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2014 panic!("Collectibles need to be resolved");
2015 };
2016 let cell = CellRef {
2017 task: collectible_task,
2018 cell,
2019 };
2020 operation::UpdateCollectibleOperation::run(
2021 task_id,
2022 CollectibleRef {
2023 collectible_type,
2024 cell,
2025 },
2026 1,
2027 self.execute_context(turbo_tasks),
2028 );
2029 }
2030
    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_valid_collectible(task_id, collectible);
        if !self.should_track_children() {
            return;
        }

        let RawVc::TaskCell(collectible_task, cell) = collectible else {
            panic!("Collectibles need to be resolved");
        };
        let cell = CellRef {
            task: collectible_task,
            cell,
        };
        operation::UpdateCollectibleOperation::run(
            task_id,
            CollectibleRef {
                collectible_type,
                cell,
            },
            -(i32::try_from(count).unwrap()),
            self.execute_context(turbo_tasks),
        );
    }

    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        content: CellContent,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateCellOperation::run(
            task_id,
            cell,
            content,
            self.execute_context(turbo_tasks),
        );
    }

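    /// Marks the currently executing task as session dependent, so its output is recomputed in
    /// a new session. The task's aggregation number is first raised to a high value
    /// (`u32::MAX >> 2`), presumably to keep these per-session-invalidated tasks near the root
    /// of the aggregation tree and make the recurring invalidation cheap.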
    fn mark_own_task_as_session_dependent(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            return;
        }
        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
        let aggregation_number = get_aggregation_number(&task);
        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
            drop(task);
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                    distance: None,
                },
                &mut ctx,
            );
            task = ctx.task(task_id, TaskDataCategory::Meta);
        }
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            session_dependent,
            ..
        })) = get_mut!(task, InProgress)
        {
            *session_dependent = true;
        }
    }

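    /// Marks the currently executing task as completed before its execution has actually
    /// finished. This only sets the `marked_as_completed` flag on the in-progress state; the
    /// regular completion path still runs when execution ends.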
    fn mark_own_task_as_finished(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task, TaskDataCategory::Data);
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            marked_as_completed,
            ..
        })) = get_mut!(task, InProgress)
        {
            *marked_as_completed = true;
        }
    }

    fn set_own_task_aggregation_number(
        &self,
        task: TaskId,
        aggregation_number: u32,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        AggregationUpdateQueue::run(
            AggregationUpdateJob::UpdateAggregationNumber {
                task_id: task,
                base_aggregation_number: aggregation_number,
                distance: None,
            },
            &mut ctx,
        );
    }

    fn connect_task(
        &self,
        task: TaskId,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_not_persistent_calling_transient(parent_task, task, None);
        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
    }

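    /// Creates a transient root or once task. The task receives the maximum aggregation number
    /// so it acts as an aggregation root, is marked as an active root when activeness tracking
    /// is enabled, and is scheduled immediately.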
    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
        let task_id = self.transient_task_id_factory.get();
        let root_type = match task_type {
            TransientTaskType::Root(_) => RootType::RootTask,
            TransientTaskType::Once(_) => RootType::OnceTask,
        };
        self.transient_tasks.insert(
            task_id,
            Arc::new(match task_type {
                TransientTaskType::Root(f) => TransientTask::Root(f),
                TransientTaskType::Once(f) => TransientTask::Once(Mutex::new(Some(f))),
            }),
        );
        {
            let mut task = self.storage.access_mut(task_id);
            task.add(CachedDataItem::AggregationNumber {
                value: AggregationNumber {
                    base: u32::MAX,
                    distance: 0,
                    effective: u32::MAX,
                },
            });
            if self.should_track_activeness() {
                task.add(CachedDataItem::Activeness {
                    value: ActivenessState::new_root(root_type, task_id),
                });
            }
            task.add(CachedDataItem::new_scheduled(move || match root_type {
                RootType::RootTask => "Root Task".to_string(),
                RootType::OnceTask => "Once Task".to_string(),
            }));
        }
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().insert(task_id);
        task_id
    }

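    /// Disposes a root task created by `create_transient_task`. If its subgraph is still
    /// dirty, the task merely loses its root type and stays active until clean; otherwise the
    /// activeness state is removed right away and waiters on `all_clean_event` are notified.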
    fn dispose_root_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().remove(&task_id);

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_dirty = get!(task, Dirty).is_some_and(|dirty| dirty.get(self.session_id));
        let has_dirty_containers = get!(task, AggregatedDirtyContainerCount)
            .is_some_and(|dirty_containers| dirty_containers.get(self.session_id) > 0);
        if is_dirty || has_dirty_containers {
            if let Some(root_state) = get_mut!(task, Activeness) {
                root_state.unset_root_type();
                root_state.set_active_until_clean();
            }
        } else if let Some(root_state) = remove!(task, Activeness) {
            root_state.all_clean_event.notify(usize::MAX);
        }
    }

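    /// Debug-only consistency check behind the `verify_aggregation_graph` feature: walks the
    /// task graph from every root and prints diagnostics for tasks that don't report to an
    /// aggregation root, dirty tasks missing from their uppers, and collectibles that are
    /// aggregated without being emitted by any child. Setting `TURBO_ENGINE_VERIFY_GRAPH=0`
    /// skips the check.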
    #[cfg(feature = "verify_aggregation_graph")]
    fn verify_aggregation_graph(
        &self,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
        idle: bool,
    ) {
        use std::{collections::VecDeque, env, io::stdout};

        use crate::backend::operation::{get_uppers, is_aggregating_node};

        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
            return;
        }

        let mut ctx = self.execute_context(turbo_tasks);
        let root_tasks = self.root_tasks.lock().clone();
        let len = root_tasks.len();

        for (i, task_id) in root_tasks.into_iter().enumerate() {
            println!("Verifying graph from root {task_id} {i}/{len}...");
            let mut queue = VecDeque::new();
            let mut visited = FxHashSet::default();
            let mut aggregated_nodes = FxHashSet::default();
            let mut collectibles = FxHashMap::default();
            let root_task_id = task_id;
            visited.insert(task_id);
            aggregated_nodes.insert(task_id);
            queue.push_back(task_id);
            let mut counter = 0;
            while let Some(task_id) = queue.pop_front() {
                counter += 1;
                if counter % 100000 == 0 {
                    println!(
                        "queue={}, visited={}, aggregated_nodes={}",
                        queue.len(),
                        visited.len(),
                        aggregated_nodes.len()
                    );
                }
                let task = ctx.task(task_id, TaskDataCategory::All);
                if idle && !self.is_idle.load(Ordering::Relaxed) {
                    return;
                }

                let uppers = get_uppers(&task);
                if task_id != root_task_id
                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
                {
                    println!(
                        "Task {} {} doesn't report to any root but is reachable from one \
                         (uppers: {:?})",
                        task_id,
                        ctx.get_task_description(task_id),
                        uppers
                    );
                }

                let aggregated_collectibles: Vec<_> = get_many!(
                    task,
                    AggregatedCollectible { collectible } value if *value > 0 => { collectible }
                );
                for collectible in aggregated_collectibles {
                    collectibles
                        .entry(collectible)
                        .or_insert_with(|| (false, Vec::new()))
                        .1
                        .push(task_id);
                }

                let own_collectibles: Vec<_> = get_many!(
                    task,
                    Collectible { collectible } value if *value > 0 => { collectible }
                );
                for collectible in own_collectibles {
                    if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                        *flag = true;
                    } else {
                        println!(
                            "Task {} has a collectible {:?} that is not in any upper task",
                            task_id, collectible
                        );
                    }
                }

                let is_dirty = get!(task, Dirty).is_some_and(|dirty| dirty.get(self.session_id));
                let has_dirty_container = get!(task, AggregatedDirtyContainerCount)
                    .is_some_and(|count| count.get(self.session_id) > 0);
                let should_be_in_upper = is_dirty || has_dirty_container;

                let aggregation_number = get_aggregation_number(&task);
                if is_aggregating_node(aggregation_number) {
                    aggregated_nodes.insert(task_id);
                }
                for child_id in iter_many!(task, Child { task } => task) {
                    if visited.insert(child_id) {
                        queue.push_back(child_id);
                    }
                }
                drop(task);

                if should_be_in_upper {
                    for upper_id in uppers {
                        let task = ctx.task(upper_id, TaskDataCategory::All);
                        let in_upper = get!(task, AggregatedDirtyContainer { task: task_id })
                            .is_some_and(|dirty| dirty.get(self.session_id) > 0);
                        if !in_upper {
                            println!(
                                "Task {} is dirty, but is not listed in the upper task {}",
                                task_id, upper_id
                            );
                        }
                    }
                }
            }

            for (collectible, (flag, task_ids)) in collectibles {
                if !flag {
                    use std::io::Write;
                    let mut stdout = stdout().lock();
                    let _ = writeln!(
                        stdout,
                        "Collectible {:?} is not emitted in any child task, but is aggregated \
                         in these tasks: {:#?}",
                        collectible,
                        task_ids
                            .iter()
                            .map(|t| format!("{t} {}", ctx.get_task_description(*t)))
                            .collect::<Vec<_>>()
                    );

                    let task_id = collectible.cell.task;
                    let mut queue = {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        get_uppers(&task)
                    };
                    let mut visited = FxHashSet::default();
                    for &upper_id in queue.iter() {
                        visited.insert(upper_id);
                        let _ = writeln!(stdout, "{task_id:?} -> {upper_id:?}");
                    }
                    while let Some(task_id) = queue.pop() {
                        let desc = ctx.get_task_description(task_id);
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        let aggregated_collectible =
                            get!(task, AggregatedCollectible { collectible })
                                .copied()
                                .unwrap_or_default();
                        let uppers = get_uppers(&task);
                        drop(task);
                        let _ = writeln!(
                            stdout,
                            "upper {task_id} {desc} collectible={aggregated_collectible}"
                        );
                        if task_ids.contains(&task_id) {
                            let _ = writeln!(
                                stdout,
                                "Task has an upper connection to an aggregated task that \
                                 doesn't reference it. Upper connection is invalid!"
                            );
                        }
                        for upper_id in uppers {
                            let _ = writeln!(stdout, "{task_id:?} -> {upper_id:?}");
                            if !visited.contains(&upper_id) {
                                queue.push(upper_id);
                            }
                        }
                    }
                }
            }
            println!("visited {} tasks from root {task_id}", visited.len());
        }
    }

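    /// Panics if a persistent parent task calls, reads, or connects to a transient child task.
    /// Transient tasks are not persisted, so such an edge could not be restored from the
    /// backing storage.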
    fn assert_not_persistent_calling_transient(
        &self,
        parent_id: TaskId,
        child_id: TaskId,
        cell_id: Option<CellId>,
    ) {
        if !parent_id.is_transient() && child_id.is_transient() {
            self.panic_persistent_calling_transient(
                self.lookup_task_type(parent_id).as_deref(),
                self.lookup_task_type(child_id).as_deref(),
                cell_id,
            );
        }
    }

    fn panic_persistent_calling_transient(
        &self,
        parent: Option<&CachedTaskType>,
        child: Option<&CachedTaskType>,
        cell_id: Option<CellId>,
    ) {
        let transient_reason = if let Some(child) = child {
            Cow::Owned(format!(
                " The callee is transient because it depends on:\n{}",
                self.debug_trace_transient_task(child, cell_id),
            ))
        } else {
            Cow::Borrowed("")
        };
        panic!(
            "Persistent task {} is not allowed to call, read, or connect to the transient \
             task {}.{}",
            parent.map_or("unknown", |t| t.get_name()),
            child.map_or("unknown", |t| t.get_name()),
            transient_reason,
        );
    }

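    /// Validates that an emitted collectible is a resolved `RawVc::TaskCell` and that a
    /// persistent task is not emitting a transient collectible; panics otherwise, including a
    /// debug trace of the transient dependency chain when one can be computed.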
    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
            let task_info = if let Some(col_task_ty) = collectible
                .try_get_task_id()
                .and_then(|t| self.lookup_task_type(t))
            {
                Cow::Owned(format!(" (return type of {col_task_ty})"))
            } else {
                Cow::Borrowed("")
            };
            panic!("Collectible{task_info} must be a ResolvedVc")
        };
        if col_task_id.is_transient() && !task_id.is_transient() {
            let transient_reason = if let Some(col_task_ty) = self.lookup_task_type(col_task_id) {
                Cow::Owned(format!(
                    ". The collectible is transient because it depends on:\n{}",
                    self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
                ))
            } else {
                Cow::Borrowed("")
            };
            panic!(
                "Collectible is transient; transient collectibles cannot be emitted from \
                 persistent tasks{transient_reason}",
            )
        }
    }
}

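// The public `Backend` implementation is a thin delegation layer: every method forwards to the
// shared `TurboTasksBackendInner` behind the `Arc`.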
impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }

    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }

    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }

    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }

    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }

    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
    }

    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
    }

    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }

    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }

    fn get_task_description(&self, task: TaskId) -> String {
        self.0.get_task_description(task)
    }

    fn try_get_function_id(&self, task_id: TaskId) -> Option<FunctionId> {
        self.0.try_get_function_id(task_id)
    }

    type TaskState = ();
    fn new_task_state(&self, _task: TaskId) -> Self::TaskState {}

    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }

    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0.try_start_task_execution(task_id, turbo_tasks)
    }

    fn task_execution_result(
        &self,
        task_id: TaskId,
        result: Result<RawVc, Arc<TurboTasksExecutionError>>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.task_execution_result(task_id, result, turbo_tasks);
    }

    fn task_execution_completed(
        &self,
        task_id: TaskId,
        _duration: Duration,
        _memory_usage: usize,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        stateful: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            _duration,
            _memory_usage,
            cell_counters,
            stateful,
            turbo_tasks,
        )
    }

    fn run_backend_job<'a>(
        &'a self,
        id: BackendJobId,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(id, turbo_tasks)
    }

    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: TaskId,
        consistency: ReadConsistency,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, Some(reader), consistency, turbo_tasks)
    }

    fn try_read_task_output_untracked(
        &self,
        task_id: TaskId,
        consistency: ReadConsistency,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, None, consistency, turbo_tasks)
    }

    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: TaskId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, Some(reader), cell, options, turbo_tasks)
    }

    fn try_read_task_cell_untracked(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, None, cell, options, turbo_tasks)
    }

    fn try_read_own_task_cell_untracked(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0
            .try_read_own_task_cell_untracked(task_id, cell, options, turbo_tasks)
    }

    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }

    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }

    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }

    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        content: CellContent,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(task_id, cell, content, turbo_tasks);
    }

    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }

    fn set_own_task_aggregation_number(
        &self,
        task: TaskId,
        aggregation_number: u32,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .set_own_task_aggregation_number(task, aggregation_number, turbo_tasks);
    }

    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }

    fn connect_task(
        &self,
        task: TaskId,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }

    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }

    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }

    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }
}

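/// A tree of reasons why a task is transient, rendered into the panic messages above. A node is
/// either a cached task (with the transient causes of its `self` argument and other arguments),
/// a collapsed reference to a task that was already printed, or an unknown/uncached task.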
enum DebugTraceTransientTask {
    Cached {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
        cause_self: Option<Box<DebugTraceTransientTask>>,
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// A task that has already been printed once; its causes are collapsed to avoid repetition.
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}

impl DebugTraceTransientTask {
    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
        let indent = " ".repeat(level);
        f.write_str(&indent)?;

        fn fmt_cell_type_id(
            f: &mut fmt::Formatter<'_>,
            cell_type_id: Option<ValueTypeId>,
        ) -> fmt::Result {
            if let Some(ty) = cell_type_id {
                write!(f, " (read cell of type {})", get_value_type_global_name(ty))
            } else {
                Ok(())
            }
        }

        match self {
            Self::Cached {
                task_name,
                cell_type_id,
                ..
            }
            | Self::Collapsed {
                task_name,
                cell_type_id,
                ..
            } => {
                f.write_str(task_name)?;
                fmt_cell_type_id(f, *cell_type_id)?;
                if matches!(self, Self::Collapsed { .. }) {
                    f.write_str(" (collapsed)")?;
                }
            }
            Self::Uncached { cell_type_id } => {
                f.write_str("unknown transient task")?;
                fmt_cell_type_id(f, *cell_type_id)?;
            }
        }
        f.write_char('\n')?;

        if let Self::Cached {
            cause_self,
            cause_args,
            ..
        } = self
        {
            if let Some(c) = cause_self {
                writeln!(f, "{indent} self:")?;
                c.fmt_indented(f, level + 1)?;
            }
            if !cause_args.is_empty() {
                writeln!(f, "{indent} args:")?;
                for c in cause_args {
                    c.fmt_indented(f, level + 1)?;
                }
            }
        }
        Ok(())
    }
}

impl fmt::Display for DebugTraceTransientTask {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.fmt_indented(f, 0)
    }
}

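// A far-future `Instant` used as a practically-infinite deadline; adding very large durations
// to `Instant::now()` can overflow on some platforms, so a ~30-year horizon is used instead.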
fn far_future() -> Instant {
    // Roughly 30 years from now. The API provides no way to obtain a maximum `Instant`,
    // and very large durations can overflow the platform's time representation.
    Instant::now() + Duration::from_secs(86400 * 365 * 30)
}