1mod dynamic_storage;
2mod operation;
3mod storage;
4
5use std::{
6 borrow::Cow,
7 cmp::min,
8 fmt::{self, Write},
9 future::Future,
10 hash::BuildHasherDefault,
11 mem::take,
12 ops::Range,
13 pin::Pin,
14 sync::{
15 Arc, LazyLock,
16 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
17 },
18};
19
20use anyhow::{Result, bail};
21use auto_hash_map::{AutoMap, AutoSet};
22use indexmap::IndexSet;
23use parking_lot::{Condvar, Mutex};
24use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
25use smallvec::{SmallVec, smallvec};
26use tokio::time::{Duration, Instant};
27use tracing::{Span, trace_span};
28use turbo_tasks::{
29 CellId, FxDashMap, FxIndexMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency,
30 ReadOutputOptions, ReadTracking, TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId, TraitTypeId,
31 TurboTasksBackendApi, ValueTypeId,
32 backend::{
33 Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
34 TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
35 },
36 event::{Event, EventListener},
37 message_queue::TimingEvent,
38 registry::get_value_type,
39 task_statistics::TaskStatisticsApi,
40 trace::TraceRawVcs,
41 turbo_tasks,
42 util::{IdFactoryWithReuse, good_chunk_size},
43};
44
45pub use self::{operation::AnyOperation, storage::TaskDataCategory};
46#[cfg(feature = "trace_task_dirty")]
47use crate::backend::operation::TaskDirtyCause;
48use crate::{
49 backend::{
50 operation::{
51 AggregationUpdateJob, AggregationUpdateQueue, CleanupOldEdgesOperation,
52 ComputeDirtyAndCleanUpdate, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
53 Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number,
54 get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children,
55 },
56 storage::{
57 InnerStorageSnapshot, Storage, count, get, get_many, get_mut, get_mut_or_insert_with,
58 iter_many, remove,
59 },
60 },
61 backing_storage::BackingStorage,
62 data::{
63 ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
64 CachedDataItemValueRef, CellRef, CollectibleRef, CollectiblesRef, Dirtyness,
65 InProgressCellState, InProgressState, InProgressStateInner, OutputValue, RootType,
66 },
67 utils::{
68 bi_map::BiMap, chunked_vec::ChunkedVec, dash_map_drop_contents::drop_contents,
69 ptr_eq_arc::PtrEqArc, shard_amount::compute_shard_amount, sharded::Sharded, swap_retain,
70 },
71};
72
/// Marker bit (the most significant bit of a `usize`) that is OR-ed into
/// `in_progress_operations` while a snapshot is requested. The remaining bits of that
/// counter hold the number of operations that are currently in progress.
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
74
/// Idle timeout, resolved lazily on first access.
///
/// Read from the `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS` environment variable
/// (interpreted as milliseconds). Falls back to 2 seconds when the variable is unset or
/// is not a valid `u64`.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let configured_millis = match std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS") {
        Ok(raw) => raw.parse::<u64>().ok(),
        Err(_) => None,
    };
    match configured_millis {
        Some(millis) => Duration::from_millis(millis),
        None => Duration::from_secs(2),
    }
});
84
/// Snapshot coordination state; the backend keeps it behind its `snapshot_request` mutex.
struct SnapshotRequest {
    /// `true` while a snapshot is being taken. Operations that observe it wait on the
    /// `snapshot_completed` condition variable until it is reset to `false`.
    snapshot_requested: bool,
    /// Operations that parked themselves at a suspend point while a snapshot was
    /// requested. `snapshot_and_persist` collects them and passes them on to the backing
    /// storage when saving the snapshot.
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}
89
90impl SnapshotRequest {
91 fn new() -> Self {
92 Self {
93 snapshot_requested: false,
94 suspended_operations: FxHashSet::default(),
95 }
96 }
97}
98
/// Holder for the boxed future of a [`TransientTask::Once`] task.
///
/// NOTE(review): the `Mutex<Option<..>>` wrapper presumably allows the future to be taken
/// out exactly once when the task is executed — confirm at the use site.
type TransientTaskOnce =
    Mutex<Option<Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>>>;
101
/// The kinds of transient tasks stored in the backend's `transient_tasks` map.
///
/// How each variant is scheduled and executed is defined where the tasks are run, which
/// is outside this part of the file.
pub enum TransientTask {
    /// A transient task described by a [`TransientTaskRoot`].
    Root(TransientTaskRoot),

    /// A transient task backed by a single boxed future (see [`TransientTaskOnce`]).
    Once(TransientTaskOnce),
}
120
/// How the backend uses its backing storage.
///
/// Any mode enables restoring from the backing storage (see `should_restore`); only the
/// two `ReadWrite*` modes enable persisting (see `should_persist`).
pub enum StorageMode {
    /// The backing storage is only read from; nothing is persisted.
    ReadOnly,
    /// The backing storage is read from and changes are persisted.
    ///
    /// NOTE(review): presumably changes are pushed regularly while running — the
    /// scheduling of snapshots is not visible in this part of the file.
    ReadWrite,
    /// The backing storage is read from and changes are persisted.
    ///
    /// NOTE(review): judging by the name, changes are only pushed on shutdown — confirm
    /// against the code that triggers snapshots.
    ReadWriteOnShutdown,
}
131
/// Configuration for a [`TurboTasksBackend`].
pub struct BackendOptions {
    /// Enables dependency tracking. When `false`, reads do not register dependencies
    /// between tasks and `active_tracking` is forced to `false` as well.
    pub dependency_tracking: bool,

    /// Enables activeness tracking.
    ///
    /// Automatically disabled by the backend constructor when `dependency_tracking` is
    /// disabled.
    pub active_tracking: bool,

    /// Selects how the backing storage is used. `None` disables both restoring from and
    /// persisting to the backing storage.
    pub storage_mode: Option<StorageMode>,

    /// Number of workers; passed to `compute_shard_amount` to size the sharded data
    /// structures.
    ///
    /// NOTE(review): the behavior for `None` is decided inside `compute_shard_amount`,
    /// which is not visible here.
    pub num_workers: Option<usize>,

    /// Requests small preallocations; forwarded to `compute_shard_amount` and
    /// `Storage::new`.
    pub small_preallocation: bool,
}
156
157impl Default for BackendOptions {
158 fn default() -> Self {
159 Self {
160 dependency_tracking: true,
161 active_tracking: true,
162 storage_mode: Some(StorageMode::ReadWrite),
163 num_workers: None,
164 small_preallocation: false,
165 }
166 }
167}
168
/// Background jobs of the backend.
///
/// The code that schedules and runs these jobs is outside this part of the file.
pub enum TurboTasksBackendJob {
    /// NOTE(review): presumably the first snapshot job after startup — confirm at the
    /// job runner.
    InitialSnapshot,
    /// NOTE(review): presumably a snapshot job scheduled after a previous one — confirm
    /// at the job runner.
    FollowUpSnapshot,
    /// Prefetch job over a shared map of task ids.
    Prefetch {
        /// Shared map from task id to a flag.
        ///
        /// NOTE(review): the meaning of the `bool` is not visible here.
        data: Arc<FxIndexMap<TaskId, bool>>,
        /// Optional index range into `data`.
        ///
        /// NOTE(review): presumably restricts the job to a slice of the map, with `None`
        /// meaning the whole map — confirm at the job runner.
        range: Option<Range<usize>>,
    },
}
177
/// Public handle to the backend: a shared (`Arc`) reference to the
/// [`TurboTasksBackendInner`] that holds all state.
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
179
/// Sharded log of `(task type, task id)` pairs. Its contents are taken during
/// `snapshot_and_persist` and handed to the backing storage.
type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
181
/// The shared state behind a [`TurboTasksBackend`].
struct TurboTasksBackendInner<B: BackingStorage> {
    /// Options the backend was created with (`active_tracking` may have been forced off
    /// by the constructor).
    options: BackendOptions,

    /// Instant at which the backend was constructed.
    start_time: Instant,

    /// Id factory for persisted tasks; covers the ids from the backing storage's next
    /// free task id up to `TRANSIENT_TASK_BIT - 1`.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id factory for transient tasks; covers the ids from `TRANSIENT_TASK_BIT` up to
    /// `TaskId::MAX`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    /// Log of task cache entries to persist; only present when the storage mode
    /// persists data.
    persisted_task_cache_log: Option<TaskCacheLog>,
    /// Bidirectional map between task types and task ids.
    task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
    /// Transient tasks by id.
    transient_tasks: FxDashMap<TaskId, Arc<TransientTask>>,

    /// In-memory storage of task data.
    storage: Storage,

    /// Initialized to `true` when the backing storage's next free task id is not
    /// `TaskId::MIN`. While `true`, `lookup_task_type` falls back to the backing storage
    /// for ids that are missing from `task_cache`.
    local_is_partial: AtomicBool,

    /// Number of operations in progress, combined with [`SNAPSHOT_REQUESTED_BIT`] in the
    /// highest bit. While that bit is set, operations pause until the snapshot has been
    /// taken. When the bit is set and the counter part reaches zero,
    /// `operations_suspended` is notified.
    in_progress_operations: AtomicUsize,

    /// Snapshot coordination state, used together with the two condition variables below.
    snapshot_request: Mutex<SnapshotRequest>,
    /// Notified when `in_progress_operations` drops to exactly
    /// [`SNAPSHOT_REQUESTED_BIT`], i.e. a snapshot is requested and every operation has
    /// either completed or suspended.
    operations_suspended: Condvar,
    /// Notified when the snapshot request is cleared and operations may continue.
    snapshot_completed: Condvar,
    /// NOTE(review): presumably a timestamp of the last snapshot relative to
    /// `start_time`; initialized to 0 — its updates are not visible in this part of the
    /// file.
    last_snapshot: AtomicU64,

    /// Stop flag; initialized to `false`.
    stopping: AtomicBool,
    /// Event named `TurboTasksBackend::stopping_event`.
    stopping_event: Event,
    /// Event named `TurboTasksBackend::idle_start_event`.
    idle_start_event: Event,
    /// Event named `TurboTasksBackend::idle_end_event`.
    idle_end_event: Event,
    /// Idle flag, only compiled with the `verify_aggregation_graph` feature.
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    /// Cache hit/miss statistics per native function.
    task_statistics: TaskStatisticsApi,

    /// The backing storage implementation.
    backing_storage: B,

