mod dynamic_storage;
mod operation;
mod storage;

use std::{
    borrow::Cow,
    fmt::{self, Write},
    future::Future,
    hash::BuildHasherDefault,
    mem::take,
    pin::Pin,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
    },
    thread::available_parallelism,
};

use anyhow::{Result, bail};
use auto_hash_map::{AutoMap, AutoSet};
use indexmap::IndexSet;
use parking_lot::{Condvar, Mutex};
use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
use smallvec::{SmallVec, smallvec};
use tokio::time::{Duration, Instant};
use turbo_tasks::{
    CellId, FxDashMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency, SessionId,
    TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId, TraitTypeId, TurboTasksBackendApi,
    ValueTypeId,
    backend::{
        Backend, BackendJobId, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
        TransientTaskType, TurboTasksExecutionError, TypedCellContent,
    },
    event::{Event, EventListener},
    message_queue::TimingEvent,
    registry::{self, get_value_type_global_name},
    task_statistics::TaskStatisticsApi,
    trace::TraceRawVcs,
    turbo_tasks,
    util::IdFactoryWithReuse,
};

pub use self::{operation::AnyOperation, storage::TaskDataCategory};
#[cfg(feature = "trace_task_dirty")]
use crate::backend::operation::TaskDirtyCause;
use crate::{
    backend::{
        operation::{
            AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
            Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number,
            is_root_node, prepare_new_children,
        },
        storage::{
            InnerStorageSnapshot, Storage, get, get_many, get_mut, get_mut_or_insert_with,
            iter_many, remove,
        },
    },
    backing_storage::BackingStorage,
    data::{
        ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
        CachedDataItemValueRef, CellRef, CollectibleRef, CollectiblesRef, DirtyState,
        InProgressCellState, InProgressState, InProgressStateInner, OutputValue, RootType,
    },
    utils::{
        bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc, sharded::Sharded, swap_retain,
    },
};

const BACKEND_JOB_INITIAL_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(1) };
const BACKEND_JOB_FOLLOW_UP_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(2) };

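/// Flag stored in the highest bit of `in_progress_operations`. While it is set, a
/// snapshot has been requested and operations suspend at their next suspend point.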
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);

struct SnapshotRequest {
    snapshot_requested: bool,
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}

impl SnapshotRequest {
    fn new() -> Self {
        Self {
            snapshot_requested: false,
            suspended_operations: FxHashSet::default(),
        }
    }
}

type TransientTaskOnce =
    Mutex<Option<Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>>>;

pub enum TransientTask {
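    /// A root task that is kept active and re-executed when its dependencies
    /// change. Scheduled automatically.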
    Root(TransientTaskRoot),

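    /// A one-off root task execution. The stored future is taken on first
    /// execution (see `try_start_task_execution`), so it runs at most once.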
    Once(TransientTaskOnce),
}

pub enum StorageMode {
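    /// Restores cached data from the backing storage, but never writes changes
    /// back.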
    ReadOnly,
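    /// Restores cached data from the backing storage and persists changes back
    /// via periodic snapshots.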
    ReadWrite,
}

pub struct BackendOptions {
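    /// Enables dependency tracking. When disabled, invalidation is not
    /// allowed, so cached tasks are never invalidated and re-executed.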
    pub dependency_tracking: bool,

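    /// Enables children tracking. When disabled, collectibles are unavailable
    /// and strongly consistent reads behave like eventually consistent ones.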
    pub children_tracking: bool,

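    /// Enables activeness tracking. Automatically disabled when
    /// `dependency_tracking` is disabled.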
    pub active_tracking: bool,

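    /// Configures the backing storage: `None` disables it entirely, `ReadOnly`
    /// only restores, and `ReadWrite` also persists snapshots.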
    pub storage_mode: Option<StorageMode>,

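    /// Preallocates a smaller amount of memory, which suits short-lived
    /// one-off runs better than long-running operation.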
    pub small_preallocation: bool,
}

impl Default for BackendOptions {
    fn default() -> Self {
        Self {
            dependency_tracking: true,
            children_tracking: true,
            active_tracking: true,
            storage_mode: Some(StorageMode::ReadWrite),
            small_preallocation: false,
        }
    }
}

pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);

type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;

struct TurboTasksBackendInner<B: BackingStorage> {
    options: BackendOptions,

    start_time: Instant,
    session_id: SessionId,

    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    persisted_task_cache_log: Option<TaskCacheLog>,
    task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
    transient_tasks: FxDashMap<TaskId, Arc<TransientTask>>,

    storage: Storage,

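    /// Number of currently running operations. The highest bit
    /// (`SNAPSHOT_REQUESTED_BIT`) is set while a snapshot is requested; when it
    /// is set and the counter drops to zero, `operations_suspended` is notified.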
    in_progress_operations: AtomicUsize,

    snapshot_request: Mutex<SnapshotRequest>,
    operations_suspended: Condvar,
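    /// Signaled when a snapshot has completed and suspended operations may
    /// resume.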
    snapshot_completed: Condvar,
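    /// Timestamp of the last started snapshot, in milliseconds since
    /// `start_time`.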
    last_snapshot: AtomicU64,

    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    task_statistics: TaskStatisticsApi,

    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}

impl<B: BackingStorage> TurboTasksBackend<B> {
    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
        Self(Arc::new(TurboTasksBackendInner::new(
            options,
            backing_storage,
        )))
    }

    pub fn backing_storage(&self) -> &B {
        &self.0.backing_storage
    }
}

impl<B: BackingStorage> TurboTasksBackendInner<B> {
    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
        let shard_amount =
            (available_parallelism().map_or(4, |v| v.get()) * 64).next_power_of_two();
        let need_log = matches!(options.storage_mode, Some(StorageMode::ReadWrite));
        if !options.dependency_tracking {
            options.active_tracking = false;
        }
        let small_preallocation = options.small_preallocation;
        Self {
            options,
            start_time: Instant::now(),
            session_id: backing_storage
                .next_session_id()
                .expect("Failed to get session id"),
            persisted_task_id_factory: IdFactoryWithReuse::new(
                backing_storage
                    .next_free_task_id()
                    .expect("Failed to get task id"),
                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
            ),
            transient_task_id_factory: IdFactoryWithReuse::new(
                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                TaskId::MAX,
            ),
            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
            task_cache: BiMap::new(),
            transient_tasks: FxDashMap::default(),
            storage: Storage::new(small_preallocation),
            in_progress_operations: AtomicUsize::new(0),
            snapshot_request: Mutex::new(SnapshotRequest::new()),
            operations_suspended: Condvar::new(),
            snapshot_completed: Condvar::new(),
            last_snapshot: AtomicU64::new(0),
            stopping: AtomicBool::new(false),
            stopping_event: Event::new(|| "TurboTasksBackend::stopping_event".to_string()),
            idle_start_event: Event::new(|| "TurboTasksBackend::idle_start_event".to_string()),
            idle_end_event: Event::new(|| "TurboTasksBackend::idle_end_event".to_string()),
            #[cfg(feature = "verify_aggregation_graph")]
            is_idle: AtomicBool::new(false),
            task_statistics: TaskStatisticsApi::default(),
            backing_storage,
            #[cfg(feature = "verify_aggregation_graph")]
            root_tasks: Default::default(),
        }
    }

    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }

    fn session_id(&self) -> SessionId {
        self.session_id
    }

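    /// # Safety
    ///
    /// `tx` must be a valid read transaction obtained from this instance's
    /// backing storage.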
    unsafe fn execute_context_with_tx<'e, 'tx>(
        &'e self,
        tx: Option<&'e B::ReadTransaction<'tx>>,
        turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
    where
        'tx: 'e,
    {
        unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
    }

    fn suspending_requested(&self) -> bool {
        self.should_persist()
            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
    }

    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            if snapshot_request.snapshot_requested {
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                if value == SNAPSHOT_REQUESTED_BIT {
                    this.operations_suspended.notify_all();
                }
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }

    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        if !self.should_persist() {
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            let mut snapshot_request = self.snapshot_request.lock();
            if snapshot_request.snapshot_requested {
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }

    fn should_persist(&self) -> bool {
        matches!(self.options.storage_mode, Some(StorageMode::ReadWrite))
    }

    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }

    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }

    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }

    fn should_track_children(&self) -> bool {
        self.options.children_tracking
    }

    fn track_cache_hit(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
    }

    fn track_cache_miss(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
    }
}

pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    backend: Option<&'a TurboTasksBackendInner<B>>,
}

impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
    fn drop(&mut self) {
        if let Some(backend) = self.backend {
            let fetch_sub = backend
                .in_progress_operations
                .fetch_sub(1, Ordering::AcqRel);
            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
                backend.operations_suspended.notify_all();
            }
        }
    }
}

impl<B: BackingStorage> TurboTasksBackendInner<B> {
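    /// # Safety
    ///
    /// `tx` must be a valid read transaction obtained from this instance's
    /// backing storage.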
    unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
        &'l self,
        tx: Option<&'l B::ReadTransaction<'tx>>,
        parent_task: TaskId,
        child_task: TaskId,
        is_immutable: bool,
        turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::ConnectChildOperation::run(parent_task, child_task, is_immutable, unsafe {
            self.execute_context_with_tx(tx, turbo_tasks)
        });
    }

    fn connect_child(
        &self,
        parent_task: TaskId,
        child_task: TaskId,
        is_immutable: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::ConnectChildOperation::run(
            parent_task,
            child_task,
            is_immutable,
            self.execute_context(turbo_tasks),
        );
    }

    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        consistency: ReadConsistency,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<RawVc, EventListener>> {
        if let Some(reader) = reader {
            self.assert_not_persistent_calling_transient(reader, task_id, None);
        }

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);

        fn listen_to_done_event<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            reader: Option<TaskId>,
            done_event: &Event,
        ) -> EventListener {
            let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
            done_event.listen_with_note(move || {
                if let Some(reader_desc) = reader_desc.as_ref() {
                    format!("try_read_task_output from {}", reader_desc())
                } else {
                    "try_read_task_output (untracked)".to_string()
                }
            })
        }

        fn check_in_progress<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            task: &impl TaskGuard,
            reader: Option<TaskId>,
            ctx: &impl ExecuteContext<'_>,
        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
        {
            match get!(task, InProgress) {
                Some(InProgressState::Scheduled { done_event, .. }) => {
                    Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
                }
                Some(InProgressState::InProgress(box InProgressStateInner {
                    done,
                    done_event,
                    ..
                })) => {
                    if !*done {
                        Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
                    } else {
                        None
                    }
                }
                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
                    "{} was canceled",
                    ctx.get_task_description(task.id())
                ))),
                None => None,
            }
        }

        if self.should_track_children() && matches!(consistency, ReadConsistency::Strong) {
            loop {
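                // Escalate the task to a root node so the aggregated dirty
                // state below covers everything this read depends on.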
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                {
                    let _span = tracing::trace_span!(
                        "make root node for strongly consistent read",
                        %task_id
                    )
                    .entered();
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: u32::MAX,
                            distance: None,
                        },
                        &mut ctx,
                    );
                }
                task = ctx.task(task_id, TaskDataCategory::All);
            }

            let is_dirty =
                get!(task, Dirty).map_or(false, |dirty_state| dirty_state.get(self.session_id));

            let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
                .cloned()
                .unwrap_or_default()
                .get(self.session_id);
            if dirty_tasks > 0 || is_dirty {
                let root = get_mut!(task, Activeness);
                let mut task_ids_to_schedule: Vec<_> = Vec::new();
                let root = if let Some(root) = root {
                    root.set_active_until_clean();
                    root
                } else {
                    get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id))
                        .set_active_until_clean();
                    if ctx.should_track_activeness() {
                        task_ids_to_schedule = get_many!(
                            task,
                            AggregatedDirtyContainer {
                                task
                            } count if count.get(self.session_id) > 0 => {
                                task
                            }
                        );
                        task_ids_to_schedule.push(task_id);
                    }
                    get!(task, Activeness).unwrap()
                };
                let listener = root.all_clean_event.listen_with_note(move || {
                    format!("try_read_task_output (strongly consistent) from {reader:?}")
                });
                drop(task);
                if !task_ids_to_schedule.is_empty() {
                    let mut queue = AggregationUpdateQueue::new();
                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
                    queue.execute(&mut ctx);
                }

                return Ok(Err(listener));
            }
        }

        if let Some(value) = check_in_progress(self, &task, reader, &ctx) {
            return value;
        }

        if let Some(output) = get!(task, Output) {
            let result = match output {
                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
                OutputValue::Error(error) => {
                    let err: anyhow::Error = error.clone().into();
                    Err(err.context(format!(
                        "Execution of {} failed",
                        ctx.get_task_description(task_id)
                    )))
                }
            };
            if self.should_track_dependencies()
                && let Some(reader) = reader
            {
                let _ = task.add(CachedDataItem::OutputDependent {
                    task: reader,
                    value: (),
                });
                drop(task);

                let mut reader_task = ctx.task(reader, TaskDataCategory::Data);
                if reader_task
                    .remove(&CachedDataItemKey::OutdatedOutputDependency { target: task_id })
                    .is_none()
                {
                    let _ = reader_task.add(CachedDataItem::OutputDependency {
                        target: task_id,
                        value: (),
                    });
                }
            }

            return result;
        }

        let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
        let note = move || {
            if let Some(reader_desc) = reader_desc.as_ref() {
                format!("try_read_task_output (recompute) from {}", reader_desc())
            } else {
                "try_read_task_output (recompute, untracked)".to_string()
            }
        };

        let (item, listener) = CachedDataItem::new_scheduled_with_listener(
            TaskExecutionReason::OutputNotAvailable,
            self.get_task_desc_fn(task_id),
            note,
        );
        task.add_new(item);
        turbo_tasks.schedule(task_id);

        Ok(Err(listener))
    }

    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        if let Some(reader) = reader {
            self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
        }

        fn add_cell_dependency<B: BackingStorage>(
            backend: &TurboTasksBackendInner<B>,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            cell: CellId,
            task_id: TaskId,
            ctx: &mut impl ExecuteContext<'_>,
        ) {
            if !backend.should_track_dependencies() {
                return;
            }
            if let Some(reader) = reader {
                if reader == task_id {
                    return;
                }
                let _ = task.add(CachedDataItem::CellDependent {
                    cell,
                    task: reader,
                    value: (),
                });
                drop(task);

                let mut reader_task = ctx.task(reader, TaskDataCategory::Data);
                let target = CellRef {
                    task: task_id,
                    cell,
                };
                if reader_task
                    .remove(&CachedDataItemKey::OutdatedCellDependency { target })
                    .is_none()
                {
                    let _ = reader_task.add(CachedDataItem::CellDependency { target, value: () });
                }
            }
        }

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        let content = if options.final_read_hint {
            remove!(task, CellData { cell })
        } else if let Some(content) = get!(task, CellData { cell }) {
            let content = content.clone();
            Some(content)
        } else {
            None
        };
        if let Some(content) = content {
            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.reference)),
            )));
        }

        let in_progress = get!(task, InProgress);
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
        let is_scheduled = matches!(in_progress, Some(InProgressState::Scheduled { .. }));

        let max_id = get!(
            task,
            CellTypeMaxIndex {
                cell_type: cell.type_id
            }
        )
        .copied();
        let Some(max_id) = max_id else {
            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
            bail!(
                "Cell {cell:?} no longer exists in task {} (no cell of this type exists)",
                ctx.get_task_description(task_id)
            );
        };
        if cell.index >= max_id {
            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
            bail!(
                "Cell {cell:?} no longer exists in task {} (index out of bounds)",
                ctx.get_task_description(task_id)
            );
        }

        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, reader, cell);
        if !new_listener {
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = registry::get_value_type_global_name(cell.type_id),
            cell_index = cell.index
        )
        .entered();

        if is_cancelled {
            bail!("{} was canceled", ctx.get_task_description(task_id));
        } else if !is_scheduled
            && task.add(CachedDataItem::new_scheduled(
                TaskExecutionReason::CellNotAvailable,
                self.get_task_desc_fn(task_id),
            ))
        {
            turbo_tasks.schedule(task_id);
        }

        Ok(Err(listener))
    }

    fn listen_to_cell(
        &self,
        task: &mut impl TaskGuard,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
    ) -> (EventListener, bool) {
        let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
        let note = move || {
            if let Some(reader_desc) = reader_desc.as_ref() {
                format!("try_read_task_cell (in progress) from {}", reader_desc())
            } else {
                "try_read_task_cell (in progress, untracked)".to_string()
            }
        };
        if let Some(in_progress) = get!(task, InProgressCell { cell }) {
            let listener = in_progress.event.listen_with_note(note);
            return (listener, false);
        }
        let in_progress = InProgressCellState::new(task_id, cell);
        let listener = in_progress.event.listen_with_note(note);
        task.add_new(CachedDataItem::InProgressCell {
            cell,
            value: in_progress,
        });
        (listener, true)
    }

    fn lookup_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
        if let Some(task_type) = self.task_cache.lookup_reverse(&task_id) {
            return Some(task_type);
        }
        if self.should_restore()
            && !task_id.is_transient()
            && let Some(task_type) = unsafe {
                self.backing_storage
                    .reverse_lookup_task_cache(None, task_id)
                    .expect("Failed to lookup task type")
            }
        {
            let _ = self.task_cache.try_insert(task_type.clone(), task_id);
            return Some(task_type);
        }
        None
    }

    fn get_task_desc_fn(&self, task_id: TaskId) -> impl Fn() -> String + Send + Sync + 'static {
        let task_type = self.lookup_task_type(task_id);
        move || {
            task_type.as_ref().map_or_else(
                || format!("{task_id:?} transient"),
                |task_type| format!("{task_id:?} {task_type}"),
            )
        }
    }

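    /// Suspends all running operations, captures the modified task data, and
    /// writes it to the backing storage. Returns the snapshot timestamp and
    /// whether new items were persisted, or `None` if persisting failed.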
    fn snapshot(&self) -> Option<(Instant, bool)> {
        let start = Instant::now();
        debug_assert!(self.should_persist());
        let mut snapshot_request = self.snapshot_request.lock();
        snapshot_request.snapshot_requested = true;
        let active_operations = self
            .in_progress_operations
            .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        if active_operations != 0 {
            self.operations_suspended
                .wait_while(&mut snapshot_request, |_| {
                    self.in_progress_operations.load(Ordering::Relaxed) != SNAPSHOT_REQUESTED_BIT
                });
        }
        let suspended_operations = snapshot_request
            .suspended_operations
            .iter()
            .map(|op| op.arc().clone())
            .collect::<Vec<_>>();
        drop(snapshot_request);
        let mut persisted_task_cache_log = self
            .persisted_task_cache_log
            .as_ref()
            .map(|l| l.take(|i| i))
            .unwrap_or_default();
        self.storage.start_snapshot();
        let mut snapshot_request = self.snapshot_request.lock();
        snapshot_request.snapshot_requested = false;
        self.in_progress_operations
            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        self.snapshot_completed.notify_all();
        let snapshot_time = Instant::now();
        drop(snapshot_request);

        let preprocess = |task_id: TaskId, inner: &storage::InnerStorage| {
            if task_id.is_transient() {
                return (None, None);
            }
            let len = inner.len();

            let meta_restored = inner.state().meta_restored();
            let data_restored = inner.state().data_restored();

            let mut meta = meta_restored.then(|| Vec::with_capacity(len));
            let mut data = data_restored.then(|| Vec::with_capacity(len));
            for (key, value) in inner.iter_all() {
                if key.is_persistent() && value.is_persistent() {
                    match key.category() {
                        TaskDataCategory::Meta => {
                            if let Some(meta) = &mut meta {
                                meta.push(CachedDataItem::from_key_and_value_ref(key, value))
                            }
                        }
                        TaskDataCategory::Data => {
                            if let Some(data) = &mut data {
                                data.push(CachedDataItem::from_key_and_value_ref(key, value))
                            }
                        }
                        _ => {}
                    }
                }
            }

            (meta, data)
        };
        let process = |task_id: TaskId, (meta, data): (Option<Vec<_>>, Option<Vec<_>>)| {
            (
                task_id,
                meta.map(|d| B::serialize(task_id, &d)),
                data.map(|d| B::serialize(task_id, &d)),
            )
        };
        let process_snapshot = |task_id: TaskId, inner: Box<InnerStorageSnapshot>| {
            if task_id.is_transient() {
                return (task_id, None, None);
            }
            let len = inner.len();
            let mut meta = inner.meta_modified.then(|| Vec::with_capacity(len));
            let mut data = inner.data_modified.then(|| Vec::with_capacity(len));
            for (key, value) in inner.iter_all() {
                if key.is_persistent() && value.is_persistent() {
                    match key.category() {
                        TaskDataCategory::Meta => {
                            if let Some(meta) = &mut meta {
                                meta.push(CachedDataItem::from_key_and_value_ref(key, value));
                            }
                        }
                        TaskDataCategory::Data => {
                            if let Some(data) = &mut data {
                                data.push(CachedDataItem::from_key_and_value_ref(key, value));
                            }
                        }
                        _ => {}
                    }
                }
            }
            (
                task_id,
                meta.map(|meta| B::serialize(task_id, &meta)),
                data.map(|data| B::serialize(task_id, &data)),
            )
        };

        let snapshot = {
            let _span = tracing::trace_span!("take snapshot");
            self.storage
                .take_snapshot(&preprocess, &process, &process_snapshot)
        };

        #[cfg(feature = "print_cache_item_size")]
        #[derive(Default)]
        struct TaskCacheStats {
            data: usize,
            data_count: usize,
            meta: usize,
            meta_count: usize,
        }
        #[cfg(feature = "print_cache_item_size")]
        impl TaskCacheStats {
            fn add_data(&mut self, len: usize) {
                self.data += len;
                self.data_count += 1;
            }

            fn add_meta(&mut self, len: usize) {
                self.meta += len;
                self.meta_count += 1;
            }
        }
        #[cfg(feature = "print_cache_item_size")]
        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
            Mutex::new(FxHashMap::default());

        let task_snapshots = snapshot
            .into_iter()
            .filter_map(|iter| {
                let mut iter = iter
                    .filter_map(
                        |(task_id, meta, data): (
                            _,
                            Option<Result<SmallVec<_>>>,
                            Option<Result<SmallVec<_>>>,
                        )| {
                            let meta = match meta {
                                Some(Ok(meta)) => {
                                    #[cfg(feature = "print_cache_item_size")]
                                    task_cache_stats
                                        .lock()
                                        .entry(self.get_task_description(task_id))
                                        .or_default()
                                        .add_meta(meta.len());
                                    Some(meta)
                                }
                                None => None,
                                Some(Err(err)) => {
                                    println!(
                                        "Serializing task {} failed (meta): {:?}",
                                        self.get_task_description(task_id),
                                        err
                                    );
                                    None
                                }
                            };
                            let data = match data {
                                Some(Ok(data)) => {
                                    #[cfg(feature = "print_cache_item_size")]
                                    task_cache_stats
                                        .lock()
                                        .entry(self.get_task_description(task_id))
                                        .or_default()
                                        .add_data(data.len());
                                    Some(data)
                                }
                                None => None,
                                Some(Err(err)) => {
                                    println!(
                                        "Serializing task {} failed (data): {:?}",
                                        self.get_task_description(task_id),
                                        err
                                    );
                                    None
                                }
                            };
                            (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
                        },
                    )
                    .peekable();
                iter.peek().is_some().then_some(iter)
            })
            .collect::<Vec<_>>();

        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());

        let mut new_items = false;

        if !persisted_task_cache_log.is_empty() || !task_snapshots.is_empty() {
            new_items = true;
            if let Err(err) = self.backing_storage.save_snapshot(
                self.session_id,
                suspended_operations,
                persisted_task_cache_log,
                task_snapshots,
            ) {
                println!("Persisting failed: {err:?}");
                return None;
            }
            #[cfg(feature = "print_cache_item_size")]
            {
                let mut task_cache_stats = task_cache_stats
                    .into_inner()
                    .into_iter()
                    .collect::<Vec<_>>();
                if !task_cache_stats.is_empty() {
                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
                        (stats_b.data + stats_b.meta, key_b)
                            .cmp(&(stats_a.data + stats_a.meta, key_a))
                    });
                    println!("Task cache stats:");
                    for (task_desc, stats) in task_cache_stats {
                        use std::ops::Div;

                        use turbo_tasks::util::FormatBytes;

                        println!(
                            " {} {task_desc} = {} meta ({} x {}), {} data ({} x {})",
                            FormatBytes(stats.data + stats.meta),
                            FormatBytes(stats.meta),
                            stats.meta_count,
                            FormatBytes(stats.meta.checked_div(stats.meta_count).unwrap_or(0)),
                            FormatBytes(stats.data),
                            stats.data_count,
                            FormatBytes(stats.data.checked_div(stats.data_count).unwrap_or(0)),
                        );
                    }
                }
            }
        }

        if new_items {
            let elapsed = start.elapsed();
            turbo_tasks().send_compilation_event(Arc::new(TimingEvent::new(
                "Finished writing to persistent cache".to_string(),
                elapsed,
            )));
        }

        Some((snapshot_time, new_items))
    }

    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        if self.should_restore() {
            let uncompleted_operations = self
                .backing_storage
                .uncompleted_operations()
                .expect("Failed to get uncompleted operations");
            if !uncompleted_operations.is_empty() {
                let mut ctx = self.execute_context(turbo_tasks);
                for op in uncompleted_operations {
                    op.execute(&mut ctx);
                }
            }
        }

        if self.should_persist() {
            turbo_tasks.schedule_backend_background_job(BACKEND_JOB_INITIAL_SNAPSHOT);
        }
    }

    fn stopping(&self) {
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }

    #[allow(unused_variables)]
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        #[cfg(feature = "verify_aggregation_graph")]
        {
            self.is_idle.store(false, Ordering::Release);
            self.verify_aggregation_graph(turbo_tasks, false);
        }
        if let Err(err) = self.backing_storage.shutdown() {
            println!("Shutting down failed: {err}");
        }
    }

    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                select! {
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                    }
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }

    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }

    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: TaskId,
        is_immutable: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
            self.track_cache_hit(&task_type);
            self.connect_child(parent_task, task_id, is_immutable, turbo_tasks);
            return task_id;
        }

        self.track_cache_miss(&task_type);
        let tx = self
            .should_restore()
            .then(|| self.backing_storage.start_read_transaction())
            .flatten();
        let task_id = {
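            // Safety: `tx` is a read transaction from this backing storage.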
            if let Some(task_id) = unsafe {
                self.backing_storage
                    .forward_lookup_task_cache(tx.as_ref(), &task_type)
                    .expect("Failed to lookup task id")
            } {
                let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
                task_id
            } else {
                let task_type = Arc::new(task_type);
                let task_id = self.persisted_task_id_factory.get();
                let task_id = if let Err(existing_task_id) =
                    self.task_cache.try_insert(task_type.clone(), task_id)
                {
                    unsafe {
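                        // Safety: The id was just created and never published,
                        // so it can be reused.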
                        self.persisted_task_id_factory.reuse(task_id);
                    }
                    existing_task_id
                } else {
                    task_id
                };
                if let Some(log) = &self.persisted_task_cache_log {
                    log.lock(task_id).push((task_type, task_id));
                }
                task_id
            }
        };

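        // Safety: `tx` is a read transaction from this backing storage.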
        unsafe {
            self.connect_child_with_tx(tx.as_ref(), parent_task, task_id, is_immutable, turbo_tasks)
        };

        task_id
    }

    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: TaskId,
        is_immutable: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        if !parent_task.is_transient() {
            self.panic_persistent_calling_transient(
                self.lookup_task_type(parent_task).as_deref(),
                Some(&task_type),
                None,
            );
        }
        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
            self.track_cache_hit(&task_type);
            self.connect_child(parent_task, task_id, is_immutable, turbo_tasks);
            return task_id;
        }

        self.track_cache_miss(&task_type);
        let task_type = Arc::new(task_type);
        let task_id = self.transient_task_id_factory.get();
        if let Err(existing_task_id) = self.task_cache.try_insert(task_type, task_id) {
            unsafe {
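                // Safety: The id was just created and never published, so it
                // can be reused.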
                self.transient_task_id_factory.reuse(task_id);
            }
            self.connect_child(parent_task, existing_task_id, is_immutable, turbo_tasks);
            return existing_task_id;
        }

        self.connect_child(parent_task, task_id, is_immutable, turbo_tasks);

        task_id
    }

    fn debug_trace_transient_task(
        &self,
        task_type: &CachedTaskType,
        cell_id: Option<CellId>,
    ) -> DebugTraceTransientTask {
        fn inner_id(
            backend: &TurboTasksBackendInner<impl BackingStorage>,
            task_id: TaskId,
            cell_type_id: Option<ValueTypeId>,
            visited_set: &mut FxHashSet<TaskId>,
        ) -> DebugTraceTransientTask {
            if let Some(task_type) = backend.lookup_task_type(task_id) {
                if visited_set.contains(&task_id) {
                    let task_name = task_type.get_name();
                    DebugTraceTransientTask::Collapsed {
                        task_name,
                        cell_type_id,
                    }
                } else {
                    inner_cached(backend, &task_type, cell_type_id, visited_set)
                }
            } else {
                DebugTraceTransientTask::Uncached { cell_type_id }
            }
        }
        fn inner_cached(
            backend: &TurboTasksBackendInner<impl BackingStorage>,
            task_type: &CachedTaskType,
            cell_type_id: Option<ValueTypeId>,
            visited_set: &mut FxHashSet<TaskId>,
        ) -> DebugTraceTransientTask {
            let task_name = task_type.get_name();

            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
                    return None;
                };
                if task_id.is_transient() {
                    Some(Box::new(inner_id(
                        backend,
                        task_id,
                        cause_self_raw_vc.try_get_type_id(),
                        visited_set,
                    )))
                } else {
                    None
                }
            });
            let cause_args = task_type
                .arg
                .get_raw_vcs()
                .into_iter()
                .filter_map(|raw_vc| {
                    let Some(task_id) = raw_vc.try_get_task_id() else {
                        return None;
                    };
                    if !task_id.is_transient() {
                        return None;
                    }
                    Some((task_id, raw_vc.try_get_type_id()))
                })
                .collect::<IndexSet<_>>()
                .into_iter()
                .map(|(task_id, cell_type_id)| {
                    inner_id(backend, task_id, cell_type_id, visited_set)
                })
                .collect();

            DebugTraceTransientTask::Cached {
                task_name,
                cell_type_id,
                cause_self,
                cause_args,
            }
        }
        inner_cached(
            self,
            task_type,
            cell_id.map(|c| c.type_id),
            &mut FxHashSet::default(),
        )
    }

    fn invalidate_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            panic!("Dependency tracking is disabled so invalidation is not allowed");
        }
        operation::InvalidateOperation::run(
            smallvec![task_id],
            #[cfg(feature = "trace_task_dirty")]
            TaskDirtyCause::Invalidator,
            self.execute_context(turbo_tasks),
        );
    }

    fn invalidate_tasks(
        &self,
        tasks: &[TaskId],
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            panic!("Dependency tracking is disabled so invalidation is not allowed");
        }
        operation::InvalidateOperation::run(
            tasks.iter().copied().collect(),
            #[cfg(feature = "trace_task_dirty")]
            TaskDirtyCause::Unknown,
            self.execute_context(turbo_tasks),
        );
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            panic!("Dependency tracking is disabled so invalidation is not allowed");
        }
        operation::InvalidateOperation::run(
            tasks.iter().copied().collect(),
            #[cfg(feature = "trace_task_dirty")]
            TaskDirtyCause::Unknown,
            self.execute_context(turbo_tasks),
        );
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if task_id.is_transient() {
            return;
        }
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        task.invalidate_serialization();
    }

    fn get_task_description(&self, task_id: TaskId) -> String {
        self.lookup_task_type(task_id).map_or_else(
            || format!("{task_id:?} transient"),
            |task_type| task_type.to_string(),
        )
    }

    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        if let Some(in_progress) = remove!(task, InProgress) {
            match in_progress {
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        task.add_new(CachedDataItem::InProgress {
            value: InProgressState::Canceled,
        });
    }

    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        enum TaskType {
            Cached(Arc<CachedTaskType>),
            Transient(Arc<TransientTask>),
        }
        let (task_type, once_task) = if let Some(task_type) = self.lookup_task_type(task_id) {
            (TaskType::Cached(task_type), false)
        } else if let Some(task_type) = self.transient_tasks.get(&task_id) {
            (
                TaskType::Transient(task_type.clone()),
                matches!(**task_type, TransientTask::Once(_)),
            )
        } else {
            return None;
        };
        let execution_reason;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            let in_progress = remove!(task, InProgress)?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                task.add_new(CachedDataItem::InProgress { value: in_progress });
                return None;
            };
            execution_reason = reason;
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::InProgress(Box::new(InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    done: false,
                    new_children: Default::default(),
                })),
            });

            if self.should_track_children() {
                enum Collectible {
                    Current(CollectibleRef, i32),
                    Outdated(CollectibleRef),
                }
                let collectibles = iter_many!(task, Collectible { collectible } value => Collectible::Current(collectible, *value))
                    .chain(iter_many!(task, OutdatedCollectible { collectible } => Collectible::Outdated(collectible)))
                    .collect::<Vec<_>>();
                for collectible in collectibles {
                    match collectible {
                        Collectible::Current(collectible, value) => {
                            let _ = task
                                .insert(CachedDataItem::OutdatedCollectible { collectible, value });
                        }
                        Collectible::Outdated(collectible) => {
                            if !task.has_key(&CachedDataItemKey::Collectible { collectible }) {
                                task.remove(&CachedDataItemKey::OutdatedCollectible {
                                    collectible,
                                });
                            }
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                enum Dep {
                    CurrentCell(CellRef),
                    CurrentOutput(TaskId),
                    OutdatedCell(CellRef),
                    OutdatedOutput(TaskId),
                }
                let dependencies = iter_many!(task, CellDependency { target } => Dep::CurrentCell(target))
                    .chain(iter_many!(task, OutputDependency { target } => Dep::CurrentOutput(target)))
                    .chain(iter_many!(task, OutdatedCellDependency { target } => Dep::OutdatedCell(target)))
                    .chain(iter_many!(task, OutdatedOutputDependency { target } => Dep::OutdatedOutput(target)))
                    .collect::<Vec<_>>();
                for dep in dependencies {
                    match dep {
                        Dep::CurrentCell(cell) => {
                            let _ = task.add(CachedDataItem::OutdatedCellDependency {
                                target: cell,
                                value: (),
                            });
                        }
                        Dep::CurrentOutput(output) => {
                            let _ = task.add(CachedDataItem::OutdatedOutputDependency {
                                target: output,
                                value: (),
                            });
                        }
                        Dep::OutdatedCell(cell) => {
                            if !task.has_key(&CachedDataItemKey::CellDependency { target: cell }) {
                                task.remove(&CachedDataItemKey::OutdatedCellDependency {
                                    target: cell,
                                });
                            }
                        }
                        Dep::OutdatedOutput(output) => {
                            if !task
                                .has_key(&CachedDataItemKey::OutputDependency { target: output })
                            {
                                task.remove(&CachedDataItemKey::OutdatedOutputDependency {
                                    target: output,
                                });
                            }
                        }
                    }
                }
            }
        }

        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let task_type = task_type.clone();
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }

    fn task_execution_result(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateOutputOperation::run(task_id, result, self.execute_context(turbo_tasks));
    }

    fn task_execution_completed(
        &self,
        task_id: TaskId,
        _duration: Duration,
        _memory_usage: usize,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        stateful: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        let _span = tracing::trace_span!("task execution completed").entered();
        let mut ctx = self.execute_context(turbo_tasks);

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = get_mut!(task, InProgress) else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        let &mut InProgressState::InProgress(box InProgressStateInner {
            stale,
            ref mut done,
            ref done_event,
            ref mut new_children,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        if stale {
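            // The task became stale while executing: discard this run,
            // reschedule the task, and drop the active counts taken for the
            // new children.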
            let Some(InProgressState::InProgress(box InProgressStateInner {
                done_event,
                mut new_children,
                ..
            })) = remove!(task, InProgress)
            else {
                unreachable!();
            };
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled {
                    done_event,
                    reason: TaskExecutionReason::Stale,
                },
            });
            for task in iter_many!(task, Child { task } => task) {
                new_children.remove(&task);
            }
            drop(task);

            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_keys().collect(),
                },
                &mut ctx,
            );
            return true;
        }

        *done = true;
        done_event.notify(usize::MAX);

        let mut new_children = take(new_children);

        if stateful {
            let _ = task.add(CachedDataItem::Stateful { value: () });
        }

        let old_counters: FxHashMap<_, _> =
            get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, *max_index));
        let mut counters_to_remove = old_counters.clone();
        for (&cell_type, &max_index) in cell_counters.iter() {
            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                if old_max_index != max_index {
                    task.insert(CachedDataItem::CellTypeMaxIndex {
                        cell_type,
                        value: max_index,
                    });
                }
            } else {
                task.add_new(CachedDataItem::CellTypeMaxIndex {
                    cell_type,
                    value: max_index,
                });
            }
        }
        for (cell_type, _) in counters_to_remove {
            task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type });
        }

        let mut queue = AggregationUpdateQueue::new();

        let mut removed_data = Vec::new();
        let mut old_edges = Vec::new();

        let has_children = !new_children.is_empty();
        let has_mutable_children =
            has_children && new_children.values().any(|is_immutable| !*is_immutable);

        if !stateful && !has_mutable_children {
            task.mark_as_immutable();
        }

        if has_children {
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);
        }

        if has_children {
            old_edges.extend(
                iter_many!(task, Child { task } => task)
                    .filter(|task| new_children.remove(task).is_none())
                    .map(OutdatedEdge::Child),
            );
        } else {
            old_edges.extend(iter_many!(task, Child { task } => task).map(OutdatedEdge::Child));
        }

        if task_id.is_transient() || iter_many!(task, CellData { cell }
            if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index) => cell
        ).count() > 0 {
            removed_data.extend(task.extract_if(CachedDataItemType::CellData, |key, _| {
                matches!(key, CachedDataItemKey::CellData { cell } if cell_counters
                    .get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
            }));
        }
        if self.should_track_children() {
            old_edges.extend(
                task.iter(CachedDataItemType::OutdatedCollectible)
                    .filter_map(|(key, value)| match (key, value) {
                        (
                            CachedDataItemKey::OutdatedCollectible { collectible },
                            CachedDataItemValueRef::OutdatedCollectible { value },
                        ) => Some(OutdatedEdge::Collectible(collectible, *value)),
                        _ => None,
                    }),
            );
        }
        if !task.is_immutable() && self.should_track_dependencies() {
            old_edges.extend(iter_many!(task, OutdatedCellDependency { target } => OutdatedEdge::CellDependency(target)));
            old_edges.extend(iter_many!(task, OutdatedOutputDependency { target } => OutdatedEdge::OutputDependency(target)));
            old_edges.extend(
                iter_many!(task, CellDependent { cell, task } => (cell, task)).filter_map(
                    |(cell, task)| {
                        if cell_counters
                            .get(&cell.type_id)
                            .is_none_or(|start_index| cell.index >= *start_index)
                            && let Some(old_counter) = old_counters.get(&cell.type_id)
                            && cell.index < *old_counter
                        {
                            return Some(OutdatedEdge::RemovedCellDependent {
                                task_id: task,
                                #[cfg(feature = "trace_task_dirty")]
                                value_type_id: cell.type_id,
                            });
                        }
                        None
                    },
                ),
            );
        }

        drop(task);

        if !queue.is_empty() || !old_edges.is_empty() {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
            CleanupOldEdgesOperation::run(task_id, old_edges, queue, &mut ctx);
        }

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = get!(task, InProgress) else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        let InProgressState::InProgress(box InProgressStateInner { stale, .. }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        if *stale {
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                remove!(task, InProgress)
            else {
                unreachable!();
            };
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled {
                    done_event,
                    reason: TaskExecutionReason::Stale,
                },
            });
            drop(task);

            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_keys().collect(),
                },
                &mut ctx,
            );
            return true;
        }

        let mut queue = AggregationUpdateQueue::new();

        if has_children {
            let is_immutable = task.is_immutable();
            let has_active_count = !is_immutable
                && ctx.should_track_activeness()
                && get!(task, Activeness).map_or(false, |activeness| activeness.active_counter > 0);
            connect_children(
                task_id,
                &mut task,
                new_children,
                &mut queue,
                has_active_count,
                !is_immutable && ctx.should_track_activeness(),
            );
        }

        drop(task);

        if has_children {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("connect new children").entered();
            queue.execute(&mut ctx);
        }

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = remove!(task, InProgress) else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: _,
            stale,
            session_dependent,
            done: _,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        debug_assert!(new_children.is_empty());

        if stale {
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled {
                    done_event,
                    reason: TaskExecutionReason::Stale,
                },
            });
            return true;
        }

        removed_data.extend(task.extract_if(
            CachedDataItemType::InProgressCell,
            |key, value| match (key, value) {
                (
                    CachedDataItemKey::InProgressCell { .. },
                    CachedDataItemValueRef::InProgressCell { value },
                ) => {
                    value.event.notify(usize::MAX);
                    true
                }
                _ => false,
            },
        ));

        let old_dirty_state = get!(task, Dirty).copied();

        let new_dirty_state = if session_dependent {
            Some(DirtyState {
                clean_in_session: Some(self.session_id),
            })
        } else {
            None
        };

        let data_update = if old_dirty_state != new_dirty_state {
            if let Some(new_dirty_state) = new_dirty_state {
                task.insert(CachedDataItem::Dirty {
                    value: new_dirty_state,
                });
            } else {
                task.remove(&CachedDataItemKey::Dirty {});
            }

            if self.should_track_children()
                && (old_dirty_state.is_some() || new_dirty_state.is_some())
            {
                let mut dirty_containers = get!(task, AggregatedDirtyContainerCount)
                    .cloned()
                    .unwrap_or_default();
                if let Some(old_dirty_state) = old_dirty_state {
                    dirty_containers.update_with_dirty_state(&old_dirty_state);
                }
                let aggregated_update = match (old_dirty_state, new_dirty_state) {
                    (None, None) => unreachable!(),
                    (Some(old), None) => dirty_containers.undo_update_with_dirty_state(&old),
                    (None, Some(new)) => dirty_containers.update_with_dirty_state(&new),
                    (Some(old), Some(new)) => dirty_containers.replace_dirty_state(&old, &new),
                };
                if !aggregated_update.is_zero() {
                    if aggregated_update.get(self.session_id) < 0
                        && let Some(root_state) = get_mut!(task, Activeness)
                    {
                        root_state.all_clean_event.notify(usize::MAX);
                        root_state.unset_active_until_clean();
                        if root_state.is_empty() {
                            task.remove(&CachedDataItemKey::Activeness {});
                        }
                    }
                    AggregationUpdateJob::data_update(
                        &mut task,
                        AggregatedDataUpdate::new()
                            .dirty_container_update(task_id, aggregated_update),
                    )
                } else {
                    None
                }
            } else {
                None
            }
        } else {
            None
        };

        drop(task);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, &mut ctx);
        }

        drop(removed_data);

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        task.shrink_to_fit(CachedDataItemType::CellData);
        task.shrink_to_fit(CachedDataItemType::CellTypeMaxIndex);
        task.shrink_to_fit(CachedDataItemType::CellDependency);
        task.shrink_to_fit(CachedDataItemType::OutputDependency);
        task.shrink_to_fit(CachedDataItemType::CollectiblesDependency);
        drop(task);

        false
    }

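    /// Background job driving periodic persistence: waits for the snapshot
    /// interval (or for idle/stop events), takes a snapshot, and schedules a
    /// follow-up snapshot job when no new data was written.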
1970 fn run_backend_job<'a>(
1971 self: &'a Arc<Self>,
1972 id: BackendJobId,
1973 turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1974 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
1975 Box::pin(async move {
1976 if id == BACKEND_JOB_INITIAL_SNAPSHOT || id == BACKEND_JOB_FOLLOW_UP_SNAPSHOT {
1977 debug_assert!(self.should_persist());
1978
1979 let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
1980 let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
1981 loop {
1982 const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(60);
1983 const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(30);
1984 const IDLE_TIMEOUT: Duration = Duration::from_secs(2);
1985
1986 let time = if id == BACKEND_JOB_INITIAL_SNAPSHOT {
1987 FIRST_SNAPSHOT_WAIT
1988 } else {
1989 SNAPSHOT_INTERVAL
1990 };
1991
1992 let until = last_snapshot + time;
1993 if until > Instant::now() {
1994 let mut stop_listener = self.stopping_event.listen();
1995 if !self.stopping.load(Ordering::Acquire) {
1996 let mut idle_start_listener = self.idle_start_event.listen();
1997 let mut idle_end_listener = self.idle_end_event.listen();
1998 let mut idle_time = if turbo_tasks.is_idle() {
1999 Instant::now() + IDLE_TIMEOUT
2000 } else {
2001 far_future()
2002 };
2003 loop {
2004 tokio::select! {
2005 _ = &mut stop_listener => {
2006 break;
2007 },
2008 _ = &mut idle_start_listener => {
2009 idle_time = Instant::now() + IDLE_TIMEOUT;
2010 idle_start_listener = self.idle_start_event.listen()
2011 },
2012 _ = &mut idle_end_listener => {
2013 idle_time = until + IDLE_TIMEOUT;
2014 idle_end_listener = self.idle_end_event.listen()
2015 },
2016 _ = tokio::time::sleep_until(until) => {
2017 break;
2018 },
2019 _ = tokio::time::sleep_until(idle_time) => {
2020 if turbo_tasks.is_idle() {
2021 break;
2022 }
2023 },
2024 }
2025 }
2026 }
2027 }
2028
2029 let this = self.clone();
2030 let snapshot = turbo_tasks::spawn_blocking(move || this.snapshot()).await;
2031 if let Some((snapshot_start, new_data)) = snapshot {
2032 last_snapshot = snapshot_start;
2033 if new_data {
2034 continue;
2035 }
2036 let last_snapshot = last_snapshot.duration_since(self.start_time);
2037 self.last_snapshot.store(
2038 last_snapshot.as_millis().try_into().unwrap(),
2039 Ordering::Relaxed,
2040 );
2041
2042 turbo_tasks.schedule_backend_background_job(BACKEND_JOB_FOLLOW_UP_SNAPSHOT);
2043 return;
2044 }
2045 }
2046 }
2047 })
2048 }
2049
    fn try_read_own_task_cell_untracked(
        &self,
        task_id: TaskId,
        cell: CellId,
        _options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<TypedCellContent> {
        let mut ctx = self.execute_context(turbo_tasks);
        let task = ctx.task(task_id, TaskDataCategory::Data);
        if let Some(content) = get!(task, CellData { cell }) {
            Ok(CellContent(Some(content.reference.clone())).into_typed(cell.type_id))
        } else {
            Ok(CellContent(None).into_typed(cell.type_id))
        }
    }

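    /// Returns all collectibles of `collectible_type` that are aggregated by `task_id`,
    /// together with their counts. The task is first upgraded to a root aggregation node so
    /// that its aggregated data covers its whole subgraph, then aggregated counts and the
    /// task's own collectibles are merged, and dependency edges are recorded in both directions
    /// so `reader_id` can be invalidated when the collectibles change.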
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        if !self.should_track_children() {
            return AutoMap::default();
        }

        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            for collectible in iter_many!(
                task,
                AggregatedCollectible {
                    collectible
                } count if collectible.collectible_type == collectible_type && *count > 0 => {
                    collectible.cell
                }
            ) {
                *collectibles
                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
                    .or_insert(0) += 1;
            }
            for (collectible, count) in iter_many!(
                task,
                Collectible {
                    collectible
                } count if collectible.collectible_type == collectible_type => {
                    (collectible.cell, *count)
                }
            ) {
                *collectibles
                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
                    .or_insert(0) += count;
            }
            let _ = task.add(CachedDataItem::CollectiblesDependent {
                collectible_type,
                task: reader_id,
                value: (),
            });
        }
        {
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            if reader.add(CachedDataItem::CollectiblesDependency { target, value: () }) {
                reader.remove(&CachedDataItemKey::OutdatedCollectiblesDependency { target });
            }
        }
        collectibles
    }

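    /// Emits `collectible` of `collectible_type` from `task_id`. The collectible must be a
    /// resolved task cell; the count update is propagated through the aggregation graph via
    /// `UpdateCollectibleOperation`.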
    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_valid_collectible(task_id, collectible);
        if !self.should_track_children() {
            return;
        }

        let RawVc::TaskCell(collectible_task, cell) = collectible else {
            panic!("Collectibles need to be resolved");
        };
        let cell = CellRef {
            task: collectible_task,
            cell,
        };
        operation::UpdateCollectibleOperation::run(
            task_id,
            CollectibleRef {
                collectible_type,
                cell,
            },
            1,
            self.execute_context(turbo_tasks),
        );
    }

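    /// Reverts `count` emissions of `collectible` from `task_id` by applying a negative count
    /// through the same `UpdateCollectibleOperation` as `emit_collectible`.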
    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_valid_collectible(task_id, collectible);
        if !self.should_track_children() {
            return;
        }

        let RawVc::TaskCell(collectible_task, cell) = collectible else {
            panic!("Collectibles need to be resolved");
        };
        let cell = CellRef {
            task: collectible_task,
            cell,
        };
        operation::UpdateCollectibleOperation::run(
            task_id,
            CollectibleRef {
                collectible_type,
                cell,
            },
            -(i32::try_from(count).unwrap()),
            self.execute_context(turbo_tasks),
        );
    }

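    /// Replaces the content of a task cell via `operation::UpdateCellOperation`.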
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        content: CellContent,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateCellOperation::run(
            task_id,
            cell,
            content,
            self.execute_context(turbo_tasks),
        );
    }

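    /// Marks the currently executing task as session dependent, meaning its result is only
    /// valid within the current session. The task's aggregation number is raised to a high
    /// value first, presumably so that session-dependent tasks sit high in the aggregation
    /// tree and are cheap to find and invalidate when a new session starts.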
    fn mark_own_task_as_session_dependent(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            return;
        }
        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
        let aggregation_number = get_aggregation_number(&task);
        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
            drop(task);
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                    distance: None,
                },
                &mut ctx,
            );
            task = ctx.task(task_id, TaskDataCategory::Meta);
        }
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            session_dependent,
            ..
        })) = get_mut!(task, InProgress)
        {
            *session_dependent = true;
        }
    }

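    /// Marks the currently executing task as finished before its execution function has
    /// actually returned, by setting the `marked_as_completed` flag on its in-progress state.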
    fn mark_own_task_as_finished(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task, TaskDataCategory::Data);
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            marked_as_completed,
            ..
        })) = get_mut!(task, InProgress)
        {
            *marked_as_completed = true;
        }
    }

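    /// Sets the base aggregation number of the task and schedules the resulting aggregation
    /// graph updates.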
    fn set_own_task_aggregation_number(
        &self,
        task: TaskId,
        aggregation_number: u32,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        AggregationUpdateQueue::run(
            AggregationUpdateJob::UpdateAggregationNumber {
                task_id: task,
                base_aggregation_number: aggregation_number,
                distance: None,
            },
            &mut ctx,
        );
    }

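    /// Connects `task` as a child of `parent_task`. Panics if a persistent parent attempts to
    /// connect to a transient child.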
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_not_persistent_calling_transient(parent_task, task, None);
        ConnectChildOperation::run(parent_task, task, false, self.execute_context(turbo_tasks));
    }

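    /// Creates a transient root or once task: allocates a transient task id, stores the task's
    /// future, sets up a root aggregation number (and activeness state when activeness tracking
    /// is enabled), and schedules the initial execution.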
    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
        let task_id = self.transient_task_id_factory.get();
        let root_type = match task_type {
            TransientTaskType::Root(_) => RootType::RootTask,
            TransientTaskType::Once(_) => RootType::OnceTask,
        };
        self.transient_tasks.insert(
            task_id,
            Arc::new(match task_type {
                TransientTaskType::Root(f) => TransientTask::Root(f),
                TransientTaskType::Once(f) => TransientTask::Once(Mutex::new(Some(f))),
            }),
        );
        {
            let mut task = self.storage.access_mut(task_id);
            task.add(CachedDataItem::AggregationNumber {
                value: AggregationNumber {
                    base: u32::MAX,
                    distance: 0,
                    effective: u32::MAX,
                },
            });
            if !task.state().is_immutable() && self.should_track_activeness() {
                task.add(CachedDataItem::Activeness {
                    value: ActivenessState::new_root(root_type, task_id),
                });
            }
            task.add(CachedDataItem::new_scheduled(
                TaskExecutionReason::Initial,
                move || match root_type {
                    RootType::RootTask => "Root Task".to_string(),
                    RootType::OnceTask => "Once Task".to_string(),
                },
            ));
        }
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().insert(task_id);
        task_id
    }

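    /// Disposes a root task once its result is no longer needed. If the task or one of its
    /// aggregated containers is still dirty, the task stays active until it is clean; otherwise
    /// its activeness state is removed right away and anyone waiting on `all_clean_event` is
    /// notified.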
    fn dispose_root_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().remove(&task_id);

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_dirty = get!(task, Dirty).is_some_and(|dirty| dirty.get(self.session_id));
        let has_dirty_containers = get!(task, AggregatedDirtyContainerCount)
            .is_some_and(|dirty_containers| dirty_containers.get(self.session_id) > 0);
        if is_dirty || has_dirty_containers {
            if let Some(root_state) = get_mut!(task, Activeness) {
                root_state.unset_root_type();
                root_state.set_active_until_clean();
            }
        } else if let Some(root_state) = remove!(task, Activeness) {
            root_state.all_clean_event.notify(usize::MAX);
        }
    }

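    /// Debug-only (feature `verify_aggregation_graph`) consistency check that walks the task
    /// graph from every root task and prints inconsistencies: tasks that are reachable from a
    /// root but don't report to any aggregated node, dirty tasks missing from their uppers'
    /// aggregated dirty containers, and collectibles that are aggregated somewhere but never
    /// emitted by a child. Setting `TURBO_ENGINE_VERIFY_GRAPH=0` skips the check.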
    #[cfg(feature = "verify_aggregation_graph")]
    fn verify_aggregation_graph(
        &self,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
        idle: bool,
    ) {
        use std::{collections::VecDeque, env, io::stdout};

        use crate::backend::operation::{get_uppers, is_aggregating_node};

        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
            return;
        }

        let mut ctx = self.execute_context(turbo_tasks);
        let root_tasks = self.root_tasks.lock().clone();
        let len = root_tasks.len();

        for (i, task_id) in root_tasks.into_iter().enumerate() {
            println!("Verifying graph from root {task_id} {i}/{len}...");
            let mut queue = VecDeque::new();
            let mut visited = FxHashSet::default();
            let mut aggregated_nodes = FxHashSet::default();
            let mut collectibles = FxHashMap::default();
            let root_task_id = task_id;
            visited.insert(task_id);
            aggregated_nodes.insert(task_id);
            queue.push_back(task_id);
            let mut counter = 0;
            while let Some(task_id) = queue.pop_front() {
                counter += 1;
                if counter % 100000 == 0 {
                    println!(
                        "queue={}, visited={}, aggregated_nodes={}",
                        queue.len(),
                        visited.len(),
                        aggregated_nodes.len()
                    );
                }
                let task = ctx.task(task_id, TaskDataCategory::All);
                if idle && !self.is_idle.load(Ordering::Relaxed) {
                    return;
                }

                let uppers = get_uppers(&task);
                if task_id != root_task_id
                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
                {
                    println!(
                        "Task {} {} doesn't report to any root but is reachable from one \
                         (uppers: {:?})",
                        task_id,
                        ctx.get_task_description(task_id),
                        uppers
                    );
                }

                let aggregated_collectibles: Vec<_> = get_many!(
                    task,
                    AggregatedCollectible { collectible } value if *value > 0 => { collectible }
                );
                for collectible in aggregated_collectibles {
                    collectibles
                        .entry(collectible)
                        .or_insert_with(|| (false, Vec::new()))
                        .1
                        .push(task_id);
                }

                let own_collectibles: Vec<_> = get_many!(
                    task,
                    Collectible { collectible } value if *value > 0 => { collectible }
                );
                for collectible in own_collectibles {
                    if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                        *flag = true
                    } else {
                        println!(
                            "Task {} has a collectible {:?} that is not in any upper task",
                            task_id, collectible
                        );
                    }
                }

                let is_dirty = get!(task, Dirty).is_some_and(|dirty| dirty.get(self.session_id));
                let has_dirty_container = get!(task, AggregatedDirtyContainerCount)
                    .is_some_and(|count| count.get(self.session_id) > 0);
                let should_be_in_upper = is_dirty || has_dirty_container;

                let aggregation_number = get_aggregation_number(&task);
                if is_aggregating_node(aggregation_number) {
                    aggregated_nodes.insert(task_id);
                }
                for child_id in iter_many!(task, Child { task } => task) {
                    if visited.insert(child_id) {
                        queue.push_back(child_id);
                    }
                }
                drop(task);

                if should_be_in_upper {
                    for upper_id in uppers {
                        let task = ctx.task(upper_id, TaskDataCategory::All);
                        let in_upper = get!(task, AggregatedDirtyContainer { task: task_id })
                            .is_some_and(|dirty| dirty.get(self.session_id) > 0);
                        if !in_upper {
                            println!(
                                "Task {} is dirty, but is not listed in the upper task {}",
                                task_id, upper_id
                            );
                        }
                    }
                }
            }

            for (collectible, (flag, task_ids)) in collectibles {
                if !flag {
                    use std::io::Write;
                    let mut stdout = stdout().lock();
                    writeln!(
                        stdout,
                        "Collectible {:?} is not emitted in any child task, but it appears in \
                         these aggregated tasks: {:#?}",
                        collectible,
                        task_ids
                            .iter()
                            .map(|t| format!("{t} {}", ctx.get_task_description(*t)))
                            .collect::<Vec<_>>()
                    );

                    let task_id = collectible.cell.task;
                    let mut queue = {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        get_uppers(&task)
                    };
                    let mut visited = FxHashSet::default();
                    for &upper_id in queue.iter() {
                        visited.insert(upper_id);
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}");
                    }
                    while let Some(task_id) = queue.pop() {
                        let desc = ctx.get_task_description(task_id);
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        let aggregated_collectible =
                            get!(task, AggregatedCollectible { collectible })
                                .copied()
                                .unwrap_or_default();
                        let uppers = get_uppers(&task);
                        drop(task);
                        writeln!(
                            stdout,
                            "upper {task_id} {desc} collectible={aggregated_collectible}"
                        );
                        if task_ids.contains(&task_id) {
                            writeln!(
                                stdout,
                                "Task has an upper connection to an aggregated task that \
                                 doesn't reference it. Upper connection is invalid!"
                            );
                        }
                        for upper_id in uppers {
                            writeln!(stdout, "{task_id:?} -> {upper_id:?}");
                            if !visited.contains(&upper_id) {
                                queue.push(upper_id);
                            }
                        }
                    }
                }
            }
            println!("visited {task_id} {} tasks", visited.len());
        }
    }

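    /// Asserts that a persistent task is not calling, reading, or connecting to a transient
    /// task; panics with a debug trace of why the callee is transient if it is.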
    fn assert_not_persistent_calling_transient(
        &self,
        parent_id: TaskId,
        child_id: TaskId,
        cell_id: Option<CellId>,
    ) {
        if !parent_id.is_transient() && child_id.is_transient() {
            self.panic_persistent_calling_transient(
                self.lookup_task_type(parent_id).as_deref(),
                self.lookup_task_type(child_id).as_deref(),
                cell_id,
            );
        }
    }

    fn panic_persistent_calling_transient(
        &self,
        parent: Option<&CachedTaskType>,
        child: Option<&CachedTaskType>,
        cell_id: Option<CellId>,
    ) {
        let transient_reason = if let Some(child) = child {
            Cow::Owned(format!(
                " The callee is transient because it depends on:\n{}",
                self.debug_trace_transient_task(child, cell_id),
            ))
        } else {
            Cow::Borrowed("")
        };
        panic!(
            "Persistent task {} is not allowed to call, read, or connect to the transient task \
             {}.{}",
            parent.map_or("unknown", |t| t.get_name()),
            child.map_or("unknown", |t| t.get_name()),
            transient_reason,
        );
    }

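    /// Validates that a collectible is a resolved task cell and that transient collectibles are
    /// only emitted from transient tasks; panics with diagnostic details otherwise.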
    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
            let task_info = if let Some(col_task_ty) = collectible
                .try_get_task_id()
                .and_then(|t| self.lookup_task_type(t))
            {
                Cow::Owned(format!(" (return type of {col_task_ty})"))
            } else {
                Cow::Borrowed("")
            };
            panic!("Collectible{task_info} must be a ResolvedVc")
        };
        if col_task_id.is_transient() && !task_id.is_transient() {
            let transient_reason = if let Some(col_task_ty) = self.lookup_task_type(col_task_id) {
                Cow::Owned(format!(
                    ". The collectible is transient because it depends on:\n{}",
                    self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
                ))
            } else {
                Cow::Borrowed("")
            };
            panic!(
                "Collectible is transient; transient collectibles cannot be emitted from \
                 persistent tasks{transient_reason}",
            )
        }
    }
}

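// The public `Backend` implementation: every call is delegated to the
// `TurboTasksBackendInner` behind the `Arc`.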
impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }

    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }

    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }

    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }

    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }

    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: TaskId,
        is_immutable: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_persistent_task(task_type, parent_task, is_immutable, turbo_tasks)
    }

    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: TaskId,
        is_immutable: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_transient_task(task_type, parent_task, is_immutable, turbo_tasks)
    }

    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }

    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }

    fn get_task_description(&self, task: TaskId) -> String {
        self.0.get_task_description(task)
    }

    type TaskState = ();
    fn new_task_state(&self, _task: TaskId) -> Self::TaskState {}

    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }

    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0.try_start_task_execution(task_id, turbo_tasks)
    }

    fn task_execution_result(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.task_execution_result(task_id, result, turbo_tasks);
    }

    fn task_execution_completed(
        &self,
        task_id: TaskId,
        duration: Duration,
        memory_usage: usize,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        stateful: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            duration,
            memory_usage,
            cell_counters,
            stateful,
            turbo_tasks,
        )
    }

    fn run_backend_job<'a>(
        &'a self,
        id: BackendJobId,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(id, turbo_tasks)
    }

    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: TaskId,
        consistency: ReadConsistency,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, Some(reader), consistency, turbo_tasks)
    }

    fn try_read_task_output_untracked(
        &self,
        task_id: TaskId,
        consistency: ReadConsistency,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, None, consistency, turbo_tasks)
    }

    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: TaskId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, Some(reader), cell, options, turbo_tasks)
    }

    fn try_read_task_cell_untracked(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, None, cell, options, turbo_tasks)
    }

    fn try_read_own_task_cell_untracked(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0
            .try_read_own_task_cell_untracked(task_id, cell, options, turbo_tasks)
    }

    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }

    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }

    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }

    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        content: CellContent,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(task_id, cell, content, turbo_tasks);
    }

    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }

    fn set_own_task_aggregation_number(
        &self,
        task: TaskId,
        aggregation_number: u32,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .set_own_task_aggregation_number(task, aggregation_number, turbo_tasks);
    }

    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }

    fn connect_task(
        &self,
        task: TaskId,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }

    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }

    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }

    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }
}

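/// A node in the debug trace that explains why a task is transient, rendered by
/// `panic_persistent_calling_transient`. `Cached` nodes carry their causes (`cause_self` and
/// `cause_args`), `Collapsed` nodes presumably stand for subtrees that were already printed,
/// and `Uncached` is used when no task type information is available.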
enum DebugTraceTransientTask {
    Cached {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
        cause_self: Option<Box<DebugTraceTransientTask>>,
        cause_args: Vec<DebugTraceTransientTask>,
    },
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}

impl DebugTraceTransientTask {
    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
        let indent = " ".repeat(level);
        f.write_str(&indent)?;

        fn fmt_cell_type_id(
            f: &mut fmt::Formatter<'_>,
            cell_type_id: Option<ValueTypeId>,
        ) -> fmt::Result {
            if let Some(ty) = cell_type_id {
                write!(f, " (read cell of type {})", get_value_type_global_name(ty))
            } else {
                Ok(())
            }
        }

        match self {
            Self::Cached {
                task_name,
                cell_type_id,
                ..
            }
            | Self::Collapsed {
                task_name,
                cell_type_id,
                ..
            } => {
                f.write_str(task_name)?;
                fmt_cell_type_id(f, *cell_type_id)?;
                if matches!(self, Self::Collapsed { .. }) {
                    f.write_str(" (collapsed)")?;
                }
            }
            Self::Uncached { cell_type_id } => {
                f.write_str("unknown transient task")?;
                fmt_cell_type_id(f, *cell_type_id)?;
            }
        }
        f.write_char('\n')?;

        if let Self::Cached {
            cause_self,
            cause_args,
            ..
        } = self
        {
            if let Some(c) = cause_self {
                writeln!(f, "{indent} self:")?;
                c.fmt_indented(f, level + 1)?;
            }
            if !cause_args.is_empty() {
                writeln!(f, "{indent} args:")?;
                for c in cause_args {
                    c.fmt_indented(f, level + 1)?;
                }
            }
        }
        Ok(())
    }
}

impl fmt::Display for DebugTraceTransientTask {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.fmt_indented(f, 0)
    }
}

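/// Returns an `Instant` roughly 30 years in the future, used as an effectively-never deadline
/// for `tokio::time::sleep_until` when the idle timer should be disabled.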
fn far_future() -> Instant {
    Instant::now() + Duration::from_secs(86400 * 365 * 30)
}