1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7 borrow::Cow,
8 fmt::{self, Write},
9 future::Future,
10 hash::BuildHasherDefault,
11 mem::take,
12 pin::Pin,
13 sync::{
14 Arc, LazyLock,
15 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16 },
17};
18
19use anyhow::{Result, bail};
20use auto_hash_map::{AutoMap, AutoSet};
21use indexmap::IndexSet;
22use parking_lot::{Condvar, Mutex};
23use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
24use smallvec::{SmallVec, smallvec};
25use tokio::time::{Duration, Instant};
26use tracing::{Span, trace_span};
27use turbo_tasks::{
28 CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
29 ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
30 TaskId, TaskPriority, TraitTypeId, TurboTasksBackendApi, ValueTypeId,
31 backend::{
32 Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
33 TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
34 },
35 event::{Event, EventListener},
36 message_queue::TimingEvent,
37 registry::get_value_type,
38 scope::scope_and_block,
39 task_statistics::TaskStatisticsApi,
40 trace::TraceRawVcs,
41 turbo_tasks,
42 util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
43};
44
45pub use self::{
46 operation::AnyOperation,
47 storage::{SpecificTaskDataCategory, TaskDataCategory},
48};
49#[cfg(feature = "trace_task_dirty")]
50use crate::backend::operation::TaskDirtyCause;
51use crate::{
52 backend::{
53 operation::{
54 AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
55 CleanupOldEdgesOperation, ComputeDirtyAndCleanUpdate, ConnectChildOperation,
56 ExecuteContext, ExecuteContextImpl, LeafDistanceUpdateQueue, Operation, OutdatedEdge,
57 TaskGuard, connect_children, get_aggregation_number, get_uppers, is_root_node,
58 make_task_dirty_internal, prepare_new_children,
59 },
60 storage::Storage,
61 storage_schema::{TaskStorage, TaskStorageAccessors},
62 },
63 backing_storage::BackingStorage,
64 data::{
65 ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
66 InProgressState, InProgressStateInner, OutputValue, RootType,
67 },
68 utils::{
69 bi_map::BiMap, chunked_vec::ChunkedVec, dash_map_drop_contents::drop_contents,
70 ptr_eq_arc::PtrEqArc, shard_amount::compute_shard_amount, sharded::Sharded, swap_retain,
71 },
72};
73
/// Size threshold related to parallelizing the work of marking dependent tasks dirty.
/// NOTE(review): the consumer of this constant is not visible in this part of the file —
/// confirm the exact semantics at the use site.
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;

/// Flag kept in the most significant bit of `in_progress_operations`.
///
/// `snapshot_and_persist` sets this bit while a snapshot is requested and clears it afterwards;
/// the remaining bits hold the number of currently running operations.
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
80
/// Idle timeout, lazily read from the `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS` environment
/// variable (interpreted as milliseconds).
///
/// Falls back to two seconds when the variable is unset or cannot be parsed as a `u64`.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let configured_millis = match std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS") {
        Ok(raw) => raw.parse::<u64>().ok(),
        Err(_) => None,
    };
    match configured_millis {
        Some(millis) => Duration::from_millis(millis),
        None => Duration::from_secs(2),
    }
});
90
/// State protected by the `snapshot_request` mutex; it coordinates snapshots with running
/// operations.
struct SnapshotRequest {
    /// `true` while a snapshot is requested. Operations that reach a suspend point (or try to
    /// start) wait on `snapshot_completed` until this is reset to `false`.
    snapshot_requested: bool,
    /// Operations that are currently parked at a suspend point. `snapshot_and_persist` collects
    /// them and passes them to the backing storage together with the snapshot.
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}

impl SnapshotRequest {
    /// Creates the initial state: no snapshot requested and no suspended operations.
    fn new() -> Self {
        Self {
            snapshot_requested: false,
            suspended_operations: FxHashSet::default(),
        }
    }
}
104
/// Storage for the future of a one-shot transient task. The `Option` inside the mutex allows
/// the future to be taken out (leaving `None` behind).
type TransientTaskOnce =
    Mutex<Option<Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>>>;

/// The kinds of transient tasks stored in `transient_tasks`.
pub enum TransientTask {
    /// A transient task described by a `TransientTaskRoot`.
    /// NOTE(review): presumably re-executable when its dependencies change — confirm against
    /// the `TransientTaskRoot` definition.
    Root(TransientTaskRoot),

    /// A transient task backed by a single boxed future.
    /// NOTE(review): presumably executed at most once (the future can only be taken once) —
    /// confirm at the execution site.
    Once(TransientTaskOnce),
}
126
/// How the backend interacts with its backing storage.
///
/// Every mode enables restoring from the backing storage (see `should_restore`); only the
/// `ReadWrite*` modes enable persisting (see `should_persist`).
pub enum StorageMode {
    /// Restore from the backing storage, but never persist to it.
    ReadOnly,
    /// Restore from the backing storage and persist changes to it.
    ReadWrite,
    /// Restore from the backing storage and persist changes to it.
    /// NOTE(review): presumably persisting is deferred until shutdown — the code that
    /// distinguishes this from `ReadWrite` is not visible here.
    ReadWriteOnShutdown,
}
137
/// Configuration for a `TurboTasksBackend`.
pub struct BackendOptions {
    /// Enables dependency tracking. When `false`, reads never register dependencies and
    /// `active_tracking` is forced to `false` as well.
    pub dependency_tracking: bool,

    /// Enables activeness tracking. Automatically disabled by `TurboTasksBackendInner::new`
    /// when `dependency_tracking` is disabled.
    pub active_tracking: bool,

    /// Selects how the backing storage is used; `None` disables both restoring and persisting.
    pub storage_mode: Option<StorageMode>,

    /// Worker count passed to `compute_shard_amount` to size the sharded data structures.
    /// NOTE(review): the behavior for `None` is decided inside `compute_shard_amount` — confirm
    /// there.
    pub num_workers: Option<usize>,

    /// Passed to `compute_shard_amount` and `Storage::new`.
    /// NOTE(review): presumably requests smaller up-front allocations — confirm in those
    /// functions.
    pub small_preallocation: bool,
}

impl Default for BackendOptions {
    /// Defaults: dependency and activeness tracking enabled, `StorageMode::ReadWrite`, no explicit
    /// worker count, and regular preallocation.
    fn default() -> Self {
        Self {
            dependency_tracking: true,
            active_tracking: true,
            storage_mode: Some(StorageMode::ReadWrite),
            num_workers: None,
            small_preallocation: false,
        }
    }
}
174
/// Background jobs of the backend.
/// NOTE(review): the code that schedules and runs these jobs is not visible in this part of the
/// file; the variant descriptions below are derived from the names only.
pub enum TurboTasksBackendJob {
    /// The first snapshot job.
    InitialSnapshot,
    /// A snapshot job that follows an earlier one.
    FollowUpSnapshot,
}
179
/// Public handle of the backend: a reference-counted pointer to the shared
/// `TurboTasksBackendInner` state.
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);

/// Sharded log of `(task type, task id)` pairs. `snapshot_and_persist` drains it and hands the
/// entries to the backing storage.
type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
183
/// Shared state of the backend.
struct TurboTasksBackendInner<B: BackingStorage> {
    /// Options the backend was created with (`active_tracking` may have been forced off).
    options: BackendOptions,

    /// Instant at which this backend was created.
    start_time: Instant,

    /// Id factory for persisted tasks; ids range from the backing storage's next free task id up
    /// to `TRANSIENT_TASK_BIT - 1`.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id factory for transient tasks; ids range from `TRANSIENT_TASK_BIT` up to `TaskId::MAX`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    /// Log of task cache entries to persist; only present when persisting is enabled.
    persisted_task_cache_log: Option<TaskCacheLog>,
    /// Bidirectional map between task types and task ids.
    task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
    /// Transient tasks by id.
    transient_tasks: FxDashMap<TaskId, Arc<TransientTask>>,

    /// In-memory task storage.
    storage: Storage,

    /// Initialized to `true` when the backing storage already handed out task ids (its next free
    /// id is not `TaskId::MIN`). While `true`, lookups that miss locally may fall back to the
    /// backing storage (see `lookup_task_type`).
    local_is_partial: AtomicBool,

    /// Number of running operations in the low bits, plus `SNAPSHOT_REQUESTED_BIT` in the
    /// highest bit while a snapshot is requested.
    in_progress_operations: AtomicUsize,

    /// Snapshot coordination state; also the mutex used with both condition variables below.
    snapshot_request: Mutex<SnapshotRequest>,
    /// Notified when the last running operation finishes or suspends while a snapshot is
    /// requested, i.e. when `in_progress_operations` equals `SNAPSHOT_REQUESTED_BIT`.
    operations_suspended: Condvar,
    /// Notified when the snapshot request is cleared and waiting operations may continue.
    snapshot_completed: Condvar,
    /// NOTE(review): presumably a timestamp of the last snapshot relative to `start_time` — the
    /// writer is not visible here.
    last_snapshot: AtomicU64,

    /// NOTE(review): stop/idle signalling; the code that uses these fields is not visible in
    /// this part of the file.
    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    /// Cache hit/miss statistics, updated by `track_cache_hit` and `track_cache_miss`.
    task_statistics: TaskStatisticsApi,

    /// The persistence layer.
    backing_storage: B,

    /// Only compiled with the `verify_aggregation_graph` feature.
    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
233
234impl<B: BackingStorage> TurboTasksBackend<B> {
235 pub fn new(options: BackendOptions, backing_storage: B) -> Self {
236 Self(Arc::new(TurboTasksBackendInner::new(
237 options,
238 backing_storage,
239 )))
240 }
241
242 pub fn backing_storage(&self) -> &B {
243 &self.0.backing_storage
244 }
245}
246
247impl<B: BackingStorage> TurboTasksBackendInner<B> {
248 pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
249 let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
250 let need_log = matches!(
251 options.storage_mode,
252 Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
253 );
254 if !options.dependency_tracking {
255 options.active_tracking = false;
256 }
257 let small_preallocation = options.small_preallocation;
258 let next_task_id = backing_storage
259 .next_free_task_id()
260 .expect("Failed to get task id");
261 Self {
262 options,
263 start_time: Instant::now(),
264 persisted_task_id_factory: IdFactoryWithReuse::new(
265 next_task_id,
266 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
267 ),
268 transient_task_id_factory: IdFactoryWithReuse::new(
269 TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
270 TaskId::MAX,
271 ),
272 persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
273 task_cache: BiMap::new(),
274 transient_tasks: FxDashMap::default(),
275 local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
276 storage: Storage::new(shard_amount, small_preallocation),
277 in_progress_operations: AtomicUsize::new(0),
278 snapshot_request: Mutex::new(SnapshotRequest::new()),
279 operations_suspended: Condvar::new(),
280 snapshot_completed: Condvar::new(),
281 last_snapshot: AtomicU64::new(0),
282 stopping: AtomicBool::new(false),
283 stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
284 idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
285 idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
286 #[cfg(feature = "verify_aggregation_graph")]
287 is_idle: AtomicBool::new(false),
288 task_statistics: TaskStatisticsApi::default(),
289 backing_storage,
290 #[cfg(feature = "verify_aggregation_graph")]
291 root_tasks: Default::default(),
292 }
293 }
294
/// Creates an `ExecuteContext` for running operations against this backend on behalf of
/// `turbo_tasks`.
fn execute_context<'a>(
    &'a self,
    turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) -> impl ExecuteContext<'a> {
    ExecuteContextImpl::new(self, turbo_tasks)
}
301
/// Creates an `ExecuteContext` that uses the given optional read transaction `tx`.
///
/// # Safety
///
/// This forwards to the unsafe `ExecuteContextImpl::new_with_tx`; the caller must uphold that
/// function's contract.
/// NOTE(review): presumably `tx` must be a read transaction of this backend's own backing
/// storage — confirm against `ExecuteContextImpl::new_with_tx`.
unsafe fn execute_context_with_tx<'e, 'tx>(
    &'e self,
    tx: Option<&'e B::ReadTransaction<'tx>>,
    turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
where
    'tx: 'e,
{
    // SAFETY: the caller's obligations are passed on unchanged (see `# Safety` above).
    unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
}
316
317 fn suspending_requested(&self) -> bool {
318 self.should_persist()
319 && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
320 }
321
/// Suspend point for a running operation.
///
/// When suspension is requested, the operation state produced by `suspend()` is registered in
/// `suspended_operations` and the calling thread blocks until the snapshot request has been
/// cleared. `suspend` is not invoked at all when no suspension is requested.
fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
    // The slow path lives in a separate `#[cold]` function.
    #[cold]
    fn operation_suspend_point_cold<B: BackingStorage>(
        this: &TurboTasksBackendInner<B>,
        suspend: impl FnOnce() -> AnyOperation,
    ) {
        let operation = Arc::new(suspend());
        let mut snapshot_request = this.snapshot_request.lock();
        // Re-check under the lock: the request may have been cleared since the unlocked check.
        if snapshot_request.snapshot_requested {
            snapshot_request
                .suspended_operations
                .insert(operation.clone().into());
            // This operation no longer counts as running while it is suspended.
            let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
            assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
            // Only the request bit is left: every operation is finished or suspended, so wake
            // the thread waiting to take the snapshot.
            if value == SNAPSHOT_REQUESTED_BIT {
                this.operations_suspended.notify_all();
            }
            // Block until the snapshot request has been cleared.
            this.snapshot_completed
                .wait_while(&mut snapshot_request, |snapshot_request| {
                    snapshot_request.snapshot_requested
                });
            // Resume: count as running again and unregister the suspended operation.
            this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            snapshot_request
                .suspended_operations
                .remove(&operation.into());
        }
    }

    if self.suspending_requested() {
        operation_suspend_point_cold(self, suspend);
    }
}
354
/// Registers the start of an operation and returns a guard that unregisters it on drop.
///
/// When persisting is disabled, nothing is counted and an inert guard is returned. When a
/// snapshot is currently requested, the calling thread blocks until the request has been
/// cleared before the operation counts as running.
pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
    if !self.should_persist() {
        return OperationGuard { backend: None };
    }
    // Optimistically count this operation as running.
    let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
    if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
        let mut snapshot_request = self.snapshot_request.lock();
        // Re-check under the lock: the request may have been cleared in the meantime.
        if snapshot_request.snapshot_requested {
            // Undo the optimistic increment while waiting.
            let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
            // Only the request bit is left: wake the thread waiting to take the snapshot.
            if value == SNAPSHOT_REQUESTED_BIT {
                self.operations_suspended.notify_all();
            }
            // Block until the snapshot request has been cleared.
            self.snapshot_completed
                .wait_while(&mut snapshot_request, |snapshot_request| {
                    snapshot_request.snapshot_requested
                });
            self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        }
    }
    OperationGuard {
        backend: Some(self),
    }
}
378
379 fn should_persist(&self) -> bool {
380 matches!(
381 self.options.storage_mode,
382 Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
383 )
384 }
385
/// Returns `true` when any storage mode is configured, i.e. data may be restored from the
/// backing storage.
fn should_restore(&self) -> bool {
    self.options.storage_mode.is_some()
}