    /// Set of root task ids, only compiled with the `verify_aggregation_graph` feature.
    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
231
232impl<B: BackingStorage> TurboTasksBackend<B> {
233 pub fn new(options: BackendOptions, backing_storage: B) -> Self {
234 Self(Arc::new(TurboTasksBackendInner::new(
235 options,
236 backing_storage,
237 )))
238 }
239
240 pub fn backing_storage(&self) -> &B {
241 &self.0.backing_storage
242 }
243}
244
245impl<B: BackingStorage> TurboTasksBackendInner<B> {
246 pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
247 let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
248 let need_log = matches!(
249 options.storage_mode,
250 Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
251 );
252 if !options.dependency_tracking {
253 options.active_tracking = false;
254 }
255 let small_preallocation = options.small_preallocation;
256 let next_task_id = backing_storage
257 .next_free_task_id()
258 .expect("Failed to get task id");
259 Self {
260 options,
261 start_time: Instant::now(),
262 persisted_task_id_factory: IdFactoryWithReuse::new(
263 next_task_id,
264 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
265 ),
266 transient_task_id_factory: IdFactoryWithReuse::new(
267 TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
268 TaskId::MAX,
269 ),
270 persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
271 task_cache: BiMap::new(),
272 transient_tasks: FxDashMap::default(),
273 local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
274 storage: Storage::new(shard_amount, small_preallocation),
275 in_progress_operations: AtomicUsize::new(0),
276 snapshot_request: Mutex::new(SnapshotRequest::new()),
277 operations_suspended: Condvar::new(),
278 snapshot_completed: Condvar::new(),
279 last_snapshot: AtomicU64::new(0),
280 stopping: AtomicBool::new(false),
281 stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
282 idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
283 idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
284 #[cfg(feature = "verify_aggregation_graph")]
285 is_idle: AtomicBool::new(false),
286 task_statistics: TaskStatisticsApi::default(),
287 backing_storage,
288 #[cfg(feature = "verify_aggregation_graph")]
289 root_tasks: Default::default(),
290 }
291 }
292
    /// Creates an [`ExecuteContext`] for running operations against this backend with
    /// the given `turbo_tasks` API handle.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }
299
    /// Creates an [`ExecuteContext`] like [`Self::execute_context`], but with an optional
    /// read transaction of the backing storage attached.
    ///
    /// # Safety
    ///
    /// The requirements are those of `ExecuteContextImpl::new_with_tx`, to which this
    /// call is forwarded unchanged.
    ///
    /// NOTE(review): presumably `tx` must be a transaction created by this backend's
    /// backing storage — confirm against `ExecuteContextImpl::new_with_tx`.
    unsafe fn execute_context_with_tx<'e, 'tx>(
        &'e self,
        tx: Option<&'e B::ReadTransaction<'tx>>,
        turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
    where
        'tx: 'e,
    {
        // SAFETY: the caller upholds the contract described in the `# Safety` section.
        unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
    }
314
315 fn suspending_requested(&self) -> bool {
316 self.should_persist()
317 && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
318 }
319
    /// A point at which a running operation may be suspended for a snapshot.
    ///
    /// When a snapshot is requested, `suspend` is called to capture the operation's
    /// state, the operation is registered as suspended and the calling thread blocks
    /// until the snapshot request has been cleared. Otherwise this returns immediately
    /// without calling `suspend`.
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        // Slow path, kept out of line so the common case is only the check below.
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            // Re-check under the lock: the request may have been cleared in the meantime.
            if snapshot_request.snapshot_requested {
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                // This operation no longer counts as in progress while it is suspended.
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                // Only the request bit is left: every operation has completed or
                // suspended, so wake up the thread waiting to take the snapshot.
                if value == SNAPSHOT_REQUESTED_BIT {
                    this.operations_suspended.notify_all();
                }
                // Block until the snapshot request is cleared.
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                // Resume: count the operation as in progress again and unregister it.
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }
352
    /// Registers the start of an operation and returns a guard that unregisters it when
    /// dropped.
    ///
    /// When persisting is disabled, nothing is counted and the returned guard is a no-op.
    /// When a snapshot is currently requested, this blocks until the snapshot request
    /// has been cleared before the operation is counted as in progress.
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        if !self.should_persist() {
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            let mut snapshot_request = self.snapshot_request.lock();
            // Re-check under the lock: the request may have been cleared in the meantime.
            if snapshot_request.snapshot_requested {
                // Undo the increment above while waiting, so the snapshot is not held up
                // by an operation that has not actually started.
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                // Only the request bit is left: wake up the thread waiting to take the
                // snapshot.
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                // The snapshot request is cleared; count the operation as in progress.
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }
376
377 fn should_persist(&self) -> bool {
378 matches!(
379 self.options.storage_mode,
380 Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
381 )
382 }
383
384 fn should_restore(&self) -> bool {
385 self.options.storage_mode.is_some()
386 }
387
    /// Returns whether dependency tracking is enabled in the backend options.
    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }
391
    /// Returns whether activeness tracking is enabled in the backend options. This is
    /// always `false` when dependency tracking is disabled (enforced by the constructor).
    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }
395
    /// Records a cache hit for the native function of `task_type` in the task statistics.
    ///
    /// NOTE(review): `TaskStatisticsApi::map` presumably only invokes the closure when
    /// statistics collection is enabled — confirm against its definition.
    fn track_cache_hit(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
    }
400
    /// Records a cache miss for the native function of `task_type` in the task
    /// statistics.
    ///
    /// NOTE(review): `TaskStatisticsApi::map` presumably only invokes the closure when
    /// statistics collection is enabled — confirm against its definition.
    fn track_cache_miss(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
    }
405}
406
/// Guard returned by `start_operation`. Dropping it marks the operation as finished.
pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    /// The backend whose `in_progress_operations` counter is decremented on drop.
    /// `None` when persisting is disabled and operations are not counted.
    backend: Option<&'a TurboTasksBackendInner<B>>,
}
410
411impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
412 fn drop(&mut self) {
413 if let Some(backend) = self.backend {
414 let fetch_sub = backend
415 .in_progress_operations
416 .fetch_sub(1, Ordering::AcqRel);
417 if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
418 backend.operations_suspended.notify_all();
419 }
420 }
421 }
422}
423
/// Intermediate result of preparing the completion of a task execution.
///
/// NOTE(review): the code that produces and consumes this struct is outside this part of
/// the file; the field descriptions below are derived from names and types only and
/// should be confirmed there.
struct TaskExecutionCompletePrepareResult {
    /// Ids of child tasks; presumably the children that are new after this execution.
    pub new_children: FxHashSet<TaskId>,
    /// Data items; presumably those removed from the task during preparation.
    pub removed_data: Vec<CachedDataItem>,
    /// Presumably whether the task is considered immutable after this execution.
    pub is_now_immutable: bool,
    /// Only compiled with the `verify_determinism` feature.
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    /// The output value to set, if any.
    pub new_output: Option<OutputValue>,
    /// Task ids; presumably the tasks that depend on this task's output.
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
}
434
435impl<B: BackingStorage> TurboTasksBackendInner<B> {
437 unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
441 &'l self,
442 tx: Option<&'l B::ReadTransaction<'tx>>,
443 parent_task: Option<TaskId>,
444 child_task: TaskId,
445 turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
446 ) {
447 operation::ConnectChildOperation::run(parent_task, child_task, unsafe {
448 self.execute_context_with_tx(tx, turbo_tasks)
449 });
450 }
451
452 fn connect_child(
453 &self,
454 parent_task: Option<TaskId>,
455 child_task: TaskId,
456 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
457 ) {
458 operation::ConnectChildOperation::run(
459 parent_task,
460 child_task,
461 self.execute_context(turbo_tasks),
462 );
463 }
464
465 fn try_read_task_output(
466 self: &Arc<Self>,
467 task_id: TaskId,
468 reader: Option<TaskId>,
469 options: ReadOutputOptions,
470 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
471 ) -> Result<Result<RawVc, EventListener>> {
472 self.assert_not_persistent_calling_transient(reader, task_id, None);
473
474 let mut ctx = self.execute_context(turbo_tasks);
475 let need_reader_task = if self.should_track_dependencies()
476 && !matches!(options.tracking, ReadTracking::Untracked)
477 && reader.is_some_and(|reader_id| reader_id != task_id)
478 && let Some(reader_id) = reader
479 && reader_id != task_id
480 {
481 Some(reader_id)
482 } else {
483 None
484 };
485 let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
486 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
490 (task, Some(reader))
491 } else {
492 (ctx.task(task_id, TaskDataCategory::All), None)
493 };
494
495 fn listen_to_done_event<B: BackingStorage>(
496 this: &TurboTasksBackendInner<B>,
497 reader: Option<TaskId>,
498 tracking: ReadTracking,
499 done_event: &Event,
500 ) -> EventListener {
501 done_event.listen_with_note(move || {
502 let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
503 move || {
504 if let Some(reader_desc) = reader_desc.as_ref() {
505 format!("try_read_task_output from {} ({})", reader_desc(), tracking)
506 } else {
507 format!("try_read_task_output ({})", tracking)
508 }
509 }
510 })
511 }
512
513 fn check_in_progress<B: BackingStorage>(
514 this: &TurboTasksBackendInner<B>,
515 task: &impl TaskGuard,
516 reader: Option<TaskId>,
517 tracking: ReadTracking,
518 ctx: &impl ExecuteContext<'_>,
519 ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
520 {
521 match get!(task, InProgress) {
522 Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
523 listen_to_done_event(this, reader, tracking, done_event),
524 ))),
525 Some(InProgressState::InProgress(box InProgressStateInner {
526 done_event, ..
527 })) => Some(Ok(Err(listen_to_done_event(
528 this, reader, tracking, done_event,
529 )))),
530 Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
531 "{} was canceled",
532 ctx.get_task_description(task.id())
533 ))),
534 None => None,
535 }
536 }
537
538 if matches!(options.consistency, ReadConsistency::Strong) {
539 loop {
541 let aggregation_number = get_aggregation_number(&task);
542 if is_root_node(aggregation_number) {
543 break;
544 }
545 drop(task);
546 drop(reader_task);
547 {
548 let _span = tracing::trace_span!(
549 "make root node for strongly consistent read",
550 %task_id
551 )
552 .entered();
553 AggregationUpdateQueue::run(
554 AggregationUpdateJob::UpdateAggregationNumber {
555 task_id,
556 base_aggregation_number: u32::MAX,
557 distance: None,
558 },
559 &mut ctx,
560 );
561 }
562 (task, reader_task) = if let Some(reader_id) = need_reader_task {
563 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
565 (task, Some(reader))
566 } else {
567 (ctx.task(task_id, TaskDataCategory::All), None)
568 }
569 }
570
571 let is_dirty = task.is_dirty();
572
573 let has_dirty_containers = task.has_dirty_containers();
575 if has_dirty_containers || is_dirty {
576 let activeness = get_mut!(task, Activeness);
577 let mut task_ids_to_schedule: Vec<_> = Vec::new();
578 let activeness = if let Some(activeness) = activeness {
580 activeness.set_active_until_clean();
584 activeness
585 } else {
586 get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id))
590 .set_active_until_clean();
591 if ctx.should_track_activeness() {
592 task_ids_to_schedule = task.dirty_containers().collect();
594 task_ids_to_schedule.push(task_id);
595 }
596 get!(task, Activeness).unwrap()
597 };
598 let listener = activeness.all_clean_event.listen_with_note(move || {
599 let this = self.clone();
600 let tt = turbo_tasks.pin();
601 move || {
602 let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
603 let mut ctx = this.execute_context(tt);
604 let mut visited = FxHashSet::default();
605 fn indent(s: &str) -> String {
606 s.split_inclusive('\n')
607 .flat_map(|line: &str| [" ", line].into_iter())
608 .collect::<String>()
609 }
610 fn get_info(
611 ctx: &mut impl ExecuteContext<'_>,
612 task_id: TaskId,
613 parent_and_count: Option<(TaskId, i32)>,
614 visited: &mut FxHashSet<TaskId>,
615 ) -> String {
616 let task = ctx.task(task_id, TaskDataCategory::All);
617 let is_dirty = task.is_dirty();
618 let in_progress =
619 get!(task, InProgress).map_or("not in progress", |p| match p {
620 InProgressState::InProgress(_) => "in progress",
621 InProgressState::Scheduled { .. } => "scheduled",
622 InProgressState::Canceled => "canceled",
623 });
624 let activeness = get!(task, Activeness).map_or_else(
625 || "not active".to_string(),
626 |activeness| format!("{activeness:?}"),
627 );
628 let aggregation_number = get_aggregation_number(&task);
629 let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
630 {
631 let uppers = get_uppers(&task);
632 !uppers.contains(&parent_task_id)
633 } else {
634 false
635 };
636
637 let has_dirty_containers = task.has_dirty_containers();
639
640 let task_description = ctx.get_task_description(task_id);
641 let is_dirty_label = if is_dirty { ", dirty" } else { "" };
642 let has_dirty_containers_label = if has_dirty_containers {
643 ", dirty containers"
644 } else {
645 ""
646 };
647 let count = if let Some((_, count)) = parent_and_count {
648 format!(" {count}")
649 } else {
650 String::new()
651 };
652 let mut info = format!(
653 "{task_id} {task_description}{count} (aggr={aggregation_number}, \
654 {in_progress}, \
655 {activeness}{is_dirty_label}{has_dirty_containers_label})",
656 );
657 let children: Vec<_> = task.dirty_containers_with_count().collect();
658 drop(task);
659
660 if missing_upper {
661 info.push_str("\n ERROR: missing upper connection");
662 }
663
664 if has_dirty_containers || !children.is_empty() {
665 writeln!(info, "\n dirty tasks:").unwrap();
666
667 for (child_task_id, count) in children {
668 let task_description = ctx.get_task_description(child_task_id);
669 if visited.insert(child_task_id) {
670 let child_info = get_info(
671 ctx,
672 child_task_id,
673 Some((task_id, count)),
674 visited,
675 );
676 info.push_str(&indent(&child_info));
677 if !info.ends_with('\n') {
678 info.push('\n');
679 }
680 } else {
681 writeln!(
682 info,
683 " {child_task_id} {task_description} {count} \
684 (already visited)"
685 )
686 .unwrap();
687 }
688 }
689 }
690 info
691 }
692 let info = get_info(&mut ctx, task_id, None, &mut visited);
693 format!(
694 "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
695 )
696 }
697 });
698 drop(reader_task);
699 drop(task);
700 if !task_ids_to_schedule.is_empty() {
701 let mut queue = AggregationUpdateQueue::new();
702 queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
703 queue.execute(&mut ctx);
704 }
705
706 return Ok(Err(listener));
707 }
708 }
709
710 if let Some(value) = check_in_progress(self, &task, reader, options.tracking, &ctx) {
711 return value;
712 }
713
714 if let Some(output) = get!(task, Output) {
715 let result = match output {
716 OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
717 OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
718 OutputValue::Error(error) => Err(error
719 .with_task_context(ctx.get_task_description(task_id), Some(task_id))
720 .into()),
721 };
722 if let Some(mut reader_task) = reader_task
723 && options.tracking.should_track(result.is_err())
724 && (!task.is_immutable() || cfg!(feature = "verify_immutable"))
725 {
726 #[cfg(feature = "trace_task_output_dependencies")]
727 let _span = tracing::trace_span!(
728 "add output dependency",
729 task = %task_id,
730 dependent_task = ?reader
731 )
732 .entered();
733 let _ = task.add(CachedDataItem::OutputDependent {
734 task: reader.unwrap(),
735 value: (),
736 });
737 drop(task);
738
739 if reader_task
745 .remove(&CachedDataItemKey::OutdatedOutputDependency { target: task_id })
746 .is_none()
747 {
748 let _ = reader_task.add(CachedDataItem::OutputDependency {
749 target: task_id,
750 value: (),
751 });
752 }
753 }
754
755 return result;
756 }
757 drop(reader_task);
758
759 let note = move || {
760 let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
761 move || {
762 if let Some(reader_desc) = reader_desc.as_ref() {
763 format!("try_read_task_output (recompute) from {}", (reader_desc)())
764 } else {
765 "try_read_task_output (recompute, untracked)".to_string()
766 }
767 }
768 };
769
770 let (item, listener) = CachedDataItem::new_scheduled_with_listener(
772 TaskExecutionReason::OutputNotAvailable,
773 || self.get_task_desc_fn(task_id),
774 note,
775 );
776 task.add_new(item);
779 ctx.schedule_task(task);
780
781 Ok(Err(listener))
782 }
783
    /// Tries to read the content of `cell` of task `task_id` on behalf of the optional
    /// `reader` task.
    ///
    /// Returns
    /// - `Ok(Ok(content))` when the cell content is available,
    /// - `Ok(Err(listener))` when the caller has to wait: the task is scheduled or in
    ///   progress, another read is already waiting for this cell, or the task has just
    ///   been scheduled to recompute the cell,
    /// - `Err(_)` when the cell no longer exists in the task or the task was canceled.
    ///
    /// When dependency tracking is enabled, the read is tracked and `reader` is a task
    /// other than `task_id`, a cell dependency between the two tasks is recorded.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        // Records the dependency of `reader` on `cell` of `task_id` in both directions.
        // Does nothing without a reader task; immutable tasks are skipped unless the
        // `verify_immutable` feature is enabled.
        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
        ) {
            if let Some(mut reader_task) = reader_task
                && (!task.is_immutable() || cfg!(feature = "verify_immutable"))
            {
                // `reader` is `Some` here because `reader_task` is only acquired for a
                // present reader.
                let _ = task.add(CachedDataItem::CellDependent {
                    cell,
                    task: reader.unwrap(),
                    value: (),
                });
                drop(task);

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                // If the dependency was marked as outdated, removing that mark is
                // enough. Otherwise it has to be added to the reader.
                if reader_task
                    .remove(&CachedDataItemKey::OutdatedCellDependency { target })
                    .is_none()
                {
                    let _ = reader_task.add(CachedDataItem::CellDependency { target, value: () });
                }
            }
        }

        let ReadCellOptions {
            is_serializable_cell_content,
            tracking,
            final_read_hint,
        } = options;

        let mut ctx = self.execute_context(turbo_tasks);
        // The reader task only needs to be accessed when a dependency may be recorded.
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(tracking, ReadTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::Data);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::Data), None)
        };

        // With `final_read_hint` the content is removed from the task instead of only
        // being read.
        let content = if final_read_hint {
            task.remove_cell_data(is_serializable_cell_content, cell)
        } else {
            task.get_cell_data(is_serializable_cell_content, cell)
        };
        if let Some(content) = content {
            if tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell);
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.reference)),
            )));
        }

        // The content is missing. When the task is already scheduled or running, wait
        // for the cell to be written.
        let in_progress = get!(task, InProgress);
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        // Check that the cell still exists, based on the recorded max index for its type.
        let max_id = get!(
            task,
            CellTypeMaxIndex {
                cell_type: cell.type_id
            }
        )
        .copied();
        let Some(max_id) = max_id else {
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell);
            }
            bail!(
                "Cell {cell:?} no longer exists in task {} (no cell of this type exists)",
                ctx.get_task_description(task_id)
            );
        };
        if cell.index >= max_id {
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell);
            }
            bail!(
                "Cell {cell:?} no longer exists in task {} (index out of bounds)",
                ctx.get_task_description(task_id)
            );
        }
        drop(reader_task);

        // The cell exists but its content is gone, so it has to be recomputed. Only the
        // read that created the listener schedules the task; later reads just wait.
        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, reader, cell);
        if !new_listener {
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).global_name,
            cell_index = cell.index
        )
        .entered();

        // A canceled task is not scheduled again.
        if is_cancelled {
            bail!("{} was canceled", ctx.get_task_description(task_id));
        }
        task.add_new(CachedDataItem::new_scheduled(
            TaskExecutionReason::CellNotAvailable,
            || self.get_task_desc_fn(task_id),
        ));
        ctx.schedule_task(task);

        Ok(Err(listener))
    }