/// Returns whether dependency tracking is enabled in the options.
fn should_track_dependencies(&self) -> bool {
    self.options.dependency_tracking
}

/// Returns whether activeness tracking is enabled in the options.
fn should_track_activeness(&self) -> bool {
    self.options.active_tracking
}
397
/// Records a cache hit for the native function of `task_type` in the task statistics.
/// NOTE(review): `TaskStatisticsApi::map` presumably only runs the closure when statistics are
/// enabled — confirm in `TaskStatisticsApi`.
fn track_cache_hit(&self, task_type: &CachedTaskType) {
    self.task_statistics
        .map(|stats| stats.increment_cache_hit(task_type.native_fn));
}

/// Records a cache miss for the native function of `task_type` in the task statistics.
/// NOTE(review): `TaskStatisticsApi::map` presumably only runs the closure when statistics are
/// enabled — confirm in `TaskStatisticsApi`.
fn track_cache_miss(&self, task_type: &CachedTaskType) {
    self.task_statistics
        .map(|stats| stats.increment_cache_miss(task_type.native_fn));
}
407}
408
409pub(crate) struct OperationGuard<'a, B: BackingStorage> {
410 backend: Option<&'a TurboTasksBackendInner<B>>,
411}
412
413impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
414 fn drop(&mut self) {
415 if let Some(backend) = self.backend {
416 let fetch_sub = backend
417 .in_progress_operations
418 .fetch_sub(1, Ordering::AcqRel);
419 if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
420 backend.operations_suspended.notify_all();
421 }
422 }
423 }
424}
425
/// Intermediate result carried between the steps of completing a task execution.
/// NOTE(review): the producer and consumer are not visible in this part of the file; the field
/// descriptions below follow the field names and types only.
struct TaskExecutionCompletePrepareResult {
    /// Ids of children that were newly added.
    pub new_children: FxHashSet<TaskId>,
    /// Whether the task is immutable from now on.
    pub is_now_immutable: bool,
    /// Only compiled with the `verify_determinism` feature.
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    /// The new output value, if there is one.
    pub new_output: Option<OutputValue>,
    /// Tasks that depend on the output of this task.
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
}
435
436impl<B: BackingStorage> TurboTasksBackendInner<B> {
438 unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
442 &'l self,
443 tx: Option<&'l B::ReadTransaction<'tx>>,
444 parent_task: Option<TaskId>,
445 child_task: TaskId,
446 turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
447 ) {
448 operation::ConnectChildOperation::run(parent_task, child_task, unsafe {
449 self.execute_context_with_tx(tx, turbo_tasks)
450 });
451 }
452
453 fn connect_child(
454 &self,
455 parent_task: Option<TaskId>,
456 child_task: TaskId,
457 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
458 ) {
459 operation::ConnectChildOperation::run(
460 parent_task,
461 child_task,
462 self.execute_context(turbo_tasks),
463 );
464 }
465
466 fn try_read_task_output(
467 self: &Arc<Self>,
468 task_id: TaskId,
469 reader: Option<TaskId>,
470 options: ReadOutputOptions,
471 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
472 ) -> Result<Result<RawVc, EventListener>> {
473 self.assert_not_persistent_calling_transient(reader, task_id, None);
474
475 let mut ctx = self.execute_context(turbo_tasks);
476 let need_reader_task = if self.should_track_dependencies()
477 && !matches!(options.tracking, ReadTracking::Untracked)
478 && reader.is_some_and(|reader_id| reader_id != task_id)
479 && let Some(reader_id) = reader
480 && reader_id != task_id
481 {
482 Some(reader_id)
483 } else {
484 None
485 };
486 let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
487 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
491 (task, Some(reader))
492 } else {
493 (ctx.task(task_id, TaskDataCategory::All), None)
494 };
495
496 fn listen_to_done_event<B: BackingStorage>(
497 this: &TurboTasksBackendInner<B>,
498 reader: Option<TaskId>,
499 tracking: ReadTracking,
500 done_event: &Event,
501 ) -> EventListener {
502 done_event.listen_with_note(move || {
503 let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
504 move || {
505 if let Some(reader_desc) = reader_desc.as_ref() {
506 format!("try_read_task_output from {} ({})", reader_desc(), tracking)
507 } else {
508 format!("try_read_task_output ({})", tracking)
509 }
510 }
511 })
512 }
513
514 fn check_in_progress<B: BackingStorage>(
515 this: &TurboTasksBackendInner<B>,
516 task: &impl TaskGuard,
517 reader: Option<TaskId>,
518 tracking: ReadTracking,
519 ctx: &impl ExecuteContext<'_>,
520 ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
521 {
522 match task.get_in_progress() {
523 Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
524 listen_to_done_event(this, reader, tracking, done_event),
525 ))),
526 Some(InProgressState::InProgress(box InProgressStateInner {
527 done_event, ..
528 })) => Some(Ok(Err(listen_to_done_event(
529 this, reader, tracking, done_event,
530 )))),
531 Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
532 "{} was canceled",
533 ctx.get_task_description(task.id())
534 ))),
535 None => None,
536 }
537 }
538
539 if matches!(options.consistency, ReadConsistency::Strong) {
540 loop {
542 let aggregation_number = get_aggregation_number(&task);
543 if is_root_node(aggregation_number) {
544 break;
545 }
546 drop(task);
547 drop(reader_task);
548 {
549 let _span = tracing::trace_span!(
550 "make root node for strongly consistent read",
551 %task_id
552 )
553 .entered();
554 AggregationUpdateQueue::run(
555 AggregationUpdateJob::UpdateAggregationNumber {
556 task_id,
557 base_aggregation_number: u32::MAX,
558 distance: None,
559 },
560 &mut ctx,
561 );
562 }
563 (task, reader_task) = if let Some(reader_id) = need_reader_task {
564 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
566 (task, Some(reader))
567 } else {
568 (ctx.task(task_id, TaskDataCategory::All), None)
569 }
570 }
571
572 let is_dirty = task.is_dirty();
573
574 let has_dirty_containers = task.has_dirty_containers();
576 if has_dirty_containers || is_dirty.is_some() {
577 let activeness = task.get_activeness_mut();
578 let mut task_ids_to_schedule: Vec<_> = Vec::new();
579 let activeness = if let Some(activeness) = activeness {
581 activeness.set_active_until_clean();
585 activeness
586 } else {
587 if ctx.should_track_activeness() {
591 task_ids_to_schedule = task.dirty_containers().collect();
593 task_ids_to_schedule.push(task_id);
594 }
595 let activeness =
596 task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
597 activeness.set_active_until_clean();
598 activeness
599 };
600 let listener = activeness.all_clean_event.listen_with_note(move || {
601 let this = self.clone();
602 let tt = turbo_tasks.pin();
603 move || {
604 let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
605 let mut ctx = this.execute_context(tt);
606 let mut visited = FxHashSet::default();
607 fn indent(s: &str) -> String {
608 s.split_inclusive('\n')
609 .flat_map(|line: &str| [" ", line].into_iter())
610 .collect::<String>()
611 }
612 fn get_info(
613 ctx: &mut impl ExecuteContext<'_>,
614 task_id: TaskId,
615 parent_and_count: Option<(TaskId, i32)>,
616 visited: &mut FxHashSet<TaskId>,
617 ) -> String {
618 let task = ctx.task(task_id, TaskDataCategory::All);
619 let is_dirty = task.is_dirty();
620 let in_progress =
621 task.get_in_progress()
622 .map_or("not in progress", |p| match p {
623 InProgressState::InProgress(_) => "in progress",
624 InProgressState::Scheduled { .. } => "scheduled",
625 InProgressState::Canceled => "canceled",
626 });
627 let activeness = task.get_activeness().map_or_else(
628 || "not active".to_string(),
629 |activeness| format!("{activeness:?}"),
630 );
631 let aggregation_number = get_aggregation_number(&task);
632 let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
633 {
634 let uppers = get_uppers(&task);
635 !uppers.contains(&parent_task_id)
636 } else {
637 false
638 };
639
640 let has_dirty_containers = task.has_dirty_containers();
642
643 let task_description = ctx.get_task_description(task_id);
644 let is_dirty_label = if let Some(parent_priority) = is_dirty {
645 format!(", dirty({parent_priority})")
646 } else {
647 String::new()
648 };
649 let has_dirty_containers_label = if has_dirty_containers {
650 ", dirty containers"
651 } else {
652 ""
653 };
654 let count = if let Some((_, count)) = parent_and_count {
655 format!(" {count}")
656 } else {
657 String::new()
658 };
659 let mut info = format!(
660 "{task_id} {task_description}{count} (aggr={aggregation_number}, \
661 {in_progress}, \
662 {activeness}{is_dirty_label}{has_dirty_containers_label})",
663 );
664 let children: Vec<_> = task.dirty_containers_with_count().collect();
665 drop(task);
666
667 if missing_upper {
668 info.push_str("\n ERROR: missing upper connection");
669 }
670
671 if has_dirty_containers || !children.is_empty() {
672 writeln!(info, "\n dirty tasks:").unwrap();
673
674 for (child_task_id, count) in children {
675 let task_description = ctx.get_task_description(child_task_id);
676 if visited.insert(child_task_id) {
677 let child_info = get_info(
678 ctx,
679 child_task_id,
680 Some((task_id, count)),
681 visited,
682 );
683 info.push_str(&indent(&child_info));
684 if !info.ends_with('\n') {
685 info.push('\n');
686 }
687 } else {
688 writeln!(
689 info,
690 " {child_task_id} {task_description} {count} \
691 (already visited)"
692 )
693 .unwrap();
694 }
695 }
696 }
697 info
698 }
699 let info = get_info(&mut ctx, task_id, None, &mut visited);
700 format!(
701 "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
702 )
703 }
704 });
705 drop(reader_task);
706 drop(task);
707 if !task_ids_to_schedule.is_empty() {
708 let mut queue = AggregationUpdateQueue::new();
709 queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
710 queue.execute(&mut ctx);
711 }
712
713 return Ok(Err(listener));
714 }
715 }
716
717 if let Some(value) = check_in_progress(self, &task, reader, options.tracking, &ctx) {
718 return value;
719 }
720
721 if let Some(output) = task.get_output() {
722 let result = match output {
723 OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
724 OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
725 OutputValue::Error(error) => Err(error
726 .with_task_context(ctx.get_task_description(task_id), Some(task_id))
727 .into()),
728 };
729 if let Some(mut reader_task) = reader_task
730 && options.tracking.should_track(result.is_err())
731 && (!task.immutable() || cfg!(feature = "verify_immutable"))
732 {
733 #[cfg(feature = "trace_task_output_dependencies")]
734 let _span = tracing::trace_span!(
735 "add output dependency",
736 task = %task_id,
737 dependent_task = ?reader
738 )
739 .entered();
740 let mut queue = LeafDistanceUpdateQueue::new();
741 let reader = reader.unwrap();
742 if task.add_output_dependent(reader) {
743 let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
745 let reader_leaf_distance =
746 reader_task.get_leaf_distance().copied().unwrap_or_default();
747 if reader_leaf_distance.distance <= leaf_distance.distance {
748 queue.push(
749 reader,
750 leaf_distance.distance,
751 leaf_distance.max_distance_in_buffer,
752 );
753 }
754 }
755
756 drop(task);
757
758 if !reader_task.remove_outdated_output_dependencies(&task_id) {
764 let _ = reader_task.add_output_dependencies(task_id);
765 }
766 drop(reader_task);
767
768 queue.execute(&mut ctx);
769 }
770
771 return result;
772 }
773 drop(reader_task);
774
775 let note = move || {
776 let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
777 move || {
778 if let Some(reader_desc) = reader_desc.as_ref() {
779 format!("try_read_task_output (recompute) from {}", (reader_desc)())
780 } else {
781 "try_read_task_output (recompute, untracked)".to_string()
782 }
783 }
784 };
785
786 let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
788 TaskExecutionReason::OutputNotAvailable,
789 || self.get_task_desc_fn(task_id),
790 note,
791 );
792 let old = task.set_in_progress(in_progress_state);
795 debug_assert!(old.is_none(), "InProgress already exists");
796 ctx.schedule_task(task, TaskPriority::Initial);
797
798 Ok(Err(listener))
799 }
800
/// Tries to read the content of `cell` of `task_id` on behalf of the optional `reader` task.
///
/// Returns
/// - `Ok(Ok(content))` when the cell data is available,
/// - `Ok(Err(listener))` when the caller has to wait: the task is scheduled or in progress,
///   someone else is already waiting for the cell, or the task was scheduled to recompute it,
/// - `Err(_)` when the cell no longer exists or the task was canceled.
///
/// A dependency of `reader` on the cell is registered when dependency tracking is enabled, the
/// read is tracked, `reader` is a different task than `task_id`, and the task is not immutable
/// (unless the `verify_immutable` feature is enabled).
fn try_read_task_cell(
    &self,
    task_id: TaskId,
    reader: Option<TaskId>,
    cell: CellId,
    options: ReadCellOptions,
    turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) -> Result<Result<TypedCellContent, EventListener>> {
    self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

    /// Registers both directions of the cell dependency: `reader` as a dependent of the cell
    /// on `task`, and the cell as a dependency of `reader_task`. Does nothing when there is no
    /// reader task or the task is immutable (unless `verify_immutable` is enabled).
    fn add_cell_dependency(
        task_id: TaskId,
        mut task: impl TaskGuard,
        reader: Option<TaskId>,
        reader_task: Option<impl TaskGuard>,
        cell: CellId,
        key: Option<u64>,
    ) {
        if let Some(mut reader_task) = reader_task
            && (!task.immutable() || cfg!(feature = "verify_immutable"))
        {
            // A reader task is only passed when `reader` is `Some` (see the caller below).
            let reader = reader.unwrap();
            let _ = task.add_cell_dependents((cell, key, reader));
            drop(task);

            let target = CellRef {
                task: task_id,
                cell,
            };
            // Either revive an outdated dependency entry or add a new one.
            if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
                let _ = reader_task.add_cell_dependencies((target, key));
            }
            drop(reader_task);
        }
    }

    let ReadCellOptions {
        is_serializable_cell_content,
        tracking,
        final_read_hint,
    } = options;

    let mut ctx = self.execute_context(turbo_tasks);
    // The reader task is only needed when a dependency might be registered.
    let (mut task, reader_task) = if self.should_track_dependencies()
        && !matches!(tracking, ReadCellTracking::Untracked)
        && let Some(reader_id) = reader
        && reader_id != task_id
    {
        let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
        (task, Some(reader))
    } else {
        (ctx.task(task_id, TaskDataCategory::All), None)
    };

    // With `final_read_hint` the cell data is removed from the task instead of only read.
    let content = if final_read_hint {
        task.remove_cell_data(is_serializable_cell_content, cell)
    } else {
        task.get_cell_data(is_serializable_cell_content, cell)
    };
    if let Some(content) = content {
        if tracking.should_track(false) {
            add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
        }
        return Ok(Ok(TypedCellContent(
            cell.type_id,
            CellContent(Some(content.reference)),
        )));
    }

    // No data available. When the task is scheduled or running, wait for the cell.
    let in_progress = task.get_in_progress();
    if matches!(
        in_progress,
        Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
    ) {
        return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0));
    }
    let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

    // Check whether the cell can exist at all: its type must be known to the task and its
    // index must be below the recorded maximum for that type.
    let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
    let Some(max_id) = max_id else {
        if tracking.should_track(true) {
            add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
        }
        bail!(
            "Cell {cell:?} no longer exists in task {} (no cell of this type exists)",
            ctx.get_task_description(task_id)
        );
    };
    if cell.index >= max_id {
        if tracking.should_track(true) {
            add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
        }
        bail!(
            "Cell {cell:?} no longer exists in task {} (index out of bounds)",
            ctx.get_task_description(task_id)
        );
    }
    drop(reader_task);

    // The cell should exist but its data is not available: wait for it. Only the first
    // listener goes on to schedule the task.
    let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, reader, cell);
    if !new_listener {
        return Ok(Err(listener));
    }

    let _span = tracing::trace_span!(
        "recomputation",
        cell_type = get_value_type(cell.type_id).global_name,
        cell_index = cell.index
    )
    .entered();

    if is_cancelled {
        bail!("{} was canceled", ctx.get_task_description(task_id));
    }
    // Schedule the task so that it recomputes the cell.
    let _ = task.add_scheduled(TaskExecutionReason::CellNotAvailable, || {
        self.get_task_desc_fn(task_id)
    });
    ctx.schedule_task(task, TaskPriority::initial());

    Ok(Err(listener))
}
936
937 fn listen_to_cell(
938 &self,
939 task: &mut impl TaskGuard,
940 task_id: TaskId,
941 reader: Option<TaskId>,
942 cell: CellId,
943 ) -> (EventListener, bool) {
944 let note = move || {
945 let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
946 move || {
947 if let Some(reader_desc) = reader_desc.as_ref() {
948 format!("try_read_task_cell (in progress) from {}", (reader_desc)())
949 } else {
950 "try_read_task_cell (in progress, untracked)".to_string()
951 }
952 }
953 };
954 if let Some(in_progress) = task.get_in_progress_cells(&cell) {
955 let listener = in_progress.event.listen_with_note(note);
957 return (listener, false);
958 }
959 let in_progress = InProgressCellState::new(task_id, cell);
960 let listener = in_progress.event.listen_with_note(note);
961 let old = task.insert_in_progress_cells(cell, in_progress);
962 debug_assert!(old.is_none(), "InProgressCell already exists");
963 (listener, true)
964 }
965
966 fn lookup_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
967 if let Some(task_type) = self.task_cache.lookup_reverse(&task_id) {
968 return Some(task_type);
969 }
970 if self.should_restore()
971 && self.local_is_partial.load(Ordering::Acquire)
972 && !task_id.is_transient()
973 && let Some(task_type) = unsafe {
974 self.backing_storage
975 .reverse_lookup_task_cache(None, task_id)
976 .expect("Failed to lookup task type")
977 }
978 {
979 let _ = self.task_cache.try_insert(task_type.clone(), task_id);
980 return Some(task_type);
981 }
982 None
983 }
984
985 fn get_task_desc_fn(&self, task_id: TaskId) -> impl Fn() -> String + Send + Sync + 'static {
986 let task_type = self.lookup_task_type(task_id);
987 move || {
988 task_type.as_ref().map_or_else(
989 || format!("{task_id:?} transient"),
990 |task_type| format!("{task_id:?} {task_type}"),
991 )
992 }
993 }
994
995 fn snapshot_and_persist(
996 &self,
997 parent_span: Option<tracing::Id>,
998 reason: &str,
999 ) -> Option<(Instant, bool)> {
1000 let snapshot_span =
1001 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
1002 .entered();
1003 let start = Instant::now();
1004 debug_assert!(self.should_persist());
1005
1006 let suspended_operations;
1007 {
1008 let _span = tracing::info_span!("blocking").entered();
1009 let mut snapshot_request = self.snapshot_request.lock();
1010 snapshot_request.snapshot_requested = true;
1011 let active_operations = self
1012 .in_progress_operations
1013 .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1014 if active_operations != 0 {
1015 self.operations_suspended
1016 .wait_while(&mut snapshot_request, |_| {
1017 self.in_progress_operations.load(Ordering::Relaxed)
1018 != SNAPSHOT_REQUESTED_BIT
1019 });
1020 }
1021 suspended_operations = snapshot_request
1022 .suspended_operations
1023 .iter()
1024 .map(|op| op.arc().clone())
1025 .collect::<Vec<_>>();
1026 }
1027 self.storage.start_snapshot();
1028 let mut persisted_task_cache_log = self
1029 .persisted_task_cache_log
1030 .as_ref()
1031 .map(|l| l.take(|i| i))
1032 .unwrap_or_default();
1033 let mut snapshot_request = self.snapshot_request.lock();
1034 snapshot_request.snapshot_requested = false;
1035 self.in_progress_operations
1036 .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1037 self.snapshot_completed.notify_all();
1038 let snapshot_time = Instant::now();
1039 drop(snapshot_request);
1040
1041 let preprocess = |task_id: TaskId, inner: &TaskStorage| {
1042 if task_id.is_transient() {
1043 return (None, None);
1044 }
1045
1046 let meta_restored = inner.flags.meta_restored();
1047 let data_restored = inner.flags.data_restored();
1048
1049 let meta = meta_restored.then(|| inner.clone_meta_snapshot());
1051 let data = data_restored.then(|| inner.clone_data_snapshot());
1052
1053 (meta, data)
1054 };
1055 let process =
1056 |task_id: TaskId, (meta, data): (Option<TaskStorage>, Option<TaskStorage>)| {
1057 (
1062 task_id,
1063 meta.map(|d| {
1064 self.backing_storage
1065 .serialize(task_id, &d, SpecificTaskDataCategory::Meta)
1066 }),
1067 data.map(|d| {
1068 self.backing_storage
1069 .serialize(task_id, &d, SpecificTaskDataCategory::Data)
1070 }),
1071 )
1072 };
1073 let process_snapshot = |task_id: TaskId, inner: Box<TaskStorage>| {
1074 if task_id.is_transient() {
1075 return (task_id, None, None);
1076 }
1077
1078 (
1080 task_id,
1081 inner.flags.meta_modified().then(|| {
1082 self.backing_storage
1083 .serialize(task_id, &inner, SpecificTaskDataCategory::Meta)
1084 }),
1085 inner.flags.data_modified().then(|| {
1086 self.backing_storage
1087 .serialize(task_id, &inner, SpecificTaskDataCategory::Data)
1088 }),
1089 )
1090 };
1091
1092 let snapshot = self
1093 .storage
1094 .take_snapshot(&preprocess, &process, &process_snapshot);
1095
1096 #[cfg(feature = "print_cache_item_size")]
1097 #[derive(Default)]
1098 struct TaskCacheStats {
1099 data: usize,
1100 data_compressed: usize,
1101 data_count: usize,
1102 meta: usize,
1103 meta_compressed: usize,
1104 meta_count: usize,
1105 }
1106 #[cfg(feature = "print_cache_item_size")]
1107 impl TaskCacheStats {
1108 fn compressed_size(data: &[u8]) -> Result<usize> {
1109 Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1110 data,
1111 &mut Vec::new(),
1112 lzzzz::lz4::ACC_LEVEL_DEFAULT,
1113 )?)
1114 }
1115
1116 fn add_data(&mut self, data: &[u8]) {
1117 self.data += data.len();
1118 self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1119 self.data_count += 1;
1120 }
1121
1122 fn add_meta(&mut self, data: &[u8]) {
1123 self.meta += data.len();
1124 self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1125 self.meta_count += 1;
1126 }
1127 }
1128 #[cfg(feature = "print_cache_item_size")]
1129 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1130 Mutex::new(FxHashMap::default());
1131
1132 let task_snapshots = snapshot
1133 .into_iter()
1134 .filter_map(|iter| {
1135 let mut iter = iter
1136 .filter_map(
1137 |(task_id, meta, data): (
1138 _,
1139 Option<Result<SmallVec<_>>>,
1140 Option<Result<SmallVec<_>>>,
1141 )| {
1142 let meta = match meta {
1143 Some(Ok(meta)) => {
1144 #[cfg(feature = "print_cache_item_size")]
1145 task_cache_stats
1146 .lock()
1147 .entry(self.get_task_description(task_id))
1148 .or_default()
1149 .add_meta(&meta);
1150 Some(meta)
1151 }
1152 None => None,
1153 Some(Err(err)) => {
1154 println!(
1155 "Serializing task {} failed (meta): {:?}",
1156 self.get_task_description(task_id),
1157 err
1158 );
1159 None
1160 }
1161 };
1162 let data = match data {
1163 Some(Ok(data)) => {
1164 #[cfg(feature = "print_cache_item_size")]
1165 task_cache_stats
1166 .lock()
1167 .entry(self.get_task_description(task_id))
1168 .or_default()
1169 .add_data(&data);
1170 Some(data)
1171 }
1172 None => None,
1173 Some(Err(err)) => {
1174 println!(
1175 "Serializing task {} failed (data): {:?}",
1176 self.get_task_description(task_id),
1177 err
1178 );
1179 None
1180 }
1181 };
1182 (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
1183 },
1184 )
1185 .peekable();
1186 iter.peek().is_some().then_some(iter)
1187 })
1188 .collect::<Vec<_>>();
1189
1190 swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
1191
1192 drop(snapshot_span);
1193
1194 if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
1195 return Some((snapshot_time, false));
1196 }
1197
1198 let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1199 {
1200 if let Err(err) = self.backing_storage.save_snapshot(
1201 suspended_operations,
1202 persisted_task_cache_log,
1203 task_snapshots,
1204 ) {
1205 println!("Persisting failed: {err:?}");
1206 return None;
1207 }
1208 #[cfg(feature = "print_cache_item_size")]
1209 {
1210 let mut task_cache_stats = task_cache_stats
1211 .into_inner()
1212 .into_iter()
1213 .collect::<Vec<_>>();
1214 if !task_cache_stats.is_empty() {
1215 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1216 (stats_b.data_compressed + stats_b.meta_compressed, key_b)
1217 .cmp(&(stats_a.data_compressed + stats_a.meta_compressed, key_a))
1218 });
1219 println!("Task cache stats:");
1220 for (task_desc, stats) in task_cache_stats {
1221 use turbo_tasks::util::FormatBytes;
1222
1223 println!(
1224 " {} ({}) {task_desc} = {} ({}) meta {} x {} ({}), {} ({}) data {} x \
1225 {} ({})",
1226 FormatBytes(stats.data_compressed + stats.meta_compressed),
1227 FormatBytes(stats.data + stats.meta),
1228 FormatBytes(stats.meta_compressed),
1229 FormatBytes(stats.meta),
1230 stats.meta_count,
1231 FormatBytes(
1232 stats
1233 .meta_compressed
1234 .checked_div(stats.meta_count)
1235 .unwrap_or(0)
1236 ),
1237 FormatBytes(stats.meta.checked_div(stats.meta_count).unwrap_or(0)),
1238 FormatBytes(stats.data_compressed),
1239 FormatBytes(stats.data),
1240 stats.data_count,
1241 FormatBytes(
1242 stats
1243 .data_compressed
1244 .checked_div(stats.data_count)
1245 .unwrap_or(0)
1246 ),
1247 FormatBytes(stats.data.checked_div(stats.data_count).unwrap_or(0)),
1248 );
1249 }
1250 }
1251 }
1252 }
1253
1254 let elapsed = start.elapsed();
1255 if elapsed > Duration::from_secs(10) {
1257 turbo_tasks().send_compilation_event(Arc::new(TimingEvent::new(
1258 "Finished writing to filesystem cache".to_string(),
1259 elapsed,
1260 )));
1261 }
1262
1263 Some((snapshot_time, true))
1264 }
1265
1266 fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1267 if self.should_restore() {
1268 let uncompleted_operations = self
1272 .backing_storage
1273 .uncompleted_operations()
1274 .expect("Failed to get uncompleted operations");
1275 if !uncompleted_operations.is_empty() {
1276 let mut ctx = self.execute_context(turbo_tasks);
1277 for op in uncompleted_operations {
1278 op.execute(&mut ctx);
1279 }
1280 }
1281 }
1282
1283 if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1286 let _span = trace_span!("persisting background job").entered();
1288 let _span = tracing::info_span!("thread").entered();
1289 turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1290 }
1291 }
1292
1293 fn stopping(&self) {
1294 self.stopping.store(true, Ordering::Release);
1295 self.stopping_event.notify(usize::MAX);
1296 }
1297
1298 #[allow(unused_variables)]
1299 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1300 #[cfg(feature = "verify_aggregation_graph")]
1301 {
1302 self.is_idle.store(false, Ordering::Release);
1303 self.verify_aggregation_graph(turbo_tasks, false);
1304 }
1305 if self.should_persist() {
1306 self.snapshot_and_persist(Span::current().into(), "stop");
1307 }
1308 self.task_cache.drop_contents();
1309 drop_contents(&self.transient_tasks);
1310 self.storage.drop_contents();
1311 if let Err(err) = self.backing_storage.shutdown() {
1312 println!("Shutting down failed: {err}");
1313 }
1314 }
1315
1316 #[allow(unused_variables)]
1317 fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1318 self.idle_start_event.notify(usize::MAX);
1319
1320 #[cfg(feature = "verify_aggregation_graph")]
1321 {
1322 use tokio::select;
1323
1324 self.is_idle.store(true, Ordering::Release);
1325 let this = self.clone();
1326 let turbo_tasks = turbo_tasks.pin();
1327 tokio::task::spawn(async move {
1328 select! {
1329 _ = tokio::time::sleep(Duration::from_secs(5)) => {
1330 }
1332 _ = this.idle_end_event.listen() => {
1333 return;
1334 }
1335 }
1336 if !this.is_idle.load(Ordering::Relaxed) {
1337 return;
1338 }
1339 this.verify_aggregation_graph(&*turbo_tasks, true);
1340 });
1341 }
1342 }
1343
1344 fn idle_end(&self) {
1345 #[cfg(feature = "verify_aggregation_graph")]
1346 self.is_idle.store(false, Ordering::Release);
1347 self.idle_end_event.notify(usize::MAX);
1348 }
1349
1350 fn get_or_create_persistent_task(
1351 &self,
1352 task_type: CachedTaskType,
1353 parent_task: Option<TaskId>,
1354 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1355 ) -> TaskId {
1356 if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
1357 self.track_cache_hit(&task_type);
1358 self.connect_child(parent_task, task_id, turbo_tasks);
1359 return task_id;
1360 }
1361
1362 let check_backing_storage =
1363 self.should_restore() && self.local_is_partial.load(Ordering::Acquire);
1364 let tx = check_backing_storage
1365 .then(|| self.backing_storage.start_read_transaction())
1366 .flatten();
1367 let task_id = {
1368 if let Some(task_id) = unsafe {
1370 check_backing_storage
1371 .then(|| {
1372 self.backing_storage
1373 .forward_lookup_task_cache(tx.as_ref(), &task_type)
1374 .expect("Failed to lookup task id")
1375 })
1376 .flatten()
1377 } {
1378 self.track_cache_hit(&task_type);
1379 let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
1380 task_id
1381 } else {
1382 let task_type = Arc::new(task_type);
1383 let task_id = self.persisted_task_id_factory.get();
1384 let task_id = if let Err(existing_task_id) =
1385 self.task_cache.try_insert(task_type.clone(), task_id)
1386 {
1387 self.track_cache_hit(&task_type);
1388 unsafe {
1390 self.persisted_task_id_factory.reuse(task_id);
1391 }
1392 existing_task_id
1393 } else {
1394 self.track_cache_miss(&task_type);
1395 task_id
1396 };
1397 if let Some(log) = &self.persisted_task_cache_log {
1398 log.lock(task_id).push((task_type, task_id));
1399 }
1400 task_id
1401 }
1402 };
1403
1404 unsafe { self.connect_child_with_tx(tx.as_ref(), parent_task, task_id, turbo_tasks) };
1406
1407 task_id
1408 }
1409
1410 fn get_or_create_transient_task(
1411 &self,
1412 task_type: CachedTaskType,
1413 parent_task: Option<TaskId>,
1414 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1415 ) -> TaskId {
1416 if let Some(parent_task) = parent_task
1417 && !parent_task.is_transient()
1418 {
1419 self.panic_persistent_calling_transient(
1420 self.lookup_task_type(parent_task).as_deref(),
1421 Some(&task_type),
1422 None,
1423 );
1424 }
1425 if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
1426 self.track_cache_hit(&task_type);
1427 self.connect_child(parent_task, task_id, turbo_tasks);
1428 return task_id;
1429 }
1430
1431 let task_type = Arc::new(task_type);
1432 let task_id = self.transient_task_id_factory.get();
1433 if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) {
1434 self.track_cache_hit(&task_type);
1435 unsafe {
1437 self.transient_task_id_factory.reuse(task_id);
1438 }
1439 self.connect_child(parent_task, existing_task_id, turbo_tasks);
1440 return existing_task_id;
1441 }
1442
1443 self.track_cache_miss(&task_type);
1444 self.connect_child(parent_task, task_id, turbo_tasks);
1445
1446 task_id
1447 }
1448
1449 fn debug_trace_transient_task(
1452 &self,
1453 task_type: &CachedTaskType,
1454 cell_id: Option<CellId>,
1455 ) -> DebugTraceTransientTask {
1456 fn inner_id(
1459 backend: &TurboTasksBackendInner<impl BackingStorage>,
1460 task_id: TaskId,
1461 cell_type_id: Option<ValueTypeId>,
1462 visited_set: &mut FxHashSet<TaskId>,
1463 ) -> DebugTraceTransientTask {
1464 if let Some(task_type) = backend.lookup_task_type(task_id) {
1465 if visited_set.contains(&task_id) {
1466 let task_name = task_type.get_name();
1467 DebugTraceTransientTask::Collapsed {
1468 task_name,
1469 cell_type_id,
1470 }
1471 } else {
1472 inner_cached(backend, &task_type, cell_type_id, visited_set)
1473 }
1474 } else {
1475 DebugTraceTransientTask::Uncached { cell_type_id }
1476 }
1477 }
1478 fn inner_cached(
1479 backend: &TurboTasksBackendInner<impl BackingStorage>,
1480 task_type: &CachedTaskType,
1481 cell_type_id: Option<ValueTypeId>,
1482 visited_set: &mut FxHashSet<TaskId>,
1483 ) -> DebugTraceTransientTask {
1484 let task_name = task_type.get_name();
1485
1486 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1487 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1488 return None;
1492 };
1493 if task_id.is_transient() {
1494 Some(Box::new(inner_id(
1495 backend,
1496 task_id,
1497 cause_self_raw_vc.try_get_type_id(),
1498 visited_set,
1499 )))
1500 } else {
1501 None
1502 }
1503 });
1504 let cause_args = task_type
1505 .arg
1506 .get_raw_vcs()
1507 .into_iter()
1508 .filter_map(|raw_vc| {
1509 let Some(task_id) = raw_vc.try_get_task_id() else {
1510 return None;
1512 };
1513 if !task_id.is_transient() {
1514 return None;
1515 }
1516 Some((task_id, raw_vc.try_get_type_id()))
1517 })
1518 .collect::<IndexSet<_>>() .into_iter()
1520 .map(|(task_id, cell_type_id)| {
1521 inner_id(backend, task_id, cell_type_id, visited_set)
1522 })
1523 .collect();
1524
1525 DebugTraceTransientTask::Cached {
1526 task_name,
1527 cell_type_id,
1528 cause_self,
1529 cause_args,
1530 }
1531 }
1532 inner_cached(
1533 self,
1534 task_type,
1535 cell_id.map(|c| c.type_id),
1536 &mut FxHashSet::default(),
1537 )
1538 }
1539
1540 fn invalidate_task(
1541 &self,
1542 task_id: TaskId,
1543 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1544 ) {
1545 if !self.should_track_dependencies() {
1546 panic!("Dependency tracking is disabled so invalidation is not allowed");
1547 }
1548 operation::InvalidateOperation::run(
1549 smallvec![task_id],
1550 #[cfg(feature = "trace_task_dirty")]
1551 TaskDirtyCause::Invalidator,
1552 self.execute_context(turbo_tasks),
1553 );
1554 }
1555
1556 fn invalidate_tasks(
1557 &self,
1558 tasks: &[TaskId],
1559 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1560 ) {
1561 if !self.should_track_dependencies() {
1562 panic!("Dependency tracking is disabled so invalidation is not allowed");
1563 }
1564 operation::InvalidateOperation::run(
1565 tasks.iter().copied().collect(),
1566 #[cfg(feature = "trace_task_dirty")]
1567 TaskDirtyCause::Unknown,
1568 self.execute_context(turbo_tasks),
1569 );
1570 }
1571
1572 fn invalidate_tasks_set(
1573 &self,
1574 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1575 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1576 ) {
1577 if !self.should_track_dependencies() {
1578 panic!("Dependency tracking is disabled so invalidation is not allowed");
1579 }
1580 operation::InvalidateOperation::run(
1581 tasks.iter().copied().collect(),
1582 #[cfg(feature = "trace_task_dirty")]
1583 TaskDirtyCause::Unknown,
1584 self.execute_context(turbo_tasks),
1585 );
1586 }
1587
1588 fn invalidate_serialization(
1589 &self,
1590 task_id: TaskId,
1591 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1592 ) {
1593 if task_id.is_transient() {
1594 return;
1595 }
1596 let mut ctx = self.execute_context(turbo_tasks);
1597 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1598 task.invalidate_serialization();
1599 }
1600
1601 fn get_task_description(&self, task_id: TaskId) -> String {
1602 self.lookup_task_type(task_id).map_or_else(
1603 || format!("{task_id:?} transient"),
1604 |task_type| task_type.to_string(),
1605 )
1606 }
1607
1608 fn task_execution_canceled(
1609 &self,
1610 task_id: TaskId,
1611 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1612 ) {
1613 let mut ctx = self.execute_context(turbo_tasks);
1614 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1615 if let Some(in_progress) = task.take_in_progress() {
1616 match in_progress {
1617 InProgressState::Scheduled {
1618 done_event,
1619 reason: _,
1620 } => done_event.notify(usize::MAX),
1621 InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1622 done_event.notify(usize::MAX)
1623 }
1624 InProgressState::Canceled => {}
1625 }
1626 }
1627 let old = task.set_in_progress(InProgressState::Canceled);
1628 debug_assert!(old.is_none(), "InProgress already exists");
1629 }
1630
1631 fn try_start_task_execution(
1632 &self,
1633 task_id: TaskId,
1634 priority: TaskPriority,
1635 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1636 ) -> Option<TaskExecutionSpec<'_>> {
1637 enum TaskType {
1638 Cached(Arc<CachedTaskType>),
1639 Transient(Arc<TransientTask>),
1640 }
1641 let (task_type, once_task) = if let Some(task_type) = self.lookup_task_type(task_id) {
1642 (TaskType::Cached(task_type), false)
1643 } else if let Some(task_type) = self.transient_tasks.get(&task_id) {
1644 (
1645 TaskType::Transient(task_type.clone()),
1646 matches!(**task_type, TransientTask::Once(_)),
1647 )
1648 } else {
1649 return None;
1650 };
1651 let execution_reason;
1652 {
1653 let mut ctx = self.execute_context(turbo_tasks);
1654 let mut task = ctx.task(task_id, TaskDataCategory::All);
1655 if let Some(tasks) = task.prefetch() {
1656 drop(task);
1657 ctx.prepare_tasks(tasks);
1658 task = ctx.task(task_id, TaskDataCategory::All);
1659 }
1660 let in_progress = task.take_in_progress()?;
1661 let InProgressState::Scheduled { done_event, reason } = in_progress else {
1662 let old = task.set_in_progress(in_progress);
1663 debug_assert!(old.is_none(), "InProgress already exists");
1664 return None;
1665 };
1666 execution_reason = reason;
1667 let old = task.set_in_progress(InProgressState::InProgress(Box::new(
1668 InProgressStateInner {
1669 stale: false,
1670 once_task,
1671 done_event,
1672 session_dependent: false,
1673 marked_as_completed: false,
1674 new_children: Default::default(),
1675 },
1676 )));
1677 debug_assert!(old.is_none(), "InProgress already exists");
1678
1679 enum Collectible {
1681 Current(CollectibleRef, i32),
1682 Outdated(CollectibleRef),
1683 }
1684 let collectibles = task
1685 .iter_collectibles()
1686 .map(|(&collectible, &value)| Collectible::Current(collectible, value))
1687 .chain(
1688 task.iter_outdated_collectibles()
1689 .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
1690 )
1691 .collect::<Vec<_>>();
1692 for collectible in collectibles {
1693 match collectible {
1694 Collectible::Current(collectible, value) => {
1695 let _ = task.insert_outdated_collectible(collectible, value);
1696 }
1697 Collectible::Outdated(collectible) => {
1698 if task
1699 .collectibles()
1700 .is_none_or(|m| m.get(&collectible).is_none())
1701 {
1702 task.remove_outdated_collectibles(&collectible);
1703 }
1704 }
1705 }
1706 }
1707
1708 if self.should_track_dependencies() {
1709 let cell_dependencies = task.iter_cell_dependencies().collect();
1711 task.set_outdated_cell_dependencies(cell_dependencies);
1712
1713 let outdated_output_dependencies = task.iter_output_dependencies().collect();
1714 task.set_outdated_output_dependencies(outdated_output_dependencies);
1715 }
1716 }
1717
1718 let (span, future) = match task_type {
1719 TaskType::Cached(task_type) => {
1720 let CachedTaskType {
1721 native_fn,
1722 this,
1723 arg,
1724 } = &*task_type;
1725 (
1726 native_fn.span(task_id.persistence(), execution_reason, priority),
1727 native_fn.execute(*this, &**arg),
1728 )
1729 }
1730 TaskType::Transient(task_type) => {
1731 let span = tracing::trace_span!("turbo_tasks::root_task");
1732 let future = match &*task_type {
1733 TransientTask::Root(f) => f(),
1734 TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
1735 };
1736 (span, future)
1737 }
1738 };
1739 Some(TaskExecutionSpec { future, span })
1740 }
1741
1742 fn task_execution_completed(
1743 &self,
1744 task_id: TaskId,
1745 result: Result<RawVc, TurboTasksExecutionError>,
1746 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1747 has_invalidator: bool,
1748 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1749 ) -> bool {
1750 #[cfg(not(feature = "trace_task_details"))]
1765 let span = tracing::trace_span!(
1766 "task execution completed",
1767 new_children = tracing::field::Empty
1768 )
1769 .entered();
1770 #[cfg(feature = "trace_task_details")]
1771 let span = tracing::trace_span!(
1772 "task execution completed",
1773 task_id = display(task_id),
1774 result = match result.as_ref() {
1775 Ok(value) => display(either::Either::Left(value)),
1776 Err(err) => display(either::Either::Right(err)),
1777 },
1778 new_children = tracing::field::Empty,
1779 immutable = tracing::field::Empty,
1780 new_output = tracing::field::Empty,
1781 output_dependents = tracing::field::Empty,
1782 stale = tracing::field::Empty,
1783 )
1784 .entered();
1785
1786 let is_error = result.is_err();
1787
1788 let mut ctx = self.execute_context(turbo_tasks);
1789
1790 let Some(TaskExecutionCompletePrepareResult {
1791 new_children,
1792 is_now_immutable,
1793 #[cfg(feature = "verify_determinism")]
1794 no_output_set,
1795 new_output,
1796 output_dependent_tasks,
1797 }) = self.task_execution_completed_prepare(
1798 &mut ctx,
1799 #[cfg(feature = "trace_task_details")]
1800 &span,
1801 task_id,
1802 result,
1803 cell_counters,
1804 has_invalidator,
1805 )
1806 else {
1807 #[cfg(feature = "trace_task_details")]
1809 span.record("stale", "prepare");
1810 return true;
1811 };
1812
1813 #[cfg(feature = "trace_task_details")]
1814 span.record("new_output", new_output.is_some());
1815 #[cfg(feature = "trace_task_details")]
1816 span.record("output_dependents", output_dependent_tasks.len());
1817
1818 if !output_dependent_tasks.is_empty() {
1823 self.task_execution_completed_invalidate_output_dependent(
1824 &mut ctx,
1825 task_id,
1826 output_dependent_tasks,
1827 );
1828 }
1829
1830 let has_new_children = !new_children.is_empty();
1831 span.record("new_children", new_children.len());
1832
1833 if has_new_children {
1834 self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
1835 }
1836
1837 if has_new_children
1838 && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
1839 {
1840 #[cfg(feature = "trace_task_details")]
1842 span.record("stale", "connect");
1843 return true;
1844 }
1845
1846 let (stale, in_progress_cells) = self.task_execution_completed_finish(
1847 &mut ctx,
1848 task_id,
1849 #[cfg(feature = "verify_determinism")]
1850 no_output_set,
1851 new_output,
1852 is_now_immutable,
1853 );
1854 if stale {
1855 #[cfg(feature = "trace_task_details")]
1857 span.record("stale", "finish");
1858 return true;
1859 }
1860
1861 let removed_data =
1862 self.task_execution_completed_cleanup(&mut ctx, task_id, cell_counters, is_error);
1863
1864 drop(removed_data);
1866 drop(in_progress_cells);
1867
1868 false
1869 }
1870
1871 fn task_execution_completed_prepare(
1872 &self,
1873 ctx: &mut impl ExecuteContext<'_>,
1874 #[cfg(feature = "trace_task_details")] span: &Span,
1875 task_id: TaskId,
1876 result: Result<RawVc, TurboTasksExecutionError>,
1877 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1878 has_invalidator: bool,
1879 ) -> Option<TaskExecutionCompletePrepareResult> {
1880 let mut task = ctx.task(task_id, TaskDataCategory::All);
1881 let Some(in_progress) = task.get_in_progress_mut() else {
1882 panic!("Task execution completed, but task is not in progress: {task:#?}");
1883 };
1884 if matches!(in_progress, InProgressState::Canceled) {
1885 return Some(TaskExecutionCompletePrepareResult {
1886 new_children: Default::default(),
1887 is_now_immutable: false,
1888 #[cfg(feature = "verify_determinism")]
1889 no_output_set: false,
1890 new_output: None,
1891 output_dependent_tasks: Default::default(),
1892 });
1893 }
1894 let &mut InProgressState::InProgress(box InProgressStateInner {
1895 stale,
1896 ref mut new_children,
1897 session_dependent,
1898 ..
1899 }) = in_progress
1900 else {
1901 panic!("Task execution completed, but task is not in progress: {task:#?}");
1902 };
1903
1904 #[cfg(not(feature = "no_fast_stale"))]
1906 if stale {
1907 let Some(InProgressState::InProgress(box InProgressStateInner {
1908 done_event,
1909 mut new_children,
1910 ..
1911 })) = task.take_in_progress()
1912 else {
1913 unreachable!();
1914 };
1915 let old = task.set_in_progress(InProgressState::Scheduled {
1916 done_event,
1917 reason: TaskExecutionReason::Stale,
1918 });
1919 debug_assert!(old.is_none(), "InProgress already exists");
1920 for task in task.iter_children() {
1923 new_children.remove(&task);
1924 }
1925 drop(task);
1926
1927 AggregationUpdateQueue::run(
1930 AggregationUpdateJob::DecreaseActiveCounts {
1931 task_ids: new_children.into_iter().collect(),
1932 },
1933 ctx,
1934 );
1935 return None;
1936 }
1937
1938 let mut new_children = take(new_children);
1940
1941 if has_invalidator {
1943 task.set_invalidator(true);
1944 }
1945
1946 let old_counters: FxHashMap<_, _> = task
1948 .iter_cell_type_max_index()
1949 .map(|(&k, &v)| (k, v))
1950 .collect();
1951 let mut counters_to_remove = old_counters.clone();
1952
1953 for (&cell_type, &max_index) in cell_counters.iter() {
1954 if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
1955 if old_max_index != max_index {
1956 task.insert_cell_type_max_index(cell_type, max_index);
1957 }
1958 } else {
1959 task.insert_cell_type_max_index(cell_type, max_index);
1960 }
1961 }
1962 for (cell_type, _) in counters_to_remove {
1963 task.remove_cell_type_max_index(&cell_type);
1964 }
1965
1966 let mut queue = AggregationUpdateQueue::new();
1967
1968 let mut old_edges = Vec::new();
1969
1970 let has_children = !new_children.is_empty();
1971 let is_immutable = task.immutable();
1972 let task_dependencies_for_immutable =
1973 if !is_immutable
1975 && !session_dependent
1977 && !task.invalidator()
1979 && task.is_collectibles_dependencies_empty()
1981 {
1982 Some(
1983 task.iter_output_dependencies()
1985 .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
1986 .collect::<FxHashSet<_>>(),
1987 )
1988 } else {
1989 None
1990 };
1991
1992 if has_children {
1993 prepare_new_children(task_id, &mut task, &new_children, &mut queue);
1995
1996 old_edges.extend(
1998 task.iter_children()
1999 .filter(|task| !new_children.remove(task))
2000 .map(OutdatedEdge::Child),
2001 );
2002 } else {
2003 old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2004 }
2005
2006 old_edges.extend(
2007 task.iter_outdated_collectibles()
2008 .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2009 );
2010
2011 if self.should_track_dependencies() {
2012 old_edges.extend(
2019 task.iter_outdated_cell_dependencies()
2020 .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
2021 );
2022 old_edges.extend(
2023 task.iter_outdated_output_dependencies()
2024 .map(OutdatedEdge::OutputDependency),
2025 );
2026 }
2027
2028 let current_output = task.get_output();
2030 #[cfg(feature = "verify_determinism")]
2031 let no_output_set = current_output.is_none();
2032 let new_output = match result {
2033 Ok(RawVc::TaskOutput(output_task_id)) => {
2034 if let Some(OutputValue::Output(current_task_id)) = current_output
2035 && *current_task_id == output_task_id
2036 {
2037 None
2038 } else {
2039 Some(OutputValue::Output(output_task_id))
2040 }
2041 }
2042 Ok(RawVc::TaskCell(output_task_id, cell)) => {
2043 if let Some(OutputValue::Cell(CellRef {
2044 task: current_task_id,
2045 cell: current_cell,
2046 })) = current_output
2047 && *current_task_id == output_task_id
2048 && *current_cell == cell
2049 {
2050 None
2051 } else {
2052 Some(OutputValue::Cell(CellRef {
2053 task: output_task_id,
2054 cell,
2055 }))
2056 }
2057 }
2058 Ok(RawVc::LocalOutput(..)) => {
2059 panic!("Non-local tasks must not return a local Vc");
2060 }
2061 Err(err) => {
2062 if let Some(OutputValue::Error(old_error)) = current_output
2063 && old_error == &err
2064 {
2065 None
2066 } else {
2067 Some(OutputValue::Error(err))
2068 }
2069 }
2070 };
2071 let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2072 if new_output.is_some() && ctx.should_track_dependencies() {
2074 output_dependent_tasks = task.iter_output_dependent().collect();
2075 }
2076
2077 drop(task);
2078
2079 let mut is_now_immutable = false;
2081 if let Some(dependencies) = task_dependencies_for_immutable
2082 && dependencies
2083 .iter()
2084 .all(|&task_id| ctx.task(task_id, TaskDataCategory::Meta).immutable())
2085 {
2086 is_now_immutable = true;
2087 }
2088 #[cfg(feature = "trace_task_details")]
2089 span.record("immutable", is_immutable || is_now_immutable);
2090
2091 if !queue.is_empty() || !old_edges.is_empty() {
2092 #[cfg(feature = "trace_task_completion")]
2093 let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
2094 CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2098 }
2099
2100 Some(TaskExecutionCompletePrepareResult {
2101 new_children,
2102 is_now_immutable,
2103 #[cfg(feature = "verify_determinism")]
2104 no_output_set,
2105 new_output,
2106 output_dependent_tasks,
2107 })
2108 }
2109
2110 fn task_execution_completed_invalidate_output_dependent(
2111 &self,
2112 ctx: &mut impl ExecuteContext<'_>,
2113 task_id: TaskId,
2114 output_dependent_tasks: SmallVec<[TaskId; 4]>,
2115 ) {
2116 debug_assert!(!output_dependent_tasks.is_empty());
2117
2118 if output_dependent_tasks.len() > 1 {
2119 ctx.prepare_tasks(
2120 output_dependent_tasks
2121 .iter()
2122 .map(|&id| (id, TaskDataCategory::All)),
2123 );
2124 }
2125
2126 fn process_output_dependents(
2127 ctx: &mut impl ExecuteContext<'_>,
2128 task_id: TaskId,
2129 dependent_task_id: TaskId,
2130 queue: &mut AggregationUpdateQueue,
2131 ) {
2132 #[cfg(feature = "trace_task_output_dependencies")]
2133 let span = tracing::trace_span!(
2134 "invalidate output dependency",
2135 task = %task_id,
2136 dependent_task = %dependent_task_id,
2137 result = tracing::field::Empty,
2138 )
2139 .entered();
2140 if ctx.is_once_task(dependent_task_id) {
2141 #[cfg(feature = "trace_task_output_dependencies")]
2143 span.record("result", "once task");
2144 return;
2145 }
2146 let mut make_stale = true;
2147 let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2148 if dependent.outdated_output_dependencies_contains(&task_id) {
2149 #[cfg(feature = "trace_task_output_dependencies")]
2150 span.record("result", "outdated dependency");
2151 make_stale = false;
2156 } else if !dependent.output_dependencies_contains(&task_id) {
2157 #[cfg(feature = "trace_task_output_dependencies")]
2160 span.record("result", "no backward dependency");
2161 return;
2162 }
2163 make_task_dirty_internal(
2164 dependent,
2165 dependent_task_id,
2166 make_stale,
2167 #[cfg(feature = "trace_task_dirty")]
2168 TaskDirtyCause::OutputChange { task_id },
2169 queue,
2170 ctx,
2171 );
2172 #[cfg(feature = "trace_task_output_dependencies")]
2173 span.record("result", "marked dirty");
2174 }
2175
2176 if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
2177 let chunk_size = good_chunk_size(output_dependent_tasks.len());
2178 let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2179 let _ = scope_and_block(chunks.len(), |scope| {
2180 for chunk in chunks {
2181 let child_ctx = ctx.child_context();
2182 scope.spawn(move || {
2183 let mut ctx = child_ctx.create();
2184 let mut queue = AggregationUpdateQueue::new();
2185 for dependent_task_id in chunk {
2186 process_output_dependents(
2187 &mut ctx,
2188 task_id,
2189 dependent_task_id,
2190 &mut queue,
2191 )
2192 }
2193 queue.execute(&mut ctx);
2194 });
2195 }
2196 });
2197 } else {
2198 let mut queue = AggregationUpdateQueue::new();
2199 for dependent_task_id in output_dependent_tasks {
2200 process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
2201 }
2202 queue.execute(ctx);
2203 }
2204 }
2205
2206 fn task_execution_completed_unfinished_children_dirty(
2207 &self,
2208 ctx: &mut impl ExecuteContext<'_>,
2209 new_children: &FxHashSet<TaskId>,
2210 ) {
2211 debug_assert!(!new_children.is_empty());
2212
2213 let mut queue = AggregationUpdateQueue::new();
2214 ctx.for_each_task_meta(new_children.iter().copied(), |child_task, ctx| {
2215 if !child_task.has_output() {
2216 let child_id = child_task.id();
2217 make_task_dirty_internal(
2218 child_task,
2219 child_id,
2220 false,
2221 #[cfg(feature = "trace_task_dirty")]
2222 TaskDirtyCause::InitialDirty,
2223 &mut queue,
2224 ctx,
2225 );
2226 }
2227 });
2228
2229 queue.execute(ctx);
2230 }
2231
2232 fn task_execution_completed_connect(
2233 &self,
2234 ctx: &mut impl ExecuteContext<'_>,
2235 task_id: TaskId,
2236 new_children: FxHashSet<TaskId>,
2237 ) -> bool {
2238 debug_assert!(!new_children.is_empty());
2239
2240 let mut task = ctx.task(task_id, TaskDataCategory::All);
2241 let Some(in_progress) = task.get_in_progress() else {
2242 panic!("Task execution completed, but task is not in progress: {task:#?}");
2243 };
2244 if matches!(in_progress, InProgressState::Canceled) {
2245 return false;
2247 }
2248 let InProgressState::InProgress(box InProgressStateInner {
2249 #[cfg(not(feature = "no_fast_stale"))]
2250 stale,
2251 ..
2252 }) = in_progress
2253 else {
2254 panic!("Task execution completed, but task is not in progress: {task:#?}");
2255 };
2256
2257 #[cfg(not(feature = "no_fast_stale"))]
2259 if *stale {
2260 let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2261 task.take_in_progress()
2262 else {
2263 unreachable!();
2264 };
2265 let old = task.set_in_progress(InProgressState::Scheduled {
2266 done_event,
2267 reason: TaskExecutionReason::Stale,
2268 });
2269 debug_assert!(old.is_none(), "InProgress already exists");
2270 drop(task);
2271
2272 AggregationUpdateQueue::run(
2275 AggregationUpdateJob::DecreaseActiveCounts {
2276 task_ids: new_children.into_iter().collect(),
2277 },
2278 ctx,
2279 );
2280 return true;
2281 }
2282
2283 let has_active_count = ctx.should_track_activeness()
2284 && task
2285 .get_activeness()
2286 .is_some_and(|activeness| activeness.active_counter > 0);
2287 connect_children(
2288 ctx,
2289 task_id,
2290 task,
2291 new_children,
2292 has_active_count,
2293 ctx.should_track_activeness(),
2294 );
2295
2296 false
2297 }
2298
2299 fn task_execution_completed_finish(
2300 &self,
2301 ctx: &mut impl ExecuteContext<'_>,
2302 task_id: TaskId,
2303 #[cfg(feature = "verify_determinism")] no_output_set: bool,
2304 new_output: Option<OutputValue>,
2305 is_now_immutable: bool,
2306 ) -> (
2307 bool,
2308 Option<
2309 auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2310 >,
2311 ) {
2312 let mut task = ctx.task(task_id, TaskDataCategory::All);
2313 let Some(in_progress) = task.take_in_progress() else {
2314 panic!("Task execution completed, but task is not in progress: {task:#?}");
2315 };
2316 if matches!(in_progress, InProgressState::Canceled) {
2317 return (false, None);
2319 }
2320 let InProgressState::InProgress(box InProgressStateInner {
2321 done_event,
2322 once_task: _,
2323 stale,
2324 session_dependent,
2325 marked_as_completed: _,
2326 new_children,
2327 }) = in_progress
2328 else {
2329 panic!("Task execution completed, but task is not in progress: {task:#?}");
2330 };
2331 debug_assert!(new_children.is_empty());
2332
2333 if stale {
2335 let old = task.set_in_progress(InProgressState::Scheduled {
2336 done_event,
2337 reason: TaskExecutionReason::Stale,
2338 });
2339 debug_assert!(old.is_none(), "InProgress already exists");
2340 return (true, None);
2341 }
2342
2343 let mut old_content = None;
2345 if let Some(value) = new_output {
2346 old_content = task.set_output(value);
2347 }
2348
2349 if is_now_immutable {
2352 task.set_immutable(true);
2353 }
2354
2355 let in_progress_cells = task.take_in_progress_cells();
2357 if let Some(ref cells) = in_progress_cells {
2358 for state in cells.values() {
2359 state.event.notify(usize::MAX);
2360 }
2361 }
2362
2363 let old_dirtyness = task.get_dirty().cloned();
2365 let (old_self_dirty, old_current_session_self_clean) = match old_dirtyness {
2366 None => (false, false),
2367 Some(Dirtyness::Dirty(_)) => (true, false),
2368 Some(Dirtyness::SessionDependent) => {
2369 let clean_in_current_session = task.current_session_clean();
2370 (true, clean_in_current_session)
2371 }
2372 };
2373
2374 let (new_dirtyness, new_self_dirty, new_current_session_self_clean) = if session_dependent {
2376 (Some(Dirtyness::SessionDependent), true, true)
2377 } else {
2378 (None, false, false)
2379 };
2380
2381 if old_dirtyness != new_dirtyness {
2383 if let Some(value) = new_dirtyness {
2384 task.set_dirty(value);
2385 } else if old_dirtyness.is_some() {
2386 task.take_dirty();
2387 }
2388 }
2389 if old_current_session_self_clean != new_current_session_self_clean {
2390 if new_current_session_self_clean {
2391 task.set_current_session_clean(true);
2392 } else if old_current_session_self_clean {
2393 task.set_current_session_clean(false);
2394 }
2395 }
2396
2397 let data_update = if old_self_dirty != new_self_dirty
2399 || old_current_session_self_clean != new_current_session_self_clean
2400 {
2401 let dirty_container_count = task
2402 .get_aggregated_dirty_container_count()
2403 .cloned()
2404 .unwrap_or_default();
2405 let current_session_clean_container_count = task
2406 .get_aggregated_current_session_clean_container_count()
2407 .copied()
2408 .unwrap_or_default();
2409 let result = ComputeDirtyAndCleanUpdate {
2410 old_dirty_container_count: dirty_container_count,
2411 new_dirty_container_count: dirty_container_count,
2412 old_current_session_clean_container_count: current_session_clean_container_count,
2413 new_current_session_clean_container_count: current_session_clean_container_count,
2414 old_self_dirty,
2415 new_self_dirty,
2416 old_current_session_self_clean,
2417 new_current_session_self_clean,
2418 }
2419 .compute();
2420 if result.dirty_count_update - result.current_session_clean_update < 0 {
2421 if let Some(activeness_state) = task.get_activeness_mut() {
2423 activeness_state.all_clean_event.notify(usize::MAX);
2424 activeness_state.unset_active_until_clean();
2425 if activeness_state.is_empty() {
2426 task.take_activeness();
2427 }
2428 }
2429 }
2430 result
2431 .aggregated_update(task_id)
2432 .and_then(|aggregated_update| {
2433 AggregationUpdateJob::data_update(&mut task, aggregated_update)
2434 })
2435 } else {
2436 None
2437 };
2438
2439 #[cfg(feature = "verify_determinism")]
2440 let reschedule = (dirty_changed || no_output_set) && !task_id.is_transient();
2441 #[cfg(not(feature = "verify_determinism"))]
2442 let reschedule = false;
2443 if reschedule {
2444 let old = task.set_in_progress(InProgressState::Scheduled {
2445 done_event,
2446 reason: TaskExecutionReason::Stale,
2447 });
2448 debug_assert!(old.is_none(), "InProgress already exists");
2449 drop(task);
2450 } else {
2451 drop(task);
2452
2453 done_event.notify(usize::MAX);
2455 }
2456
2457 drop(old_content);
2458
2459 if let Some(data_update) = data_update {
2460 AggregationUpdateQueue::run(data_update, ctx);
2461 }
2462
2463 (reschedule, in_progress_cells)
2465 }
2466
2467 fn task_execution_completed_cleanup(
2468 &self,
2469 ctx: &mut impl ExecuteContext<'_>,
2470 task_id: TaskId,
2471 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2472 is_error: bool,
2473 ) -> Vec<SharedReference> {
2474 let mut task = ctx.task(task_id, TaskDataCategory::All);
2475 let mut removed_cell_data = Vec::new();
2476 if !is_error {
2480 let to_remove_persistent: Vec<_> = task
2487 .iter_persistent_cell_data()
2488 .filter_map(|(cell, _)| {
2489 cell_counters
2490 .get(&cell.type_id)
2491 .is_none_or(|start_index| cell.index >= *start_index)
2492 .then_some(*cell)
2493 })
2494 .collect();
2495
2496 let to_remove_transient: Vec<_> = task
2498 .iter_transient_cell_data()
2499 .filter_map(|(cell, _)| {
2500 cell_counters
2501 .get(&cell.type_id)
2502 .is_none_or(|start_index| cell.index >= *start_index)
2503 .then_some(*cell)
2504 })
2505 .collect();
2506 removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
2507 for cell in to_remove_persistent {
2508 if let Some(data) = task.remove_persistent_cell_data(&cell) {
2509 removed_cell_data.push(data.into_untyped());
2510 }
2511 }
2512 for cell in to_remove_transient {
2513 if let Some(data) = task.remove_transient_cell_data(&cell) {
2514 removed_cell_data.push(data);
2515 }
2516 }
2517 }
2518
2519 task.shrink_persistent_cell_data();
2521 task.shrink_transient_cell_data();
2522 task.shrink_cell_type_max_index();
2523 task.shrink_cell_dependencies();
2524 task.shrink_output_dependencies();
2525 task.shrink_collectibles_dependencies();
2526 task.shrink_outdated_cell_dependencies();
2527 task.shrink_outdated_output_dependencies();
2528 task.shrink_outdated_collectibles();
2529 task.shrink_outdated_collectibles_dependencies();
2530 task.shrink_children();
2531 task.shrink_collectibles();
2532 task.shrink_in_progress_cells();
2533
2534 drop(task);
2535
2536 removed_cell_data
2538 }
2539
    /// Runs a backend background job. Both job kinds visible here drive the snapshot loop.
    ///
    /// The returned future waits until a snapshot is due, takes it via `snapshot_and_persist` and
    /// then either keeps looping (the snapshot reported no new data) or schedules a
    /// `FollowUpSnapshot` job and finishes. It finishes without a snapshot as soon as the backend
    /// is stopping.
    ///
    /// A snapshot is due when
    /// * `FIRST_SNAPSHOT_WAIT` (for `InitialSnapshot`) or `SNAPSHOT_INTERVAL` (otherwise) has
    ///   passed since the last snapshot, or
    /// * the idle deadline has passed and `turbo_tasks.is_idle()` still holds.
    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    // `self.last_snapshot` stores milliseconds relative to `self.start_time`.
                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
                    let mut idle_start_listener = self.idle_start_event.listen();
                    let mut idle_end_listener = self.idle_end_event.listen();
                    // Cleared after a snapshot without new data, set again by the next idle
                    // start; while cleared, no idle deadline is armed when the wait begins.
                    let mut fresh_idle = true;
                    loop {
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        let idle_timeout = *IDLE_TIMEOUT;
                        let (time, mut reason) =
                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                            } else {
                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
                            };

                        let until = last_snapshot + time;
                        if until > Instant::now() {
                            // Register the listener before checking the flag, so a stop that
                            // happens in between is not missed.
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            // `far_future()` effectively disables the idle deadline.
                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
                                Instant::now() + idle_timeout
                            } else {
                                far_future()
                            };
                            loop {
                                tokio::select! {
                                    _ = &mut stop_listener => {
                                        return;
                                    },
                                    _ = &mut idle_start_listener => {
                                        // Became idle: arm the idle deadline and re-subscribe.
                                        fresh_idle = true;
                                        idle_time = Instant::now() + idle_timeout;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    _ = &mut idle_end_listener => {
                                        // No longer idle: move the idle deadline behind `until`
                                        // so the regular deadline fires first.
                                        idle_time = until + idle_timeout;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        // Only snapshot if the system is still idle right now.
                                        if turbo_tasks.is_idle() {
                                            reason = "idle timeout";
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let this = self.clone();
                        let snapshot = this.snapshot_and_persist(None, reason);
                        // NOTE(review): when `snapshot_and_persist` returns `None` the loop
                        // starts over with an unchanged `last_snapshot` — confirm that this
                        // cannot spin.
                        if let Some((snapshot_start, new_data)) = snapshot {
                            last_snapshot = snapshot_start;
                            if !new_data {
                                // Nothing was persisted: keep waiting in this job.
                                fresh_idle = false;
                                continue;
                            }
                            let last_snapshot = last_snapshot.duration_since(self.start_time);
                            self.last_snapshot.store(
                                last_snapshot.as_millis().try_into().unwrap(),
                                Ordering::Relaxed,
                            );

                            // Hand over to a fresh follow-up job and finish this one.
                            turbo_tasks.schedule_backend_background_job(
                                TurboTasksBackendJob::FollowUpSnapshot,
                            );
                            return;
                        }
                    }
                }
            }
        })
    }
2628
2629 fn try_read_own_task_cell(
2630 &self,
2631 task_id: TaskId,
2632 cell: CellId,
2633 options: ReadCellOptions,
2634 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2635 ) -> Result<TypedCellContent> {
2636 let mut ctx = self.execute_context(turbo_tasks);
2637 let task = ctx.task(task_id, TaskDataCategory::Data);
2638 if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2639 debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2640 Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2641 } else {
2642 Ok(CellContent(None).into_typed(cell.type_id))
2643 }
2644 }
2645
2646 fn read_task_collectibles(
2647 &self,
2648 task_id: TaskId,
2649 collectible_type: TraitTypeId,
2650 reader_id: Option<TaskId>,
2651 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2652 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2653 let mut ctx = self.execute_context(turbo_tasks);
2654 let mut collectibles = AutoMap::default();
2655 {
2656 let mut task = ctx.task(task_id, TaskDataCategory::All);
2657 loop {
2659 let aggregation_number = get_aggregation_number(&task);
2660 if is_root_node(aggregation_number) {
2661 break;
2662 }
2663 drop(task);
2664 AggregationUpdateQueue::run(
2665 AggregationUpdateJob::UpdateAggregationNumber {
2666 task_id,
2667 base_aggregation_number: u32::MAX,
2668 distance: None,
2669 },
2670 &mut ctx,
2671 );
2672 task = ctx.task(task_id, TaskDataCategory::All);
2673 }
2674 for (collectible, count) in task.iter_aggregated_collectibles() {
2675 if *count > 0 && collectible.collectible_type == collectible_type {
2676 *collectibles
2677 .entry(RawVc::TaskCell(
2678 collectible.cell.task,
2679 collectible.cell.cell,
2680 ))
2681 .or_insert(0) += 1;
2682 }
2683 }
2684 for (&collectible, &count) in task.iter_collectibles() {
2685 if collectible.collectible_type == collectible_type {
2686 *collectibles
2687 .entry(RawVc::TaskCell(
2688 collectible.cell.task,
2689 collectible.cell.cell,
2690 ))
2691 .or_insert(0) += count;
2692 }
2693 }
2694 if let Some(reader_id) = reader_id {
2695 let _ = task.add_collectibles_dependents((collectible_type, reader_id));
2696 }
2697 }
2698 if let Some(reader_id) = reader_id {
2699 let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
2700 let target = CollectiblesRef {
2701 task: task_id,
2702 collectible_type,
2703 };
2704 if !reader.remove_outdated_collectibles_dependencies(&target) {
2705 let _ = reader.add_collectibles_dependencies(target);
2706 }
2707 }
2708 collectibles
2709 }
2710
2711 fn emit_collectible(
2712 &self,
2713 collectible_type: TraitTypeId,
2714 collectible: RawVc,
2715 task_id: TaskId,
2716 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2717 ) {
2718 self.assert_valid_collectible(task_id, collectible);
2719
2720 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2721 panic!("Collectibles need to be resolved");
2722 };
2723 let cell = CellRef {
2724 task: collectible_task,
2725 cell,
2726 };
2727 operation::UpdateCollectibleOperation::run(
2728 task_id,
2729 CollectibleRef {
2730 collectible_type,
2731 cell,
2732 },
2733 1,
2734 self.execute_context(turbo_tasks),
2735 );
2736 }
2737
2738 fn unemit_collectible(
2739 &self,
2740 collectible_type: TraitTypeId,
2741 collectible: RawVc,
2742 count: u32,
2743 task_id: TaskId,
2744 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2745 ) {
2746 self.assert_valid_collectible(task_id, collectible);
2747
2748 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2749 panic!("Collectibles need to be resolved");
2750 };
2751 let cell = CellRef {
2752 task: collectible_task,
2753 cell,
2754 };
2755 operation::UpdateCollectibleOperation::run(
2756 task_id,
2757 CollectibleRef {
2758 collectible_type,
2759 cell,
2760 },
2761 -(i32::try_from(count).unwrap()),
2762 self.execute_context(turbo_tasks),
2763 );
2764 }
2765
2766 fn update_task_cell(
2767 &self,
2768 task_id: TaskId,
2769 cell: CellId,
2770 is_serializable_cell_content: bool,
2771 content: CellContent,
2772 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
2773 verification_mode: VerificationMode,
2774 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2775 ) {
2776 operation::UpdateCellOperation::run(
2777 task_id,
2778 cell,
2779 content,
2780 is_serializable_cell_content,
2781 updated_key_hashes,
2782 verification_mode,
2783 self.execute_context(turbo_tasks),
2784 );
2785 }
2786
2787 fn mark_own_task_as_session_dependent(
2788 &self,
2789 task_id: TaskId,
2790 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2791 ) {
2792 if !self.should_track_dependencies() {
2793 return;
2795 }
2796 const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
2797 let mut ctx = self.execute_context(turbo_tasks);
2798 let mut task = ctx.task(task_id, TaskDataCategory::Meta);
2799 let aggregation_number = get_aggregation_number(&task);
2800 if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
2801 drop(task);
2802 AggregationUpdateQueue::run(
2805 AggregationUpdateJob::UpdateAggregationNumber {
2806 task_id,
2807 base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
2808 distance: None,
2809 },
2810 &mut ctx,
2811 );
2812 task = ctx.task(task_id, TaskDataCategory::Meta);
2813 }
2814 if let Some(InProgressState::InProgress(box InProgressStateInner {
2815 session_dependent,
2816 ..
2817 })) = task.get_in_progress_mut()
2818 {
2819 *session_dependent = true;
2820 }
2821 }
2822
2823 fn mark_own_task_as_finished(
2824 &self,
2825 task: TaskId,
2826 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2827 ) {
2828 let mut ctx = self.execute_context(turbo_tasks);
2829 let mut task = ctx.task(task, TaskDataCategory::Data);
2830 if let Some(InProgressState::InProgress(box InProgressStateInner {
2831 marked_as_completed,
2832 ..
2833 })) = task.get_in_progress_mut()
2834 {
2835 *marked_as_completed = true;
2836 }
2841 }
2842
2843 fn set_own_task_aggregation_number(
2844 &self,
2845 task: TaskId,
2846 aggregation_number: u32,
2847 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2848 ) {
2849 let mut ctx = self.execute_context(turbo_tasks);
2850 AggregationUpdateQueue::run(
2851 AggregationUpdateJob::UpdateAggregationNumber {
2852 task_id: task,
2853 base_aggregation_number: aggregation_number,
2854 distance: None,
2855 },
2856 &mut ctx,
2857 );
2858 }
2859
2860 fn connect_task(
2861 &self,
2862 task: TaskId,
2863 parent_task: Option<TaskId>,
2864 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2865 ) {
2866 self.assert_not_persistent_calling_transient(parent_task, task, None);
2867 ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
2868 }
2869
2870 fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
2871 let task_id = self.transient_task_id_factory.get();
2872 let root_type = match task_type {
2873 TransientTaskType::Root(_) => RootType::RootTask,
2874 TransientTaskType::Once(_) => RootType::OnceTask,
2875 };
2876 self.transient_tasks.insert(
2877 task_id,
2878 Arc::new(match task_type {
2879 TransientTaskType::Root(f) => TransientTask::Root(f),
2880 TransientTaskType::Once(f) => TransientTask::Once(Mutex::new(Some(f))),
2881 }),
2882 );
2883 {
2884 let mut task = self.storage.access_mut(task_id);
2885 (*task).init_transient_task(task_id, root_type, self.should_track_activeness());
2886 }
2887 #[cfg(feature = "verify_aggregation_graph")]
2888 self.root_tasks.lock().insert(task_id);
2889 task_id
2890 }
2891
2892 fn dispose_root_task(
2893 &self,
2894 task_id: TaskId,
2895 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2896 ) {
2897 #[cfg(feature = "verify_aggregation_graph")]
2898 self.root_tasks.lock().remove(&task_id);
2899
2900 let mut ctx = self.execute_context(turbo_tasks);
2901 let mut task = ctx.task(task_id, TaskDataCategory::All);
2902 let is_dirty = task.is_dirty();
2903 let has_dirty_containers = task.has_dirty_containers();
2904 if is_dirty.is_some() || has_dirty_containers {
2905 if let Some(activeness_state) = task.get_activeness_mut() {
2906 activeness_state.unset_root_type();
2908 activeness_state.set_active_until_clean();
2909 };
2910 } else if let Some(activeness_state) = task.take_activeness() {
2911 activeness_state.all_clean_event.notify(usize::MAX);
2914 }
2915 }
2916
2917 #[cfg(feature = "verify_aggregation_graph")]
2918 fn verify_aggregation_graph(
2919 &self,
2920 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2921 idle: bool,
2922 ) {
2923 if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
2924 return;
2925 }
2926 use std::{collections::VecDeque, env, io::stdout};
2927
2928 use crate::backend::operation::{get_uppers, is_aggregating_node};
2929
2930 let mut ctx = self.execute_context(turbo_tasks);
2931 let root_tasks = self.root_tasks.lock().clone();
2932
2933 for task_id in root_tasks.into_iter() {
2934 let mut queue = VecDeque::new();
2935 let mut visited = FxHashSet::default();
2936 let mut aggregated_nodes = FxHashSet::default();
2937 let mut collectibles = FxHashMap::default();
2938 let root_task_id = task_id;
2939 visited.insert(task_id);
2940 aggregated_nodes.insert(task_id);
2941 queue.push_back(task_id);
2942 let mut counter = 0;
2943 while let Some(task_id) = queue.pop_front() {
2944 counter += 1;
2945 if counter % 100000 == 0 {
2946 println!(
2947 "queue={}, visited={}, aggregated_nodes={}",
2948 queue.len(),
2949 visited.len(),
2950 aggregated_nodes.len()
2951 );
2952 }
2953 let task = ctx.task(task_id, TaskDataCategory::All);
2954 if idle && !self.is_idle.load(Ordering::Relaxed) {
2955 return;
2956 }
2957
2958 let uppers = get_uppers(&task);
2959 if task_id != root_task_id
2960 && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
2961 {
2962 panic!(
2963 "Task {} {} doesn't report to any root but is reachable from one (uppers: \
2964 {:?})",
2965 task_id,
2966 ctx.get_task_description(task_id),
2967 uppers
2968 );
2969 }
2970
2971 let aggregated_collectibles: Vec<_> = task
2972 .iter_aggregated_collectibles()
2973 .map(|(collectible, _)| collectible)
2974 .collect();
2975 for collectible in aggregated_collectibles {
2976 collectibles
2977 .entry(collectible)
2978 .or_insert_with(|| (false, Vec::new()))
2979 .1
2980 .push(task_id);
2981 }
2982
2983 let own_collectibles: Vec<_> = task
2984 .iter_collectibles_entries()
2985 .filter_map(|(&collectible, &value)| (value > 0).then_some(collectible))
2986 .collect::<Vec<_>>();
2987 for collectible in own_collectibles {
2988 if let Some((flag, _)) = collectibles.get_mut(&collectible) {
2989 *flag = true
2990 } else {
2991 panic!(
2992 "Task {} has a collectible {:?} that is not in any upper task",
2993 task_id, collectible
2994 );
2995 }
2996 }
2997
2998 let is_dirty = task.has_dirty();
2999 let has_dirty_container = task.has_dirty_containers();
3000 let should_be_in_upper = is_dirty || has_dirty_container;
3001
3002 let aggregation_number = get_aggregation_number(&task);
3003 if is_aggregating_node(aggregation_number) {
3004 aggregated_nodes.insert(task_id);
3005 }
3006 for child_id in task.iter_children() {
3013 if visited.insert(child_id) {
3015 queue.push_back(child_id);
3016 }
3017 }
3018 drop(task);
3019
3020 if should_be_in_upper {
3021 for upper_id in uppers {
3022 let task = ctx.task(upper_id, TaskDataCategory::All);
3023 let in_upper = task
3024 .get_aggregated_dirty_containers(&task_id)
3025 .is_some_and(|&dirty| dirty > 0);
3026 if !in_upper {
3027 let containers: Vec<_> = task
3028 .iter_aggregated_dirty_containers_entries()
3029 .map(|(&k, &v)| (k, v))
3030 .collect();
3031 panic!(
3032 "Task {} ({}) is dirty, but is not listed in the upper task {} \
3033 ({})\nThese dirty containers are present:\n{:#?}",
3034 task_id,
3035 ctx.get_task_description(task_id),
3036 upper_id,
3037 ctx.get_task_description(upper_id),
3038 containers,
3039 );
3040 }
3041 }
3042 }
3043 }
3044
3045 for (collectible, (flag, task_ids)) in collectibles {
3046 if !flag {
3047 use std::io::Write;
3048 let mut stdout = stdout().lock();
3049 writeln!(
3050 stdout,
3051 "{:?} that is not emitted in any child task but in these aggregated \
3052 tasks: {:#?}",
3053 collectible,
3054 task_ids
3055 .iter()
3056 .map(|t| format!("{t} {}", ctx.get_task_description(*t)))
3057 .collect::<Vec<_>>()
3058 )
3059 .unwrap();
3060
3061 let task_id = collectible.cell.task;
3062 let mut queue = {
3063 let task = ctx.task(task_id, TaskDataCategory::All);
3064 get_uppers(&task)
3065 };
3066 let mut visited = FxHashSet::default();
3067 for &upper_id in queue.iter() {
3068 visited.insert(upper_id);
3069 writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3070 }
3071 while let Some(task_id) = queue.pop() {
3072 let desc = ctx.get_task_description(task_id);
3073 let task = ctx.task(task_id, TaskDataCategory::All);
3074 let aggregated_collectible = task
3075 .get_aggregated_collectibles(&collectible)
3076 .copied()
3077 .unwrap_or_default();
3078 let uppers = get_uppers(&task);
3079 drop(task);
3080 writeln!(
3081 stdout,
3082 "upper {task_id} {desc} collectible={aggregated_collectible}"
3083 )
3084 .unwrap();
3085 if task_ids.contains(&task_id) {
3086 writeln!(
3087 stdout,
3088 "Task has an upper connection to an aggregated task that doesn't \
3089 reference it. Upper connection is invalid!"
3090 )
3091 .unwrap();
3092 }
3093 for upper_id in uppers {
3094 writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3095 if !visited.contains(&upper_id) {
3096 queue.push(upper_id);
3097 }
3098 }
3099 }
3100 panic!("See stdout for more details");
3101 }
3102 }
3103 }
3104 }
3105
3106 fn assert_not_persistent_calling_transient(
3107 &self,
3108 parent_id: Option<TaskId>,
3109 child_id: TaskId,
3110 cell_id: Option<CellId>,
3111 ) {
3112 if !parent_id.is_none_or(|id| id.is_transient()) && child_id.is_transient() {
3113 self.panic_persistent_calling_transient(
3114 parent_id
3115 .and_then(|id| self.lookup_task_type(id))
3116 .as_deref(),
3117 self.lookup_task_type(child_id).as_deref(),
3118 cell_id,
3119 );
3120 }
3121 }
3122
3123 fn panic_persistent_calling_transient(
3124 &self,
3125 parent: Option<&CachedTaskType>,
3126 child: Option<&CachedTaskType>,
3127 cell_id: Option<CellId>,
3128 ) {
3129 let transient_reason = if let Some(child) = child {
3130 Cow::Owned(format!(
3131 " The callee is transient because it depends on:\n{}",
3132 self.debug_trace_transient_task(child, cell_id),
3133 ))
3134 } else {
3135 Cow::Borrowed("")
3136 };
3137 panic!(
3138 "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3139 parent.map_or("unknown", |t| t.get_name()),
3140 child.map_or("unknown", |t| t.get_name()),
3141 transient_reason,
3142 );
3143 }
3144
3145 fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3146 let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3148 let task_info = if let Some(col_task_ty) = collectible
3150 .try_get_task_id()
3151 .and_then(|t| self.lookup_task_type(t))
3152 {
3153 Cow::Owned(format!(" (return type of {col_task_ty})"))
3154 } else {
3155 Cow::Borrowed("")
3156 };
3157 panic!("Collectible{task_info} must be a ResolvedVc")
3158 };
3159 if col_task_id.is_transient() && !task_id.is_transient() {
3160 let transient_reason = if let Some(col_task_ty) = self.lookup_task_type(col_task_id) {
3161 Cow::Owned(format!(
3162 ". The collectible is transient because it depends on:\n{}",
3163 self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3164 ))
3165 } else {
3166 Cow::Borrowed("")
3167 };
3168 panic!(
3170 "Collectible is transient, transient collectibles cannot be emitted from \
3171 persistent tasks{transient_reason}",
3172 )
3173 }
3174 }
3175}
3176
/// `Backend` implementation for `TurboTasksBackend`.
///
/// Every method forwards to the wrapped inner backend (`self.0`); no additional logic lives
/// here. Differences between the trait signature and the inner method are noted on the
/// respective method.
impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }

    /// The inner `stopping` takes no API handle, so `_turbo_tasks` is unused.
    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }

    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }

    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }

    /// The inner `idle_end` takes no API handle, so `_turbo_tasks` is unused.
    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }

    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
    }

    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
    }

    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }

    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }

    fn get_task_description(&self, task: TaskId) -> String {
        self.0.get_task_description(task)
    }

    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }

    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0
            .try_start_task_execution(task_id, priority, turbo_tasks)
    }

    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            result,
            cell_counters,
            has_invalidator,
            turbo_tasks,
        )
    }

    /// Background jobs of this backend are `TurboTasksBackendJob` values.
    type BackendJob = TurboTasksBackendJob;

    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(job, turbo_tasks)
    }

    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, reader, options, turbo_tasks)
    }

    /// Note: the inner method is called with `reader` before `cell`, i.e. in a different
    /// argument order than this trait method declares them.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }

    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0
            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
    }

    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }

    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }

    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }

    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(
            task_id,
            cell,
            is_serializable_cell_content,
            content,
            updated_key_hashes,
            verification_mode,
            turbo_tasks,
        );
    }

    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }

    fn set_own_task_aggregation_number(
        &self,
        task: TaskId,
        aggregation_number: u32,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .set_own_task_aggregation_number(task, aggregation_number, turbo_tasks);
    }

    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }

    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }

    /// The inner `create_transient_task` takes no API handle, so `_turbo_tasks` is unused.
    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }

    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }

    /// Exposes the `task_statistics` field of the inner backend.
    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }

    /// Reports the `dependency_tracking` option of the inner backend.
    fn is_tracking_dependencies(&self) -> bool {
        self.0.options.dependency_tracking
    }
}
3431
/// Tree used to print a transient task together with the tasks it is reported to depend on.
///
/// It is rendered through its `Display` implementation, one task per line.
enum DebugTraceTransientTask {
    /// A task with a known name. Its causes are printed below it under `self:` and `args:`
    /// headings.
    Cached {
        task_name: &'static str,
        // When set, the type name of the cell is appended as "(read cell of type ...)".
        cell_type_id: Option<ValueTypeId>,
        cause_self: Option<Box<DebugTraceTransientTask>>,
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// A task with a known name that is printed without causes and marked as `(collapsed)`.
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    /// A task without a known name; printed as `unknown transient task`.
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}
3448
3449impl DebugTraceTransientTask {
3450 fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3451 let indent = " ".repeat(level);
3452 f.write_str(&indent)?;
3453
3454 fn fmt_cell_type_id(
3455 f: &mut fmt::Formatter<'_>,
3456 cell_type_id: Option<ValueTypeId>,
3457 ) -> fmt::Result {
3458 if let Some(ty) = cell_type_id {
3459 write!(f, " (read cell of type {})", get_value_type(ty).global_name)
3460 } else {
3461 Ok(())
3462 }
3463 }
3464
3465 match self {
3467 Self::Cached {
3468 task_name,
3469 cell_type_id,
3470 ..
3471 }
3472 | Self::Collapsed {
3473 task_name,
3474 cell_type_id,
3475 ..
3476 } => {
3477 f.write_str(task_name)?;
3478 fmt_cell_type_id(f, *cell_type_id)?;
3479 if matches!(self, Self::Collapsed { .. }) {
3480 f.write_str(" (collapsed)")?;
3481 }
3482 }
3483 Self::Uncached { cell_type_id } => {
3484 f.write_str("unknown transient task")?;
3485 fmt_cell_type_id(f, *cell_type_id)?;
3486 }
3487 }
3488 f.write_char('\n')?;
3489
3490 if let Self::Cached {
3492 cause_self,
3493 cause_args,
3494 ..
3495 } = self
3496 {
3497 if let Some(c) = cause_self {
3498 writeln!(f, "{indent} self:")?;
3499 c.fmt_indented(f, level + 1)?;
3500 }
3501 if !cause_args.is_empty() {
3502 writeln!(f, "{indent} args:")?;
3503 for c in cause_args {
3504 c.fmt_indented(f, level + 1)?;
3505 }
3506 }
3507 }
3508 Ok(())
3509 }
3510}
3511
3512impl fmt::Display for DebugTraceTransientTask {
3513 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3514 self.fmt_indented(f, 0)
3515 }
3516}
3517
/// Returns an `Instant` 30 × 365 days after now.
///
/// NOTE(review): presumably a bounded offset is used instead of a "maximum" instant because
/// adding a very large duration to `Instant` can overflow on some platforms — confirm.
fn far_future() -> Instant {
    const SECONDS_PER_DAY: u64 = 86400;
    const DAYS_PER_YEAR: u64 = 365;
    const YEARS: u64 = 30;

    let offset = Duration::from_secs(SECONDS_PER_DAY * DAYS_PER_YEAR * YEARS);
    Instant::now() + offset
}