930
931 fn listen_to_cell(
932 &self,
933 task: &mut impl TaskGuard,
934 task_id: TaskId,
935 reader: Option<TaskId>,
936 cell: CellId,
937 ) -> (EventListener, bool) {
938 let note = move || {
939 let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
940 move || {
941 if let Some(reader_desc) = reader_desc.as_ref() {
942 format!("try_read_task_cell (in progress) from {}", (reader_desc)())
943 } else {
944 "try_read_task_cell (in progress, untracked)".to_string()
945 }
946 }
947 };
948 if let Some(in_progress) = get!(task, InProgressCell { cell }) {
949 let listener = in_progress.event.listen_with_note(note);
951 return (listener, false);
952 }
953 let in_progress = InProgressCellState::new(task_id, cell);
954 let listener = in_progress.event.listen_with_note(note);
955 task.add_new(CachedDataItem::InProgressCell {
956 cell,
957 value: in_progress,
958 });
959 (listener, true)
960 }
961
962 fn lookup_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
963 if let Some(task_type) = self.task_cache.lookup_reverse(&task_id) {
964 return Some(task_type);
965 }
966 if self.should_restore()
967 && self.local_is_partial.load(Ordering::Acquire)
968 && !task_id.is_transient()
969 && let Some(task_type) = unsafe {
970 self.backing_storage
971 .reverse_lookup_task_cache(None, task_id)
972 .expect("Failed to lookup task type")
973 }
974 {
975 let _ = self.task_cache.try_insert(task_type.clone(), task_id);
976 return Some(task_type);
977 }
978 None
979 }
980
981 fn get_task_desc_fn(&self, task_id: TaskId) -> impl Fn() -> String + Send + Sync + 'static {
982 let task_type = self.lookup_task_type(task_id);
983 move || {
984 task_type.as_ref().map_or_else(
985 || format!("{task_id:?} transient"),
986 |task_type| format!("{task_id:?} {task_type}"),
987 )
988 }
989 }
990
991 fn snapshot_and_persist(
992 &self,
993 parent_span: Option<tracing::Id>,
994 reason: &str,
995 ) -> Option<(Instant, bool)> {
996 let snapshot_span =
997 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
998 .entered();
999 let start = Instant::now();
1000 debug_assert!(self.should_persist());
1001
1002 let suspended_operations;
1003 {
1004 let _span = tracing::info_span!("blocking").entered();
1005 let mut snapshot_request = self.snapshot_request.lock();
1006 snapshot_request.snapshot_requested = true;
1007 let active_operations = self
1008 .in_progress_operations
1009 .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1010 if active_operations != 0 {
1011 self.operations_suspended
1012 .wait_while(&mut snapshot_request, |_| {
1013 self.in_progress_operations.load(Ordering::Relaxed)
1014 != SNAPSHOT_REQUESTED_BIT
1015 });
1016 }
1017 suspended_operations = snapshot_request
1018 .suspended_operations
1019 .iter()
1020 .map(|op| op.arc().clone())
1021 .collect::<Vec<_>>();
1022 }
1023 self.storage.start_snapshot();
1024 let mut persisted_task_cache_log = self
1025 .persisted_task_cache_log
1026 .as_ref()
1027 .map(|l| l.take(|i| i))
1028 .unwrap_or_default();
1029 let mut snapshot_request = self.snapshot_request.lock();
1030 snapshot_request.snapshot_requested = false;
1031 self.in_progress_operations
1032 .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1033 self.snapshot_completed.notify_all();
1034 let snapshot_time = Instant::now();
1035 drop(snapshot_request);
1036
1037 let preprocess = |task_id: TaskId, inner: &storage::InnerStorage| {
1038 if task_id.is_transient() {
1039 return (None, None);
1040 }
1041 let len = inner.len();
1042
1043 let meta_restored = inner.state().meta_restored();
1044 let data_restored = inner.state().data_restored();
1045
1046 let mut meta = meta_restored.then(|| Vec::with_capacity(len));
1047 let mut data = data_restored.then(|| Vec::with_capacity(len));
1048 for (key, value) in inner.iter_all() {
1049 if key.is_persistent() && value.is_persistent() {
1050 match key.category() {
1051 TaskDataCategory::Meta => {
1052 if let Some(meta) = &mut meta {
1053 meta.push(CachedDataItem::from_key_and_value_ref(key, value))
1054 }
1055 }
1056 TaskDataCategory::Data => {
1057 if let Some(data) = &mut data {
1058 data.push(CachedDataItem::from_key_and_value_ref(key, value))
1059 }
1060 }
1061 _ => {}
1062 }
1063 }
1064 }
1065
1066 (meta, data)
1067 };
1068 let process = |task_id: TaskId, (meta, data): (Option<Vec<_>>, Option<Vec<_>>)| {
1069 (
1073 task_id,
1074 meta.map(|d| self.backing_storage.serialize(task_id, &d)),
1075 data.map(|d| self.backing_storage.serialize(task_id, &d)),
1076 )
1077 };
1078 let process_snapshot = |task_id: TaskId, inner: Box<InnerStorageSnapshot>| {
1079 if task_id.is_transient() {
1080 return (task_id, None, None);
1081 }
1082 let len = inner.len();
1083 let mut meta = inner.meta_modified.then(|| Vec::with_capacity(len));
1084 let mut data = inner.data_modified.then(|| Vec::with_capacity(len));
1085 for (key, value) in inner.iter_all() {
1086 if key.is_persistent() && value.is_persistent() {
1087 match key.category() {
1088 TaskDataCategory::Meta => {
1089 if let Some(meta) = &mut meta {
1090 meta.push(CachedDataItem::from_key_and_value_ref(key, value));
1091 }
1092 }
1093 TaskDataCategory::Data => {
1094 if let Some(data) = &mut data {
1095 data.push(CachedDataItem::from_key_and_value_ref(key, value));
1096 }
1097 }
1098 _ => {}
1099 }
1100 }
1101 }
1102 (
1103 task_id,
1104 meta.map(|meta| self.backing_storage.serialize(task_id, &meta)),
1105 data.map(|data| self.backing_storage.serialize(task_id, &data)),
1106 )
1107 };
1108
1109 let snapshot = self
1110 .storage
1111 .take_snapshot(&preprocess, &process, &process_snapshot);
1112
1113 #[cfg(feature = "print_cache_item_size")]
1114 #[derive(Default)]
1115 struct TaskCacheStats {
1116 data: usize,
1117 data_count: usize,
1118 meta: usize,
1119 meta_count: usize,
1120 }
1121 #[cfg(feature = "print_cache_item_size")]
1122 impl TaskCacheStats {
1123 fn add_data(&mut self, len: usize) {
1124 self.data += len;
1125 self.data_count += 1;
1126 }
1127
1128 fn add_meta(&mut self, len: usize) {
1129 self.meta += len;
1130 self.meta_count += 1;
1131 }
1132 }
1133 #[cfg(feature = "print_cache_item_size")]
1134 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1135 Mutex::new(FxHashMap::default());
1136
1137 let task_snapshots = snapshot
1138 .into_iter()
1139 .filter_map(|iter| {
1140 let mut iter = iter
1141 .filter_map(
1142 |(task_id, meta, data): (
1143 _,
1144 Option<Result<SmallVec<_>>>,
1145 Option<Result<SmallVec<_>>>,
1146 )| {
1147 let meta = match meta {
1148 Some(Ok(meta)) => {
1149 #[cfg(feature = "print_cache_item_size")]
1150 task_cache_stats
1151 .lock()
1152 .entry(self.get_task_description(task_id))
1153 .or_default()
1154 .add_meta(meta.len());
1155 Some(meta)
1156 }
1157 None => None,
1158 Some(Err(err)) => {
1159 println!(
1160 "Serializing task {} failed (meta): {:?}",
1161 self.get_task_description(task_id),
1162 err
1163 );
1164 None
1165 }
1166 };
1167 let data = match data {
1168 Some(Ok(data)) => {
1169 #[cfg(feature = "print_cache_item_size")]
1170 task_cache_stats
1171 .lock()
1172 .entry(self.get_task_description(task_id))
1173 .or_default()
1174 .add_data(data.len());
1175 Some(data)
1176 }
1177 None => None,
1178 Some(Err(err)) => {
1179 println!(
1180 "Serializing task {} failed (data): {:?}",
1181 self.get_task_description(task_id),
1182 err
1183 );
1184 None
1185 }
1186 };
1187 (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
1188 },
1189 )
1190 .peekable();
1191 iter.peek().is_some().then_some(iter)
1192 })
1193 .collect::<Vec<_>>();
1194
1195 swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
1196
1197 drop(snapshot_span);
1198
1199 if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
1200 return Some((snapshot_time, false));
1201 }
1202
1203 let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1204 {
1205 if let Err(err) = self.backing_storage.save_snapshot(
1206 suspended_operations,
1207 persisted_task_cache_log,
1208 task_snapshots,
1209 ) {
1210 println!("Persisting failed: {err:?}");
1211 return None;
1212 }
1213 #[cfg(feature = "print_cache_item_size")]
1214 {
1215 let mut task_cache_stats = task_cache_stats
1216 .into_inner()
1217 .into_iter()
1218 .collect::<Vec<_>>();
1219 if !task_cache_stats.is_empty() {
1220 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1221 (stats_b.data + stats_b.meta, key_b)
1222 .cmp(&(stats_a.data + stats_a.meta, key_a))
1223 });
1224 println!("Task cache stats:");
1225 for (task_desc, stats) in task_cache_stats {
1226 use std::ops::Div;
1227
1228 use turbo_tasks::util::FormatBytes;
1229
1230 println!(
1231 " {} {task_desc} = {} meta ({} x {}), {} data ({} x {})",
1232 FormatBytes(stats.data + stats.meta),
1233 FormatBytes(stats.meta),
1234 stats.meta_count,
1235 FormatBytes(stats.meta.checked_div(stats.meta_count).unwrap_or(0)),
1236 FormatBytes(stats.data),
1237 stats.data_count,
1238 FormatBytes(stats.data.checked_div(stats.data_count).unwrap_or(0)),
1239 );
1240 }
1241 }
1242 }
1243 }
1244
1245 let elapsed = start.elapsed();
1246 if elapsed > Duration::from_secs(10) {
1248 turbo_tasks().send_compilation_event(Arc::new(TimingEvent::new(
1249 "Finished writing to filesystem cache".to_string(),
1250 elapsed,
1251 )));
1252 }
1253
1254 Some((snapshot_time, true))
1255 }
1256
1257 fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1258 if self.should_restore() {
1259 let uncompleted_operations = self
1263 .backing_storage
1264 .uncompleted_operations()
1265 .expect("Failed to get uncompleted operations");
1266 if !uncompleted_operations.is_empty() {
1267 let mut ctx = self.execute_context(turbo_tasks);
1268 for op in uncompleted_operations {
1269 op.execute(&mut ctx);
1270 }
1271 }
1272 }
1273
1274 if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1277 let _span = trace_span!("persisting background job").entered();
1279 let _span = tracing::info_span!("thread").entered();
1280 turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1281 }
1282 }
1283
    /// Signals that the backend is stopping.
    ///
    /// Sets the `stopping` flag and then notifies the listeners of `stopping_event`.
    fn stopping(&self) {
        // The flag is stored (with `Release` ordering) before the listeners are notified.
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }
1288
1289 #[allow(unused_variables)]
1290 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1291 #[cfg(feature = "verify_aggregation_graph")]
1292 {
1293 self.is_idle.store(false, Ordering::Release);
1294 self.verify_aggregation_graph(turbo_tasks, false);
1295 }
1296 if self.should_persist() {
1297 self.snapshot_and_persist(Span::current().into(), "stop");
1298 }
1299 self.task_cache.drop_contents();
1300 drop_contents(&self.transient_tasks);
1301 self.storage.drop_contents();
1302 if let Err(err) = self.backing_storage.shutdown() {
1303 println!("Shutting down failed: {err}");
1304 }
1305 }
1306
    /// Called when the system becomes idle: notifies all `idle_start_event` listeners.
    ///
    /// With the `verify_aggregation_graph` feature enabled, this additionally marks the
    /// backend as idle and spawns a tokio task that verifies the aggregation graph once the
    /// backend has stayed idle for 5 seconds.
    // `turbo_tasks` is only read when `verify_aggregation_graph` is enabled.
    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                select! {
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                        // The idle period lasted long enough; fall through to the verification.
                    }
                    _ = this.idle_end_event.listen() => {
                        // Idle ended before the timeout elapsed; skip the verification.
                        return;
                    }
                }
                // Re-check the flag in case idle ended without this task observing the event.
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }
1334
    /// Called when the idle period ends: notifies all `idle_end_event` listeners.
    ///
    /// With the `verify_aggregation_graph` feature enabled, the idle flag is cleared before
    /// the listeners are notified.
    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }
1340
    /// Returns the id of the persistent task for `task_type`, creating one if necessary, and
    /// connects it as a child of `parent_task`.
    ///
    /// Lookup order:
    /// 1. the in-memory task cache,
    /// 2. the backing storage (only when restoring is enabled and `local_is_partial` is set),
    /// 3. otherwise a fresh id is taken from `persisted_task_id_factory`.
    ///
    /// # Panics
    ///
    /// Panics when the backing storage lookup fails.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        // Fast path: the task is already known in memory.
        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
            self.track_cache_hit(&task_type);
            self.connect_child(parent_task, task_id, turbo_tasks);
            return task_id;
        }

        let check_backing_storage =
            self.should_restore() && self.local_is_partial.load(Ordering::Acquire);
        // The read transaction is only started when the backing storage is consulted; it is
        // reused below for connecting the child.
        let tx = check_backing_storage
            .then(|| self.backing_storage.start_read_transaction())
            .flatten();
        let task_id = {
            // SAFETY (presumed contract, confirm against `BackingStorage`): `tx` was obtained
            // from `self.backing_storage` just above.
            if let Some(task_id) = unsafe {
                check_backing_storage
                    .then(|| {
                        self.backing_storage
                            .forward_lookup_task_cache(tx.as_ref(), &task_type)
                            .expect("Failed to lookup task id")
                    })
                    .flatten()
            } {
                self.track_cache_hit(&task_type);
                // The insert result is deliberately ignored; the id found in the backing
                // storage is returned regardless.
                let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
                task_id
            } else {
                let task_type = Arc::new(task_type);
                let task_id = self.persisted_task_id_factory.get();
                let task_id = if let Err(existing_task_id) =
                    self.task_cache.try_insert(task_type.clone(), task_id)
                {
                    // The insert was rejected because an id already exists for this task
                    // type; use that id instead.
                    self.track_cache_hit(&task_type);
                    // SAFETY (presumed contract): the id was just taken from the factory and
                    // was not inserted anywhere, so it is handed back for reuse.
                    unsafe {
                        self.persisted_task_id_factory.reuse(task_id);
                    }
                    existing_task_id
                } else {
                    self.track_cache_miss(&task_type);
                    task_id
                };
                // Record the mapping in the task cache log when one is configured.
                if let Some(log) = &self.persisted_task_cache_log {
                    log.lock(task_id).push((task_type, task_id));
                }
                task_id
            }
        };

        // SAFETY (presumed contract): `tx` was obtained from `self.backing_storage` above.
        unsafe { self.connect_child_with_tx(tx.as_ref(), parent_task, task_id, turbo_tasks) };

        task_id
    }
1400
1401 fn get_or_create_transient_task(
1402 &self,
1403 task_type: CachedTaskType,
1404 parent_task: Option<TaskId>,
1405 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1406 ) -> TaskId {
1407 if let Some(parent_task) = parent_task
1408 && !parent_task.is_transient()
1409 {
1410 self.panic_persistent_calling_transient(
1411 self.lookup_task_type(parent_task).as_deref(),
1412 Some(&task_type),
1413 None,
1414 );
1415 }
1416 if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
1417 self.track_cache_hit(&task_type);
1418 self.connect_child(parent_task, task_id, turbo_tasks);
1419 return task_id;
1420 }
1421
1422 let task_type = Arc::new(task_type);
1423 let task_id = self.transient_task_id_factory.get();
1424 if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) {
1425 self.track_cache_hit(&task_type);
1426 unsafe {
1428 self.transient_task_id_factory.reuse(task_id);
1429 }
1430 self.connect_child(parent_task, existing_task_id, turbo_tasks);
1431 return existing_task_id;
1432 }
1433
1434 self.track_cache_miss(&task_type);
1435 self.connect_child(parent_task, task_id, turbo_tasks);
1436
1437 task_id
1438 }
1439
1440 fn debug_trace_transient_task(
1443 &self,
1444 task_type: &CachedTaskType,
1445 cell_id: Option<CellId>,
1446 ) -> DebugTraceTransientTask {
1447 fn inner_id(
1450 backend: &TurboTasksBackendInner<impl BackingStorage>,
1451 task_id: TaskId,
1452 cell_type_id: Option<ValueTypeId>,
1453 visited_set: &mut FxHashSet<TaskId>,
1454 ) -> DebugTraceTransientTask {
1455 if let Some(task_type) = backend.lookup_task_type(task_id) {
1456 if visited_set.contains(&task_id) {
1457 let task_name = task_type.get_name();
1458 DebugTraceTransientTask::Collapsed {
1459 task_name,
1460 cell_type_id,
1461 }
1462 } else {
1463 inner_cached(backend, &task_type, cell_type_id, visited_set)
1464 }
1465 } else {
1466 DebugTraceTransientTask::Uncached { cell_type_id }
1467 }
1468 }
1469 fn inner_cached(
1470 backend: &TurboTasksBackendInner<impl BackingStorage>,
1471 task_type: &CachedTaskType,
1472 cell_type_id: Option<ValueTypeId>,
1473 visited_set: &mut FxHashSet<TaskId>,
1474 ) -> DebugTraceTransientTask {
1475 let task_name = task_type.get_name();
1476
1477 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1478 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1479 return None;
1483 };
1484 if task_id.is_transient() {
1485 Some(Box::new(inner_id(
1486 backend,
1487 task_id,
1488 cause_self_raw_vc.try_get_type_id(),
1489 visited_set,
1490 )))
1491 } else {
1492 None
1493 }
1494 });
1495 let cause_args = task_type
1496 .arg
1497 .get_raw_vcs()
1498 .into_iter()
1499 .filter_map(|raw_vc| {
1500 let Some(task_id) = raw_vc.try_get_task_id() else {
1501 return None;
1503 };
1504 if !task_id.is_transient() {
1505 return None;
1506 }
1507 Some((task_id, raw_vc.try_get_type_id()))
1508 })
1509 .collect::<IndexSet<_>>() .into_iter()
1511 .map(|(task_id, cell_type_id)| {
1512 inner_id(backend, task_id, cell_type_id, visited_set)
1513 })
1514 .collect();
1515
1516 DebugTraceTransientTask::Cached {
1517 task_name,
1518 cell_type_id,
1519 cause_self,
1520 cause_args,
1521 }
1522 }
1523 inner_cached(
1524 self,
1525 task_type,
1526 cell_id.map(|c| c.type_id),
1527 &mut FxHashSet::default(),
1528 )
1529 }
1530
1531 fn invalidate_task(
1532 &self,
1533 task_id: TaskId,
1534 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1535 ) {
1536 if !self.should_track_dependencies() {
1537 panic!("Dependency tracking is disabled so invalidation is not allowed");
1538 }
1539 operation::InvalidateOperation::run(
1540 smallvec![task_id],
1541 #[cfg(feature = "trace_task_dirty")]
1542 TaskDirtyCause::Invalidator,
1543 self.execute_context(turbo_tasks),
1544 );
1545 }
1546
1547 fn invalidate_tasks(
1548 &self,
1549 tasks: &[TaskId],
1550 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1551 ) {
1552 if !self.should_track_dependencies() {
1553 panic!("Dependency tracking is disabled so invalidation is not allowed");
1554 }
1555 operation::InvalidateOperation::run(
1556 tasks.iter().copied().collect(),
1557 #[cfg(feature = "trace_task_dirty")]
1558 TaskDirtyCause::Unknown,
1559 self.execute_context(turbo_tasks),
1560 );
1561 }
1562
1563 fn invalidate_tasks_set(
1564 &self,
1565 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1566 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1567 ) {
1568 if !self.should_track_dependencies() {
1569 panic!("Dependency tracking is disabled so invalidation is not allowed");
1570 }
1571 operation::InvalidateOperation::run(
1572 tasks.iter().copied().collect(),
1573 #[cfg(feature = "trace_task_dirty")]
1574 TaskDirtyCause::Unknown,
1575 self.execute_context(turbo_tasks),
1576 );
1577 }
1578
1579 fn invalidate_serialization(
1580 &self,
1581 task_id: TaskId,
1582 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1583 ) {
1584 if task_id.is_transient() {
1585 return;
1586 }
1587 let mut ctx = self.execute_context(turbo_tasks);
1588 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1589 task.invalidate_serialization();
1590 }
1591
1592 fn get_task_description(&self, task_id: TaskId) -> String {
1593 self.lookup_task_type(task_id).map_or_else(
1594 || format!("{task_id:?} transient"),
1595 |task_type| task_type.to_string(),
1596 )
1597 }
1598
1599 fn task_execution_canceled(
1600 &self,
1601 task_id: TaskId,
1602 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1603 ) {
1604 let mut ctx = self.execute_context(turbo_tasks);
1605 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1606 if let Some(in_progress) = remove!(task, InProgress) {
1607 match in_progress {
1608 InProgressState::Scheduled {
1609 done_event,
1610 reason: _,
1611 } => done_event.notify(usize::MAX),
1612 InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1613 done_event.notify(usize::MAX)
1614 }
1615 InProgressState::Canceled => {}
1616 }
1617 }
1618 task.add_new(CachedDataItem::InProgress {
1619 value: InProgressState::Canceled,
1620 });
1621 }
1622
    /// Tries to move a scheduled task into the in-progress state and returns the span and
    /// future needed to execute it.
    ///
    /// Returns `None` when:
    /// - the task is neither a cached task nor a known transient task,
    /// - the task has no `InProgress` item or that item is not `Scheduled` (the item is put
    ///   back unchanged in that case),
    /// - the task is a `Once` transient task whose stored future yields `None` when taken.
    ///
    /// On success the current collectibles and (when dependency tracking is enabled) the
    /// current cell/output dependencies are recorded as "outdated" items on the task.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        // Local distinction between the two sources a task type can come from.
        enum TaskType {
            Cached(Arc<CachedTaskType>),
            Transient(Arc<TransientTask>),
        }
        let (task_type, once_task) = if let Some(task_type) = self.lookup_task_type(task_id) {
            (TaskType::Cached(task_type), false)
        } else if let Some(task_type) = self.transient_tasks.get(&task_id) {
            (
                TaskType::Transient(task_type.clone()),
                matches!(**task_type, TransientTask::Once(_)),
            )
        } else {
            return None;
        };
        let execution_reason;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            let in_progress = remove!(task, InProgress)?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                // Not scheduled: restore the removed item and do not start an execution.
                task.add_new(CachedDataItem::InProgress { value: in_progress });
                return None;
            };
            execution_reason = reason;
            // Transition `Scheduled` -> `InProgress`, carrying over the `done_event`.
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::InProgress(Box::new(InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    new_children: Default::default(),
                })),
            });

            // Collected up front so the task can be mutated in the loop below.
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            let collectibles = iter_many!(task, Collectible { collectible } value => Collectible::Current(collectible, *value))
                    .chain(iter_many!(task, OutdatedCollectible { collectible } => Collectible::Outdated(collectible)))
                    .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        // Record every current collectible (with its value) as outdated.
                        let _ =
                            task.insert(CachedDataItem::OutdatedCollectible { collectible, value });
                    }
                    Collectible::Outdated(collectible) => {
                        // Drop outdated entries that have no current counterpart.
                        if !task.has_key(&CachedDataItemKey::Collectible { collectible }) {
                            task.remove(&CachedDataItemKey::OutdatedCollectible { collectible });
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Cell dependencies: every current dependency becomes an outdated one, and
                // outdated ones without a current counterpart are removed.
                let outdated_cell_dependencies_to_add =
                    iter_many!(task, CellDependency { target } => target)
                        .collect::<SmallVec<[_; 8]>>();
                let outdated_cell_dependencies_to_remove =
                    iter_many!(task, OutdatedCellDependency { target } => target)
                        .filter(|&target| {
                            !task.has_key(&CachedDataItemKey::CellDependency { target })
                        })
                        .collect::<SmallVec<[_; 8]>>();
                task.extend(
                    CachedDataItemType::OutdatedCellDependency,
                    outdated_cell_dependencies_to_add
                        .into_iter()
                        .map(|target| CachedDataItem::OutdatedCellDependency { target, value: () }),
                );
                for target in outdated_cell_dependencies_to_remove {
                    task.remove(&CachedDataItemKey::OutdatedCellDependency { target });
                }

                // Output dependencies: same treatment as the cell dependencies above.
                let outdated_output_dependencies_to_add =
                    iter_many!(task, OutputDependency { target } => target)
                        .collect::<SmallVec<[_; 8]>>();
                let outdated_output_dependencies_to_remove =
                    iter_many!(task, OutdatedOutputDependency { target } => target)
                        .filter(|&target| {
                            !task.has_key(&CachedDataItemKey::OutputDependency { target })
                        })
                        .collect::<SmallVec<[_; 8]>>();
                task.extend(
                    CachedDataItemType::OutdatedOutputDependency,
                    outdated_output_dependencies_to_add
                        .into_iter()
                        .map(|target| CachedDataItem::OutdatedOutputDependency {
                            target,
                            value: (),
                        }),
                );
                for target in outdated_output_dependencies_to_remove {
                    task.remove(&CachedDataItemKey::OutdatedOutputDependency { target });
                }
            }
        }

        // Build the tracing span and the future to execute.
        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // The stored future is taken out of the mutex; `?` returns `None` from
                    // this method when nothing is left to take.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1753
    /// Handles the completion of a task execution.
    ///
    /// The work is split into phases, each implemented by its own helper method:
    /// prepare, invalidate output dependents, mark unfinished new children dirty, connect
    /// new children, finish and cleanup.
    ///
    /// Returns `true` when one of the phases reports the task as stale (the span field
    /// `stale` is recorded in that case when `trace_task_details` is enabled), and `false`
    /// when the completion ran through to the cleanup phase.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        #[cfg(not(feature = "trace_task_details"))]
        let _span = tracing::trace_span!("task execution completed").entered();
        // With `trace_task_details`, the span carries additional fields; some of them are
        // filled in later via `span.record`.
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();
        let mut ctx = self.execute_context(turbo_tasks);

        // Phase 1: prepare. `None` means the task was stale.
        let Some(TaskExecutionCompletePrepareResult {
            new_children,
            mut removed_data,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        }) = self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            stateful,
            has_invalidator,
        )
        else {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "true");
            return true;
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // Phase 2: invalidate the tasks that depend on this task's output.
        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();

        // Phase 3: mark new children that have no output yet as dirty.
        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        // Phase 4: connect the new children; `true` means the task became stale meanwhile.
        if has_new_children
            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "true");
            return true;
        }

        // Phase 5: finish; `true` again means the task was found stale.
        if self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            &mut removed_data,
            is_now_immutable,
        ) {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "true");
            return true;
        }

        // Drop the removed items explicitly at this point, before the cleanup phase.
        drop(removed_data);

        // Phase 6: cleanup.
        self.task_execution_completed_cleanup(&mut ctx, task_id);

        false
    }
1873
    /// First phase of completing a task execution.
    ///
    /// Reads and updates the task's state under its lock and gathers everything the later
    /// phases need:
    /// - the set of new children,
    /// - cell data items removed from the task,
    /// - the new output value (only when it differs from the current output),
    /// - the tasks depending on the output (only when there is a new output and dependency
    ///   tracking is enabled),
    /// - whether the task can now be considered immutable.
    ///
    /// Returns `None` when the task was stale (unless the `no_fast_stale` feature is
    /// enabled); the task is re-scheduled with reason `Stale` in that case. For a canceled
    /// task a result with empty/default values is returned.
    ///
    /// # Panics
    ///
    /// Panics when the task has no `InProgress` item, when that item is neither `Canceled`
    /// nor `InProgress`, or when the execution result is a local output.
    fn task_execution_completed_prepare(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        #[cfg(feature = "trace_task_details")] span: &Span,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        stateful: bool,
        has_invalidator: bool,
    ) -> Option<TaskExecutionCompletePrepareResult> {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = get_mut!(task, InProgress) else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // Canceled: hand back an empty result so the later phases have nothing to do.
            return Some(TaskExecutionCompletePrepareResult {
                new_children: Default::default(),
                removed_data: Default::default(),
                is_now_immutable: false,
                #[cfg(feature = "verify_determinism")]
                no_output_set: false,
                new_output: None,
                output_dependent_tasks: Default::default(),
            });
        }
        let &mut InProgressState::InProgress(box InProgressStateInner {
            stale,
            ref mut new_children,
            session_dependent,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // Fast path for stale tasks: skip all the work below and re-schedule right away.
        #[cfg(not(feature = "no_fast_stale"))]
        if stale {
            let Some(InProgressState::InProgress(box InProgressStateInner {
                done_event,
                mut new_children,
                ..
            })) = remove!(task, InProgress)
            else {
                unreachable!();
            };
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled {
                    done_event,
                    reason: TaskExecutionReason::Stale,
                },
            });
            // Children that are already stored as `Child` items are excluded from the
            // active count decrease below.
            for task in iter_many!(task, Child { task } => task) {
                new_children.remove(&task);
            }
            drop(task);

            // Queue a `DecreaseActiveCounts` job for the remaining new children.
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return None;
        }

        // Take the new children out of the in-progress state, leaving an empty set behind.
        let mut new_children = take(new_children);

        if stateful {
            let _ = task.add(CachedDataItem::Stateful { value: () });
        }

        if has_invalidator {
            let _ = task.add(CachedDataItem::HasInvalidator { value: () });
        }

        // Sync the stored per-type cell counters with the counters of this execution:
        // changed or new counters are written, counters missing from `cell_counters` are
        // removed.
        let old_counters: FxHashMap<_, _> =
            get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, *max_index));
        let mut counters_to_remove = old_counters.clone();

        task.extend(
            CachedDataItemType::CellTypeMaxIndex,
            cell_counters.iter().filter_map(|(&cell_type, &max_index)| {
                if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                    if old_max_index != max_index {
                        Some(CachedDataItem::CellTypeMaxIndex {
                            cell_type,
                            value: max_index,
                        })
                    } else {
                        None
                    }
                } else {
                    Some(CachedDataItem::CellTypeMaxIndex {
                        cell_type,
                        value: max_index,
                    })
                }
            }),
        );
        for (cell_type, _) in counters_to_remove {
            task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type });
        }

        let mut queue = AggregationUpdateQueue::new();

        let mut removed_data = Vec::new();
        let mut old_edges = Vec::new();

        let has_children = !new_children.is_empty();
        let is_immutable = task.is_immutable();
        // Dependencies are only collected when the task is a candidate for becoming
        // immutable: not immutable yet, not session dependent, no invalidator and no
        // collectibles dependencies.
        let task_dependencies_for_immutable =
            if !is_immutable
                && !session_dependent
                && !task.has_key(&CachedDataItemKey::HasInvalidator {})
                && count!(task, CollectiblesDependency) == 0
            {
                Some(
                    iter_many!(task, OutputDependency { target } => target)
                        .chain(iter_many!(task, CellDependency { target } => target.task))
                        .collect::<FxHashSet<_>>(),
                )
            } else {
                None
            };

        if has_children {
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);

            // Existing children that are not in `new_children` are old edges; children
            // that are in both are removed from `new_children` (they are already stored).
            old_edges.extend(
                iter_many!(task, Child { task } => task)
                    .filter(|task| !new_children.remove(task))
                    .map(OutdatedEdge::Child),
            );
        } else {
            old_edges.extend(iter_many!(task, Child { task } => task).map(OutdatedEdge::Child));
        }

        // Remove cell data whose type has no counter in this execution or whose index is at
        // or beyond the counter for its type. The same is done for transient cell data.
        if task_id.is_transient() || iter_many!(task, CellData { cell }
            if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index) => cell
        ).count() > 0 {
            removed_data.extend(task.extract_if(CachedDataItemType::CellData, |key, _| {
                matches!(key, CachedDataItemKey::CellData { cell } if cell_counters
                            .get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
            }));
        }
        if task_id.is_transient() || iter_many!(task, TransientCellData { cell }
            if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index) => cell
        ).count() > 0 {
            removed_data.extend(task.extract_if(CachedDataItemType::TransientCellData, |key, _| {
                matches!(key, CachedDataItemKey::TransientCellData { cell } if cell_counters
                            .get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
            }));
        }

        // Everything still stored as outdated becomes an old edge to clean up.
        old_edges.extend(
            task.iter(CachedDataItemType::OutdatedCollectible)
                .filter_map(|(key, value)| match (key, value) {
                    (
                        CachedDataItemKey::OutdatedCollectible { collectible },
                        CachedDataItemValueRef::OutdatedCollectible { value },
                    ) => Some(OutdatedEdge::Collectible(collectible, *value)),
                    _ => None,
                }),
        );

        if self.should_track_dependencies() {
            old_edges.extend(iter_many!(task, OutdatedCellDependency { target } => OutdatedEdge::CellDependency(target)));
            old_edges.extend(iter_many!(task, OutdatedOutputDependency { target } => OutdatedEdge::OutputDependency(target)));
            // Dependents of cells that existed under the old counters but are not covered
            // by the new counters are recorded as `RemovedCellDependent` edges.
            old_edges.extend(
                iter_many!(task, CellDependent { cell, task } => (cell, task)).filter_map(
                    |(cell, task)| {
                        if cell_counters
                            .get(&cell.type_id)
                            .is_none_or(|start_index| cell.index >= *start_index)
                            && let Some(old_counter) = old_counters.get(&cell.type_id)
                            && cell.index < *old_counter
                        {
                            return Some(OutdatedEdge::RemovedCellDependent {
                                task_id: task,
                                #[cfg(feature = "trace_task_dirty")]
                                value_type_id: cell.type_id,
                            });
                        }
                        None
                    },
                ),
            );
        }

        // Determine the new output; `None` means the output is unchanged.
        let current_output = get!(task, Output);
        #[cfg(feature = "verify_determinism")]
        let no_output_set = current_output.is_none();
        let new_output = match result {
            Ok(RawVc::TaskOutput(output_task_id)) => {
                if let Some(OutputValue::Output(current_task_id)) = current_output
                    && *current_task_id == output_task_id
                {
                    None
                } else {
                    Some(OutputValue::Output(output_task_id))
                }
            }
            Ok(RawVc::TaskCell(output_task_id, cell)) => {
                if let Some(OutputValue::Cell(CellRef {
                    task: current_task_id,
                    cell: current_cell,
                })) = current_output
                    && *current_task_id == output_task_id
                    && *current_cell == cell
                {
                    None
                } else {
                    Some(OutputValue::Cell(CellRef {
                        task: output_task_id,
                        cell,
                    }))
                }
            }
            Ok(RawVc::LocalOutput(..)) => {
                panic!("Non-local tasks must not return a local Vc");
            }
            Err(err) => {
                if let Some(OutputValue::Error(old_error)) = current_output
                    && old_error == &err
                {
                    None
                } else {
                    Some(OutputValue::Error(err))
                }
            }
        };
        // Output dependents only need to be invalidated when the output actually changed.
        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
        if new_output.is_some() && ctx.should_track_dependencies() {
            output_dependent_tasks = get_many!(task, OutputDependent { task } => task);
        }

        // Release the task guard before other tasks are accessed through `ctx`.
        drop(task);

        // The task counts as newly immutable when every collected dependency is immutable.
        let mut is_now_immutable = false;
        if let Some(dependencies) = task_dependencies_for_immutable
            && dependencies
                .iter()
                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).is_immutable())
        {
            is_now_immutable = true;
        }
        #[cfg(feature = "trace_task_details")]
        span.record("immutable", is_immutable || is_now_immutable);

        if !queue.is_empty() || !old_edges.is_empty() {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
        }

        Some(TaskExecutionCompletePrepareResult {
            new_children,
            removed_data,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        })
    }
2165
    /// Marks the tasks that depend on the output of `task_id` as dirty.
    ///
    /// A dependent task is skipped when it is a once task, or when it has neither an
    /// `OutdatedOutputDependency` nor an `OutputDependency` item pointing at `task_id`.
    /// When only the outdated dependency item exists, the dependent is made dirty without
    /// being made stale.
    ///
    /// `output_dependent_tasks` must not be empty (checked in debug builds).
    fn task_execution_completed_invalidate_output_dependent(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        output_dependent_tasks: SmallVec<[TaskId; 4]>,
    ) {
        debug_assert!(!output_dependent_tasks.is_empty());

        let mut queue = AggregationUpdateQueue::new();
        for dependent_task_id in output_dependent_tasks {
            #[cfg(feature = "trace_task_output_dependencies")]
            let span = tracing::trace_span!(
                "invalidate output dependency",
                task = %task_id,
                dependent_task = %dependent_task_id,
                result = tracing::field::Empty,
            )
            .entered();
            if ctx.is_once_task(dependent_task_id) {
                // Once tasks are never invalidated here.
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "once task");
                continue;
            }
            let mut make_stale = true;
            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
            if dependent.has_key(&CachedDataItemKey::OutdatedOutputDependency { target: task_id }) {
                // The dependency is recorded as outdated on the dependent: still mark it
                // dirty, but do not make it stale.
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "outdated dependency");
                make_stale = false;
            } else if !dependent.has_key(&CachedDataItemKey::OutputDependency { target: task_id }) {
                // The dependent has no dependency item pointing back at this task; skip it.
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "no backward dependency");
                continue;
            }
            make_task_dirty_internal(
                dependent,
                dependent_task_id,
                make_stale,
                #[cfg(feature = "trace_task_dirty")]
                TaskDirtyCause::OutputChange { task_id },
                &mut queue,
                ctx,
            );
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "marked dirty");
        }

        // Run the aggregation updates queued while marking tasks dirty.
        queue.execute(ctx);
    }
2222
2223 fn task_execution_completed_unfinished_children_dirty(
2224 &self,
2225 ctx: &mut impl ExecuteContext<'_>,
2226 new_children: &FxHashSet<TaskId>,
2227 ) {
2228 debug_assert!(!new_children.is_empty());
2229
2230 let mut queue = AggregationUpdateQueue::new();
2231 for &child_id in new_children {
2232 let child_task = ctx.task(child_id, TaskDataCategory::Meta);
2233 if !child_task.has_key(&CachedDataItemKey::Output {}) {
2234 make_task_dirty_internal(
2235 child_task,
2236 child_id,
2237 false,
2238 #[cfg(feature = "trace_task_dirty")]
2239 TaskDirtyCause::InitialDirty,
2240 &mut queue,
2241 ctx,
2242 );
2243 }
2244 }
2245
2246 queue.execute(ctx);
2247 }
2248
2249 fn task_execution_completed_connect(
2250 &self,
2251 ctx: &mut impl ExecuteContext<'_>,
2252 task_id: TaskId,
2253 new_children: FxHashSet<TaskId>,
2254 ) -> bool {
2255 debug_assert!(!new_children.is_empty());
2256
2257 let mut task = ctx.task(task_id, TaskDataCategory::All);
2258 let Some(in_progress) = get!(task, InProgress) else {
2259 panic!("Task execution completed, but task is not in progress: {task:#?}");
2260 };
2261 if matches!(in_progress, InProgressState::Canceled) {
2262 return false;
2264 }
2265 let InProgressState::InProgress(box InProgressStateInner {
2266 #[cfg(not(feature = "no_fast_stale"))]
2267 stale,
2268 ..
2269 }) = in_progress
2270 else {
2271 panic!("Task execution completed, but task is not in progress: {task:#?}");
2272 };
2273
2274 #[cfg(not(feature = "no_fast_stale"))]
2276 if *stale {
2277 let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2278 remove!(task, InProgress)
2279 else {
2280 unreachable!();
2281 };
2282 task.add_new(CachedDataItem::InProgress {
2283 value: InProgressState::Scheduled {
2284 done_event,
2285 reason: TaskExecutionReason::Stale,
2286 },
2287 });
2288 drop(task);
2289
2290 AggregationUpdateQueue::run(
2293 AggregationUpdateJob::DecreaseActiveCounts {
2294 task_ids: new_children.into_iter().collect(),
2295 },
2296 ctx,
2297 );
2298 return true;
2299 }
2300
2301 let has_active_count = ctx.should_track_activeness()
2302 && get!(task, Activeness).map_or(false, |activeness| activeness.active_counter > 0);
2303 connect_children(
2304 ctx,
2305 task_id,
2306 task,
2307 new_children,
2308 has_active_count,
2309 ctx.should_track_activeness(),
2310 );
2311
2312 false
2313 }
2314
2315 fn task_execution_completed_finish(
2316 &self,
2317 ctx: &mut impl ExecuteContext<'_>,
2318 task_id: TaskId,
2319 #[cfg(feature = "verify_determinism")] no_output_set: bool,
2320 new_output: Option<OutputValue>,
2321 removed_data: &mut Vec<CachedDataItem>,
2322 is_now_immutable: bool,
2323 ) -> bool {
2324 let mut task = ctx.task(task_id, TaskDataCategory::All);
2325 let Some(in_progress) = remove!(task, InProgress) else {
2326 panic!("Task execution completed, but task is not in progress: {task:#?}");
2327 };
2328 if matches!(in_progress, InProgressState::Canceled) {
2329 return false;
2331 }
2332 let InProgressState::InProgress(box InProgressStateInner {
2333 done_event,
2334 once_task: _,
2335 stale,
2336 session_dependent,
2337 marked_as_completed: _,
2338 new_children,
2339 }) = in_progress
2340 else {
2341 panic!("Task execution completed, but task is not in progress: {task:#?}");
2342 };
2343 debug_assert!(new_children.is_empty());
2344
2345 if stale {
2347 task.add_new(CachedDataItem::InProgress {
2348 value: InProgressState::Scheduled {
2349 done_event,
2350 reason: TaskExecutionReason::Stale,
2351 },
2352 });
2353 return true;
2354 }
2355
2356 let mut old_content = None;
2358 if let Some(value) = new_output {
2359 old_content = task.insert(CachedDataItem::Output { value });
2360 }
2361
2362 if is_now_immutable {
2365 let _ = task.add(CachedDataItem::Immutable { value: () });
2366 }
2367
2368 removed_data.extend(task.extract_if(
2370 CachedDataItemType::InProgressCell,
2371 |key, value| match (key, value) {
2372 (
2373 CachedDataItemKey::InProgressCell { .. },
2374 CachedDataItemValueRef::InProgressCell { value },
2375 ) => {
2376 value.event.notify(usize::MAX);
2377 true
2378 }
2379 _ => false,
2380 },
2381 ));
2382
2383 let old_dirtyness = get!(task, Dirty).cloned();
2385 let (old_self_dirty, old_current_session_self_clean) = match old_dirtyness {
2386 None => (false, false),
2387 Some(Dirtyness::Dirty) => (true, false),
2388 Some(Dirtyness::SessionDependent) => {
2389 let clean_in_current_session = get!(task, CurrentSessionClean).is_some();
2390 (true, clean_in_current_session)
2391 }
2392 };
2393
2394 let (new_dirtyness, new_self_dirty, new_current_session_self_clean) = if session_dependent {
2396 (Some(Dirtyness::SessionDependent), true, true)
2397 } else {
2398 (None, false, false)
2399 };
2400
2401 if old_dirtyness != new_dirtyness {
2403 if let Some(value) = new_dirtyness {
2404 task.insert(CachedDataItem::Dirty { value });
2405 } else if old_dirtyness.is_some() {
2406 task.remove(&CachedDataItemKey::Dirty {});
2407 }
2408 }
2409 if old_current_session_self_clean != new_current_session_self_clean {
2410 if new_current_session_self_clean {
2411 task.insert(CachedDataItem::CurrentSessionClean { value: () });
2412 } else if old_current_session_self_clean {
2413 task.remove(&CachedDataItemKey::CurrentSessionClean {});
2414 }
2415 }
2416
2417 let data_update = if old_self_dirty != new_self_dirty
2419 || old_current_session_self_clean != new_current_session_self_clean
2420 {
2421 let dirty_container_count = get!(task, AggregatedDirtyContainerCount)
2422 .cloned()
2423 .unwrap_or_default();
2424 let current_session_clean_container_count =
2425 get!(task, AggregatedCurrentSessionCleanContainerCount)
2426 .copied()
2427 .unwrap_or_default();
2428 let result = ComputeDirtyAndCleanUpdate {
2429 old_dirty_container_count: dirty_container_count,
2430 new_dirty_container_count: dirty_container_count,
2431 old_current_session_clean_container_count: current_session_clean_container_count,
2432 new_current_session_clean_container_count: current_session_clean_container_count,
2433 old_self_dirty,
2434 new_self_dirty,
2435 old_current_session_self_clean,
2436 new_current_session_self_clean,
2437 }
2438 .compute();
2439 if result.dirty_count_update - result.current_session_clean_update < 0 {
2440 if let Some(activeness_state) = get_mut!(task, Activeness) {
2442 activeness_state.all_clean_event.notify(usize::MAX);
2443 activeness_state.unset_active_until_clean();
2444 if activeness_state.is_empty() {
2445 task.remove(&CachedDataItemKey::Activeness {});
2446 }
2447 }
2448 }
2449 result
2450 .aggregated_update(task_id)
2451 .and_then(|aggregated_update| {
2452 AggregationUpdateJob::data_update(&mut task, aggregated_update)
2453 })
2454 } else {
2455 None
2456 };
2457
2458 #[cfg(feature = "verify_determinism")]
2459 let reschedule = (dirty_changed || no_output_set) && !task_id.is_transient();
2460 #[cfg(not(feature = "verify_determinism"))]
2461 let reschedule = false;
2462 if reschedule {
2463 task.add_new(CachedDataItem::InProgress {
2464 value: InProgressState::Scheduled {
2465 done_event,
2466 reason: TaskExecutionReason::Stale,
2467 },
2468 });
2469 drop(task);
2470 } else {
2471 drop(task);
2472
2473 done_event.notify(usize::MAX);
2475 }
2476
2477 drop(old_content);
2478
2479 if let Some(data_update) = data_update {
2480 AggregationUpdateQueue::run(data_update, ctx);
2481 }
2482
2483 reschedule
2484 }
2485
2486 fn task_execution_completed_cleanup(&self, ctx: &mut impl ExecuteContext<'_>, task_id: TaskId) {
2487 let mut task = ctx.task(task_id, TaskDataCategory::All);
2488 task.shrink_to_fit(CachedDataItemType::CellData);
2489 task.shrink_to_fit(CachedDataItemType::TransientCellData);
2490 task.shrink_to_fit(CachedDataItemType::CellTypeMaxIndex);
2491 task.shrink_to_fit(CachedDataItemType::CellDependency);
2492 task.shrink_to_fit(CachedDataItemType::OutputDependency);
2493 task.shrink_to_fit(CachedDataItemType::CollectiblesDependency);
2494 drop(task);
2495 }
2496
2497 fn run_backend_job<'a>(
2498 self: &'a Arc<Self>,
2499 job: TurboTasksBackendJob,
2500 turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2501 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2502 Box::pin(async move {
2503 match job {
2504 TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
2505 debug_assert!(self.should_persist());
2506
2507 let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
2508 let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
2509 let mut idle_start_listener = self.idle_start_event.listen();
2510 let mut idle_end_listener = self.idle_end_event.listen();
2511 let mut fresh_idle = true;
2512 loop {
2513 const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2514 const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2515 let idle_timeout = *IDLE_TIMEOUT;
2516 let (time, mut reason) =
2517 if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
2518 (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2519 } else {
2520 (SNAPSHOT_INTERVAL, "regular snapshot interval")
2521 };
2522
2523 let until = last_snapshot + time;
2524 if until > Instant::now() {
2525 let mut stop_listener = self.stopping_event.listen();
2526 if self.stopping.load(Ordering::Acquire) {
2527 return;
2528 }
2529 let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2530 Instant::now() + idle_timeout
2531 } else {
2532 far_future()
2533 };
2534 loop {
2535 tokio::select! {
2536 _ = &mut stop_listener => {
2537 return;
2538 },
2539 _ = &mut idle_start_listener => {
2540 fresh_idle = true;
2541 idle_time = Instant::now() + idle_timeout;
2542 idle_start_listener = self.idle_start_event.listen()
2543 },
2544 _ = &mut idle_end_listener => {
2545 idle_time = until + idle_timeout;
2546 idle_end_listener = self.idle_end_event.listen()
2547 },
2548 _ = tokio::time::sleep_until(until) => {
2549 break;
2550 },
2551 _ = tokio::time::sleep_until(idle_time) => {
2552 if turbo_tasks.is_idle() {
2553 reason = "idle timeout";
2554 break;
2555 }
2556 },
2557 }
2558 }
2559 }
2560
2561 let this = self.clone();
2562 let snapshot = this.snapshot_and_persist(None, reason);
2563 if let Some((snapshot_start, new_data)) = snapshot {
2564 last_snapshot = snapshot_start;
2565 if !new_data {
2566 fresh_idle = false;
2567 continue;
2568 }
2569 let last_snapshot = last_snapshot.duration_since(self.start_time);
2570 self.last_snapshot.store(
2571 last_snapshot.as_millis().try_into().unwrap(),
2572 Ordering::Relaxed,
2573 );
2574
2575 turbo_tasks.schedule_backend_background_job(
2576 TurboTasksBackendJob::FollowUpSnapshot,
2577 );
2578 return;
2579 }
2580 }
2581 }
2582 TurboTasksBackendJob::Prefetch { data, range } => {
2583 let range: Range<usize> = if let Some(range) = range {
2584 range
2585 } else {
2586 if data.len() > 128 {
2587 let chunk_size = good_chunk_size(data.len());
2588 let chunks = data.len().div_ceil(chunk_size);
2589 for i in 0..chunks {
2590 turbo_tasks.schedule_backend_background_job(
2591 TurboTasksBackendJob::Prefetch {
2592 data: data.clone(),
2593 range: Some(
2594 (i * chunk_size)..min(data.len(), (i + 1) * chunk_size),
2595 ),
2596 },
2597 );
2598 }
2599 return;
2600 }
2601 0..data.len()
2602 };
2603
2604 let _span = trace_span!("prefetching").entered();
2605 let mut ctx = self.execute_context(turbo_tasks);
2606 for i in range {
2607 let (&task, &with_data) = data.get_index(i).unwrap();
2608 let category = if with_data {
2609 TaskDataCategory::All
2610 } else {
2611 TaskDataCategory::Meta
2612 };
2613 drop(ctx.task(task, category));
2615 }
2616 }
2617 }
2618 })
2619 }
2620
2621 fn try_read_own_task_cell(
2622 &self,
2623 task_id: TaskId,
2624 cell: CellId,
2625 options: ReadCellOptions,
2626 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2627 ) -> Result<TypedCellContent> {
2628 let mut ctx = self.execute_context(turbo_tasks);
2629 let task = ctx.task(task_id, TaskDataCategory::Data);
2630 if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2631 debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2632 Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2633 } else {
2634 Ok(CellContent(None).into_typed(cell.type_id))
2635 }
2636 }
2637
2638 fn read_task_collectibles(
2639 &self,
2640 task_id: TaskId,
2641 collectible_type: TraitTypeId,
2642 reader_id: Option<TaskId>,
2643 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2644 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2645 let mut ctx = self.execute_context(turbo_tasks);
2646 let mut collectibles = AutoMap::default();
2647 {
2648 let mut task = ctx.task(task_id, TaskDataCategory::All);
2649 loop {
2651 let aggregation_number = get_aggregation_number(&task);
2652 if is_root_node(aggregation_number) {
2653 break;
2654 }
2655 drop(task);
2656 AggregationUpdateQueue::run(
2657 AggregationUpdateJob::UpdateAggregationNumber {
2658 task_id,
2659 base_aggregation_number: u32::MAX,
2660 distance: None,
2661 },
2662 &mut ctx,
2663 );
2664 task = ctx.task(task_id, TaskDataCategory::All);
2665 }
2666 for collectible in iter_many!(
2667 task,
2668 AggregatedCollectible {
2669 collectible
2670 } count if collectible.collectible_type == collectible_type && *count > 0 => {
2671 collectible.cell
2672 }
2673 ) {
2674 *collectibles
2675 .entry(RawVc::TaskCell(collectible.task, collectible.cell))
2676 .or_insert(0) += 1;
2677 }
2678 for (collectible, count) in iter_many!(
2679 task,
2680 Collectible {
2681 collectible
2682 } count if collectible.collectible_type == collectible_type => {
2683 (collectible.cell, *count)
2684 }
2685 ) {
2686 *collectibles
2687 .entry(RawVc::TaskCell(collectible.task, collectible.cell))
2688 .or_insert(0) += count;
2689 }
2690 if let Some(reader_id) = reader_id {
2691 let _ = task.add(CachedDataItem::CollectiblesDependent {
2692 collectible_type,
2693 task: reader_id,
2694 value: (),
2695 });
2696 }
2697 }
2698 if let Some(reader_id) = reader_id {
2699 let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
2700 let target = CollectiblesRef {
2701 task: task_id,
2702 collectible_type,
2703 };
2704 if reader
2705 .remove(&CachedDataItemKey::OutdatedCollectiblesDependency { target })
2706 .is_none()
2707 {
2708 let _ = reader.add(CachedDataItem::CollectiblesDependency { target, value: () });
2709 }
2710 }
2711 collectibles
2712 }
2713
2714 fn emit_collectible(
2715 &self,
2716 collectible_type: TraitTypeId,
2717 collectible: RawVc,
2718 task_id: TaskId,
2719 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2720 ) {
2721 self.assert_valid_collectible(task_id, collectible);
2722
2723 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2724 panic!("Collectibles need to be resolved");
2725 };
2726 let cell = CellRef {
2727 task: collectible_task,
2728 cell,
2729 };
2730 operation::UpdateCollectibleOperation::run(
2731 task_id,
2732 CollectibleRef {
2733 collectible_type,
2734 cell,
2735 },
2736 1,
2737 self.execute_context(turbo_tasks),
2738 );
2739 }
2740
2741 fn unemit_collectible(
2742 &self,
2743 collectible_type: TraitTypeId,
2744 collectible: RawVc,
2745 count: u32,
2746 task_id: TaskId,
2747 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2748 ) {
2749 self.assert_valid_collectible(task_id, collectible);
2750
2751 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2752 panic!("Collectibles need to be resolved");
2753 };
2754 let cell = CellRef {
2755 task: collectible_task,
2756 cell,
2757 };
2758 operation::UpdateCollectibleOperation::run(
2759 task_id,
2760 CollectibleRef {
2761 collectible_type,
2762 cell,
2763 },
2764 -(i32::try_from(count).unwrap()),
2765 self.execute_context(turbo_tasks),
2766 );
2767 }
2768
2769 fn update_task_cell(
2770 &self,
2771 task_id: TaskId,
2772 cell: CellId,
2773 is_serializable_cell_content: bool,
2774 content: CellContent,
2775 verification_mode: VerificationMode,
2776 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2777 ) {
2778 operation::UpdateCellOperation::run(
2779 task_id,
2780 cell,
2781 content,
2782 is_serializable_cell_content,
2783 verification_mode,
2784 self.execute_context(turbo_tasks),
2785 );
2786 }
2787
2788 fn mark_own_task_as_session_dependent(
2789 &self,
2790 task_id: TaskId,
2791 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2792 ) {
2793 if !self.should_track_dependencies() {
2794 return;
2796 }
2797 const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
2798 let mut ctx = self.execute_context(turbo_tasks);
2799 let mut task = ctx.task(task_id, TaskDataCategory::Meta);
2800 let aggregation_number = get_aggregation_number(&task);
2801 if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
2802 drop(task);
2803 AggregationUpdateQueue::run(
2806 AggregationUpdateJob::UpdateAggregationNumber {
2807 task_id,
2808 base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
2809 distance: None,
2810 },
2811 &mut ctx,
2812 );
2813 task = ctx.task(task_id, TaskDataCategory::Meta);
2814 }
2815 if let Some(InProgressState::InProgress(box InProgressStateInner {
2816 session_dependent,
2817 ..
2818 })) = get_mut!(task, InProgress)
2819 {
2820 *session_dependent = true;
2821 }
2822 }
2823
2824 fn mark_own_task_as_finished(
2825 &self,
2826 task: TaskId,
2827 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2828 ) {
2829 let mut ctx = self.execute_context(turbo_tasks);
2830 let mut task = ctx.task(task, TaskDataCategory::Data);
2831 if let Some(InProgressState::InProgress(box InProgressStateInner {
2832 marked_as_completed,
2833 ..
2834 })) = get_mut!(task, InProgress)
2835 {
2836 *marked_as_completed = true;
2837 }
2842 }
2843
2844 fn set_own_task_aggregation_number(
2845 &self,
2846 task: TaskId,
2847 aggregation_number: u32,
2848 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2849 ) {
2850 let mut ctx = self.execute_context(turbo_tasks);
2851 AggregationUpdateQueue::run(
2852 AggregationUpdateJob::UpdateAggregationNumber {
2853 task_id: task,
2854 base_aggregation_number: aggregation_number,
2855 distance: None,
2856 },
2857 &mut ctx,
2858 );
2859 }
2860
2861 fn connect_task(
2862 &self,
2863 task: TaskId,
2864 parent_task: Option<TaskId>,
2865 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2866 ) {
2867 self.assert_not_persistent_calling_transient(parent_task, task, None);
2868 ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
2869 }
2870
2871 fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
2872 let task_id = self.transient_task_id_factory.get();
2873 let root_type = match task_type {
2874 TransientTaskType::Root(_) => RootType::RootTask,
2875 TransientTaskType::Once(_) => RootType::OnceTask,
2876 };
2877 self.transient_tasks.insert(
2878 task_id,
2879 Arc::new(match task_type {
2880 TransientTaskType::Root(f) => TransientTask::Root(f),
2881 TransientTaskType::Once(f) => TransientTask::Once(Mutex::new(Some(f))),
2882 }),
2883 );
2884 {
2885 let mut task = self.storage.access_mut(task_id);
2886 task.add(CachedDataItem::AggregationNumber {
2887 value: AggregationNumber {
2888 base: u32::MAX,
2889 distance: 0,
2890 effective: u32::MAX,
2891 },
2892 });
2893 if self.should_track_activeness() {
2894 task.add(CachedDataItem::Activeness {
2895 value: ActivenessState::new_root(root_type, task_id),
2896 });
2897 }
2898 task.add(CachedDataItem::new_scheduled(
2899 TaskExecutionReason::Initial,
2900 move || {
2901 move || match root_type {
2902 RootType::RootTask => "Root Task".to_string(),
2903 RootType::OnceTask => "Once Task".to_string(),
2904 }
2905 },
2906 ));
2907 }
2908 #[cfg(feature = "verify_aggregation_graph")]
2909 self.root_tasks.lock().insert(task_id);
2910 task_id
2911 }
2912
2913 fn dispose_root_task(
2914 &self,
2915 task_id: TaskId,
2916 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2917 ) {
2918 #[cfg(feature = "verify_aggregation_graph")]
2919 self.root_tasks.lock().remove(&task_id);
2920
2921 let mut ctx = self.execute_context(turbo_tasks);
2922 let mut task = ctx.task(task_id, TaskDataCategory::All);
2923 let is_dirty = task.is_dirty();
2924 let has_dirty_containers = task.has_dirty_containers();
2925 if is_dirty || has_dirty_containers {
2926 if let Some(activeness_state) = get_mut!(task, Activeness) {
2927 activeness_state.unset_root_type();
2929 activeness_state.set_active_until_clean();
2930 };
2931 } else if let Some(activeness_state) = remove!(task, Activeness) {
2932 activeness_state.all_clean_event.notify(usize::MAX);
2935 }
2936 }
2937
/// Debug-only consistency check of the aggregation graph (feature
/// `verify_aggregation_graph`).
///
/// Skipped when the environment variable `TURBO_ENGINE_VERIFY_GRAPH` is `"0"`. When `idle`
/// is true, the check is abandoned as soon as the backend is no longer idle.
///
/// For every registered root task, all tasks reachable through `Child` edges are visited
/// breadth-first and these invariants are checked:
/// * every non-root task has at least one upper that is an aggregating node seen so far,
/// * every own collectible (count > 0) is listed as aggregated collectible of a task
///   visited earlier,
/// * every dirty task (or task with dirty containers) is listed with a positive count in
///   `AggregatedDirtyContainer` of each of its uppers,
/// * every aggregated collectible is emitted by some visited task.
///
/// # Panics
///
/// Panics on the first violated invariant. For the last invariant, details about the
/// chain of uppers are written to stdout before panicking.
#[cfg(feature = "verify_aggregation_graph")]
fn verify_aggregation_graph(
    &self,
    turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    idle: bool,
) {
    use std::{collections::VecDeque, env, io::stdout};

    use crate::backend::operation::{get_uppers, is_aggregating_node};

    if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
        return;
    }

    let mut ctx = self.execute_context(turbo_tasks);
    let root_tasks = self.root_tasks.lock().clone();

    for task_id in root_tasks.into_iter() {
        let mut queue = VecDeque::new();
        let mut visited = FxHashSet::default();
        let mut aggregated_nodes = FxHashSet::default();
        // collectible -> (emitted by some visited task, tasks that list it as aggregated)
        let mut collectibles = FxHashMap::default();
        let root_task_id = task_id;
        visited.insert(task_id);
        aggregated_nodes.insert(task_id);
        queue.push_back(task_id);
        let mut counter = 0;
        while let Some(task_id) = queue.pop_front() {
            counter += 1;
            if counter % 100000 == 0 {
                // Progress output for very large graphs.
                println!(
                    "queue={}, visited={}, aggregated_nodes={}",
                    queue.len(),
                    visited.len(),
                    aggregated_nodes.len()
                );
            }
            let task = ctx.task(task_id, TaskDataCategory::All);
            if idle && !self.is_idle.load(Ordering::Relaxed) {
                return;
            }

            let uppers = get_uppers(&task);
            if task_id != root_task_id
                && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
            {
                panic!(
                    "Task {} {} doesn't report to any root but is reachable from one (uppers: \
                     {:?})",
                    task_id,
                    ctx.get_task_description(task_id),
                    uppers
                );
            }

            let aggregated_collectibles: Vec<_> = get_many!(task, AggregatedCollectible { collectible } value if *value > 0 => {collectible});
            for collectible in aggregated_collectibles {
                collectibles
                    .entry(collectible)
                    .or_insert_with(|| (false, Vec::new()))
                    .1
                    .push(task_id);
            }

            let own_collectibles: Vec<_> = get_many!(task, Collectible { collectible } value if *value > 0 => {collectible});
            for collectible in own_collectibles {
                if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                    *flag = true
                } else {
                    panic!(
                        "Task {} has a collectible {:?} that is not in any upper task",
                        task_id, collectible
                    );
                }
            }

            let is_dirty = get!(task, Dirty).is_some();
            let has_dirty_container = task.has_dirty_containers();
            let should_be_in_upper = is_dirty || has_dirty_container;

            let aggregation_number = get_aggregation_number(&task);
            if is_aggregating_node(aggregation_number) {
                aggregated_nodes.insert(task_id);
            }
            for child_id in iter_many!(task, Child { task } => task) {
                if visited.insert(child_id) {
                    queue.push_back(child_id);
                }
            }
            drop(task);

            if should_be_in_upper {
                for upper_id in uppers {
                    let task = ctx.task(upper_id, TaskDataCategory::All);
                    let in_upper = get!(task, AggregatedDirtyContainer { task: task_id })
                        .is_some_and(|&dirty| dirty > 0);
                    if !in_upper {
                        let containers: Vec<_> = get_many!(task, AggregatedDirtyContainer { task: task_id } value => (task_id, *value));
                        panic!(
                            "Task {} ({}) is dirty, but is not listed in the upper task {} \
                             ({})\nThese dirty containers are present:\n{:#?}",
                            task_id,
                            ctx.get_task_description(task_id),
                            upper_id,
                            ctx.get_task_description(upper_id),
                            containers,
                        );
                    }
                }
            }
        }

        for (collectible, (flag, task_ids)) in collectibles {
            if !flag {
                use std::io::Write;
                let mut stdout = stdout().lock();
                writeln!(
                    stdout,
                    "{:?} that is not emitted in any child task but in these aggregated \
                     tasks: {:#?}",
                    collectible,
                    task_ids
                        .iter()
                        .map(|t| format!("{t} {}", ctx.get_task_description(*t)))
                        .collect::<Vec<_>>()
                )
                .unwrap();

                // Walk up from the task owning the collectible's cell and dump every upper.
                let task_id = collectible.cell.task;
                let mut queue = {
                    let task = ctx.task(task_id, TaskDataCategory::All);
                    get_uppers(&task)
                };
                let mut visited = FxHashSet::default();
                for &upper_id in queue.iter() {
                    visited.insert(upper_id);
                    writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                }
                while let Some(task_id) = queue.pop() {
                    let desc = ctx.get_task_description(task_id);
                    let task = ctx.task(task_id, TaskDataCategory::All);
                    let aggregated_collectible =
                        get!(task, AggregatedCollectible { collectible })
                            .copied()
                            .unwrap_or_default();
                    let uppers = get_uppers(&task);
                    drop(task);
                    writeln!(
                        stdout,
                        "upper {task_id} {desc} collectible={aggregated_collectible}"
                    )
                    .unwrap();
                    if task_ids.contains(&task_id) {
                        writeln!(
                            stdout,
                            "Task has an upper connection to an aggregated task that doesn't \
                             reference it. Upper connection is invalid!"
                        )
                        .unwrap();
                    }
                    for upper_id in uppers {
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                        // `insert` returns false for uppers that were already enqueued, so
                        // each upper is processed once even when reachable via several paths.
                        if visited.insert(upper_id) {
                            queue.push(upper_id);
                        }
                    }
                }
                panic!("See stdout for more details");
            }
        }
    }
}
3116
3117 fn assert_not_persistent_calling_transient(
3118 &self,
3119 parent_id: Option<TaskId>,
3120 child_id: TaskId,
3121 cell_id: Option<CellId>,
3122 ) {
3123 if !parent_id.is_none_or(|id| id.is_transient()) && child_id.is_transient() {
3124 self.panic_persistent_calling_transient(
3125 parent_id
3126 .and_then(|id| self.lookup_task_type(id))
3127 .as_deref(),
3128 self.lookup_task_type(child_id).as_deref(),
3129 cell_id,
3130 );
3131 }
3132 }
3133
3134 fn panic_persistent_calling_transient(
3135 &self,
3136 parent: Option<&CachedTaskType>,
3137 child: Option<&CachedTaskType>,
3138 cell_id: Option<CellId>,
3139 ) {
3140 let transient_reason = if let Some(child) = child {
3141 Cow::Owned(format!(
3142 " The callee is transient because it depends on:\n{}",
3143 self.debug_trace_transient_task(child, cell_id),
3144 ))
3145 } else {
3146 Cow::Borrowed("")
3147 };
3148 panic!(
3149 "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3150 parent.map_or("unknown", |t| t.get_name()),
3151 child.map_or("unknown", |t| t.get_name()),
3152 transient_reason,
3153 );
3154 }
3155
3156 fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3157 let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3159 let task_info = if let Some(col_task_ty) = collectible
3161 .try_get_task_id()
3162 .and_then(|t| self.lookup_task_type(t))
3163 {
3164 Cow::Owned(format!(" (return type of {col_task_ty})"))
3165 } else {
3166 Cow::Borrowed("")
3167 };
3168 panic!("Collectible{task_info} must be a ResolvedVc")
3169 };
3170 if col_task_id.is_transient() && !task_id.is_transient() {
3171 let transient_reason = if let Some(col_task_ty) = self.lookup_task_type(col_task_id) {
3172 Cow::Owned(format!(
3173 ". The collectible is transient because it depends on:\n{}",
3174 self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3175 ))
3176 } else {
3177 Cow::Borrowed("")
3178 };
3179 panic!(
3181 "Collectible is transient, transient collectibles cannot be emitted from \
3182 persistent tasks{transient_reason}",
3183 )
3184 }
3185 }
3186}
3187
3188impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
3189 fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3190 self.0.startup(turbo_tasks);
3191 }
3192
3193 fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3194 self.0.stopping();
3195 }
3196
3197 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3198 self.0.stop(turbo_tasks);
3199 }
3200
3201 fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3202 self.0.idle_start(turbo_tasks);
3203 }
3204
3205 fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3206 self.0.idle_end();
3207 }
3208
3209 fn get_or_create_persistent_task(
3210 &self,
3211 task_type: CachedTaskType,
3212 parent_task: Option<TaskId>,
3213 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3214 ) -> TaskId {
3215 self.0
3216 .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
3217 }
3218
3219 fn get_or_create_transient_task(
3220 &self,
3221 task_type: CachedTaskType,
3222 parent_task: Option<TaskId>,
3223 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3224 ) -> TaskId {
3225 self.0
3226 .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
3227 }
3228
3229 fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3230 self.0.invalidate_task(task_id, turbo_tasks);
3231 }
3232
3233 fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3234 self.0.invalidate_tasks(tasks, turbo_tasks);
3235 }
3236
3237 fn invalidate_tasks_set(
3238 &self,
3239 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3240 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3241 ) {
3242 self.0.invalidate_tasks_set(tasks, turbo_tasks);
3243 }
3244
3245 fn invalidate_serialization(
3246 &self,
3247 task_id: TaskId,
3248 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3249 ) {
3250 self.0.invalidate_serialization(task_id, turbo_tasks);
3251 }
3252
3253 fn get_task_description(&self, task: TaskId) -> String {
3254 self.0.get_task_description(task)
3255 }
3256
3257 fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3258 self.0.task_execution_canceled(task, turbo_tasks)
3259 }
3260
3261 fn try_start_task_execution(
3262 &self,
3263 task_id: TaskId,
3264 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3265 ) -> Option<TaskExecutionSpec<'_>> {
3266 self.0.try_start_task_execution(task_id, turbo_tasks)
3267 }
3268
3269 fn task_execution_completed(
3270 &self,
3271 task_id: TaskId,
3272 result: Result<RawVc, TurboTasksExecutionError>,
3273 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3274 stateful: bool,
3275 has_invalidator: bool,
3276 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3277 ) -> bool {
3278 self.0.task_execution_completed(
3279 task_id,
3280 result,
3281 cell_counters,
3282 stateful,
3283 has_invalidator,
3284 turbo_tasks,
3285 )
3286 }
3287
3288 type BackendJob = TurboTasksBackendJob;
3289
3290 fn run_backend_job<'a>(
3291 &'a self,
3292 job: Self::BackendJob,
3293 turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3294 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3295 self.0.run_backend_job(job, turbo_tasks)
3296 }
3297
3298 fn try_read_task_output(
3299 &self,
3300 task_id: TaskId,
3301 reader: Option<TaskId>,
3302 options: ReadOutputOptions,
3303 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3304 ) -> Result<Result<RawVc, EventListener>> {
3305 self.0
3306 .try_read_task_output(task_id, reader, options, turbo_tasks)
3307 }
3308
3309 fn try_read_task_cell(
3310 &self,
3311 task_id: TaskId,
3312 cell: CellId,
3313 reader: Option<TaskId>,
3314 options: ReadCellOptions,
3315 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3316 ) -> Result<Result<TypedCellContent, EventListener>> {
3317 self.0
3318 .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3319 }
3320
3321 fn try_read_own_task_cell(
3322 &self,
3323 task_id: TaskId,
3324 cell: CellId,
3325 options: ReadCellOptions,
3326 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3327 ) -> Result<TypedCellContent> {
3328 self.0
3329 .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
3330 }
3331
3332 fn read_task_collectibles(
3333 &self,
3334 task_id: TaskId,
3335 collectible_type: TraitTypeId,
3336 reader: Option<TaskId>,
3337 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3338 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3339 self.0
3340 .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3341 }
3342
3343 fn emit_collectible(
3344 &self,
3345 collectible_type: TraitTypeId,
3346 collectible: RawVc,
3347 task_id: TaskId,
3348 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3349 ) {
3350 self.0
3351 .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3352 }
3353
3354 fn unemit_collectible(
3355 &self,
3356 collectible_type: TraitTypeId,
3357 collectible: RawVc,
3358 count: u32,
3359 task_id: TaskId,
3360 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3361 ) {
3362 self.0
3363 .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3364 }
3365
3366 fn update_task_cell(
3367 &self,
3368 task_id: TaskId,
3369 cell: CellId,
3370 is_serializable_cell_content: bool,
3371 content: CellContent,
3372 verification_mode: VerificationMode,
3373 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3374 ) {
3375 self.0.update_task_cell(
3376 task_id,
3377 cell,
3378 is_serializable_cell_content,
3379 content,
3380 verification_mode,
3381 turbo_tasks,
3382 );
3383 }
3384
3385 fn mark_own_task_as_finished(
3386 &self,
3387 task_id: TaskId,
3388 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3389 ) {
3390 self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3391 }
3392
3393 fn set_own_task_aggregation_number(
3394 &self,
3395 task: TaskId,
3396 aggregation_number: u32,
3397 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3398 ) {
3399 self.0
3400 .set_own_task_aggregation_number(task, aggregation_number, turbo_tasks);
3401 }
3402
3403 fn mark_own_task_as_session_dependent(
3404 &self,
3405 task: TaskId,
3406 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3407 ) {
3408 self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
3409 }
3410
3411 fn connect_task(
3412 &self,
3413 task: TaskId,
3414 parent_task: Option<TaskId>,
3415 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3416 ) {
3417 self.0.connect_task(task, parent_task, turbo_tasks);
3418 }
3419
3420 fn create_transient_task(
3421 &self,
3422 task_type: TransientTaskType,
3423 _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3424 ) -> TaskId {
3425 self.0.create_transient_task(task_type)
3426 }
3427
3428 fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3429 self.0.dispose_root_task(task_id, turbo_tasks);
3430 }
3431
3432 fn task_statistics(&self) -> &TaskStatisticsApi {
3433 &self.0.task_statistics
3434 }
3435
3436 fn is_tracking_dependencies(&self) -> bool {
3437 self.0.options.dependency_tracking
3438 }
3439}
3440
/// One node of a debug trace about a transient task.
///
/// Nodes form a tree (through `cause_self` / `cause_args`) that `fmt_indented` and the
/// `Display` impl render as indented text, one node per line.
enum DebugTraceTransientTask {
    /// A named task whose causes are recorded and rendered recursively beneath it.
    Cached {
        /// Name written as the node's headline.
        task_name: &'static str,
        /// When `Some`, the headline is suffixed with " (read cell of type …)" naming the
        /// value type's `global_name`.
        cell_type_id: Option<ValueTypeId>,
        /// Optional child node rendered under a "self:" heading.
        /// NOTE(review): presumably the cause reached through the task's `self` argument —
        /// confirm where this enum is constructed.
        cause_self: Option<Box<DebugTraceTransientTask>>,
        /// Child nodes rendered under an "args:" heading; the heading is omitted when empty.
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// A named task rendered with a trailing " (collapsed)" marker and no children.
    Collapsed {
        /// Name written as the node's headline.
        task_name: &'static str,
        /// Same meaning as on `Cached`: adds the " (read cell of type …)" suffix when `Some`.
        cell_type_id: Option<ValueTypeId>,
    },
    /// A task without a recorded name; rendered as "unknown transient task".
    Uncached {
        /// Same meaning as on `Cached`: adds the " (read cell of type …)" suffix when `Some`.
        cell_type_id: Option<ValueTypeId>,
    },
}
3457
3458impl DebugTraceTransientTask {
3459 fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3460 let indent = " ".repeat(level);
3461 f.write_str(&indent)?;
3462
3463 fn fmt_cell_type_id(
3464 f: &mut fmt::Formatter<'_>,
3465 cell_type_id: Option<ValueTypeId>,
3466 ) -> fmt::Result {
3467 if let Some(ty) = cell_type_id {
3468 write!(f, " (read cell of type {})", get_value_type(ty).global_name)
3469 } else {
3470 Ok(())
3471 }
3472 }
3473
3474 match self {
3476 Self::Cached {
3477 task_name,
3478 cell_type_id,
3479 ..
3480 }
3481 | Self::Collapsed {
3482 task_name,
3483 cell_type_id,
3484 ..
3485 } => {
3486 f.write_str(task_name)?;
3487 fmt_cell_type_id(f, *cell_type_id)?;
3488 if matches!(self, Self::Collapsed { .. }) {
3489 f.write_str(" (collapsed)")?;
3490 }
3491 }
3492 Self::Uncached { cell_type_id } => {
3493 f.write_str("unknown transient task")?;
3494 fmt_cell_type_id(f, *cell_type_id)?;
3495 }
3496 }
3497 f.write_char('\n')?;
3498
3499 if let Self::Cached {
3501 cause_self,
3502 cause_args,
3503 ..
3504 } = self
3505 {
3506 if let Some(c) = cause_self {
3507 writeln!(f, "{indent} self:")?;
3508 c.fmt_indented(f, level + 1)?;
3509 }
3510 if !cause_args.is_empty() {
3511 writeln!(f, "{indent} args:")?;
3512 for c in cause_args {
3513 c.fmt_indented(f, level + 1)?;
3514 }
3515 }
3516 }
3517 Ok(())
3518 }
3519}
3520
3521impl fmt::Display for DebugTraceTransientTask {
3522 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3523 self.fmt_indented(f, 0)
3524 }
3525}
3526
/// Returns an [`Instant`] that lies 30 × 365 days after the moment of the call.
///
/// NOTE(review): presumably used as a stand-in for "no deadline" — confirm at the call
/// sites.
fn far_future() -> Instant {
    const SECS_PER_DAY: u64 = 86400;
    const DAYS_AHEAD: u64 = 365 * 30;
    Instant::now() + Duration::from_secs(SECS_PER_DAY * DAYS_AHEAD)
}