mod dynamic_storage;
mod operation;
mod storage;

use std::{
    borrow::Cow,
    cmp::min,
    fmt::{self, Write},
    future::Future,
    hash::BuildHasherDefault,
    mem::take,
    ops::Range,
    pin::Pin,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
    },
};

use anyhow::{Result, bail};
use auto_hash_map::{AutoMap, AutoSet};
use indexmap::IndexSet;
use parking_lot::{Condvar, Mutex};
use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
use smallvec::{SmallVec, smallvec};
use tokio::time::{Duration, Instant};
use tracing::{Span, info_span, trace_span};
use turbo_tasks::{
    CellId, FxDashMap, FxIndexMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency,
    ReadOutputOptions, ReadTracking, SessionId, TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId,
    TraitTypeId, TurboTasksBackendApi, ValueTypeId,
    backend::{
        Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
        TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
    },
    event::{Event, EventListener},
    message_queue::TimingEvent,
    registry::get_value_type,
    task_statistics::TaskStatisticsApi,
    trace::TraceRawVcs,
    turbo_tasks,
    util::{IdFactoryWithReuse, good_chunk_size},
};

pub use self::{operation::AnyOperation, storage::TaskDataCategory};
#[cfg(feature = "trace_task_dirty")]
use crate::backend::operation::TaskDirtyCause;
use crate::{
    backend::{
        operation::{
            AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
            Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number,
            get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children,
        },
        storage::{
            InnerStorageSnapshot, Storage, count, get, get_many, get_mut, get_mut_or_insert_with,
            iter_many, remove,
        },
    },
    backing_storage::BackingStorage,
    data::{
        ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
        CachedDataItemValueRef, CellRef, CollectibleRef, CollectiblesRef, DirtyState,
        InProgressCellState, InProgressState, InProgressStateInner, OutputValue, RootType,
    },
    utils::{
        bi_map::BiMap, chunked_vec::ChunkedVec, dash_map_drop_contents::drop_contents,
        ptr_eq_arc::PtrEqArc, shard_amount::compute_shard_amount, sharded::Sharded, swap_retain,
    },
};

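/// The highest bit of [`TurboTasksBackendInner::in_progress_operations`] flags that a snapshot
/// has been requested; the remaining bits count the currently running operations.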
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);

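/// Bookkeeping for an in-flight snapshot request: whether a snapshot is currently requested,
/// and the operations that have suspended themselves (and are persisted as part of the
/// snapshot) while it runs.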
struct SnapshotRequest {
    snapshot_requested: bool,
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}

impl SnapshotRequest {
    fn new() -> Self {
        Self {
            snapshot_requested: false,
            suspended_operations: FxHashSet::default(),
        }
    }
}

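/// The boxed future of a one-shot transient task. It is `take`n out of the mutex on first
/// execution, so the task can only run once.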
type TransientTaskOnce =
    Mutex<Option<Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>>>;

pub enum TransientTask {
    /// A root task that lives across executions; its function is invoked again to produce a
    /// new future whenever the task is re-executed.
    Root(TransientTaskRoot),

    /// A task that is executed a single time: the future is taken out of the mutex on first
    /// execution.
    Once(TransientTaskOnce),
}

pub enum StorageMode {
    /// State is restored from the backing storage, but changes are never persisted back.
    ReadOnly,
    /// State is restored from the backing storage and changes are persisted back to it.
    ReadWrite,
}

pub struct BackendOptions {
    /// Enables dependency tracking. When disabled, tasks are never invalidated.
    pub dependency_tracking: bool,

    /// Enables tracking of active tasks. Automatically disabled when `dependency_tracking` is
    /// disabled.
    pub active_tracking: bool,

    /// Selects how the backing storage is used. `None` disables restoring and persisting
    /// entirely.
    pub storage_mode: Option<StorageMode>,

    /// The expected number of worker threads, used to compute the shard amount of the
    /// internal data structures.
    pub num_workers: Option<usize>,

    /// Uses smaller preallocations for the internal data structures to reduce memory usage.
    pub small_preallocation: bool,
}

impl Default for BackendOptions {
    fn default() -> Self {
        Self {
            dependency_tracking: true,
            active_tracking: true,
            storage_mode: Some(StorageMode::ReadWrite),
            num_workers: None,
            small_preallocation: false,
        }
    }
}

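/// Background jobs that the backend schedules via
/// `TurboTasksBackendApi::schedule_backend_background_job`.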
pub enum TurboTasksBackendJob {
    InitialSnapshot,
    FollowUpSnapshot,
    Prefetch {
        data: Arc<FxIndexMap<TaskId, bool>>,
        range: Option<Range<usize>>,
    },
}

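/// The public backend type: a thin wrapper around an `Arc` of the shared inner state.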
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);

type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;

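/// State shared between the [`TurboTasksBackend`] wrapper, background jobs, and event
/// listener notes.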
struct TurboTasksBackendInner<B: BackingStorage> {
    options: BackendOptions,

    start_time: Instant,
    session_id: SessionId,

    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    persisted_task_cache_log: Option<TaskCacheLog>,
    task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
    transient_tasks: FxDashMap<TaskId, Arc<TransientTask>>,

    storage: Storage,

    /// `true` when the in-memory `task_cache` might not contain all persisted tasks, so
    /// lookups may need to fall back to the backing storage.
    local_is_partial: AtomicBool,

    /// Number of currently running operations. The highest bit
    /// ([`SNAPSHOT_REQUESTED_BIT`]) is set while a snapshot is requested.
    in_progress_operations: AtomicUsize,

    snapshot_request: Mutex<SnapshotRequest>,
    /// Notified when all operations are suspended so a snapshot can proceed.
    operations_suspended: Condvar,
    /// Notified when a requested snapshot has completed and operations may resume.
    snapshot_completed: Condvar,
    last_snapshot: AtomicU64,

    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    task_statistics: TaskStatisticsApi,

    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}

impl<B: BackingStorage> TurboTasksBackend<B> {
    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
        Self(Arc::new(TurboTasksBackendInner::new(
            options,
            backing_storage,
        )))
    }

    pub fn backing_storage(&self) -> &B {
        &self.0.backing_storage
    }
}

impl<B: BackingStorage> TurboTasksBackendInner<B> {
    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
        let need_log = matches!(options.storage_mode, Some(StorageMode::ReadWrite));
        if !options.dependency_tracking {
            options.active_tracking = false;
        }
        let small_preallocation = options.small_preallocation;
        let next_task_id = backing_storage
            .next_free_task_id()
            .expect("Failed to get task id");
        Self {
            options,
            start_time: Instant::now(),
            session_id: backing_storage
                .next_session_id()
                .expect("Failed to get session id"),
            persisted_task_id_factory: IdFactoryWithReuse::new(
                next_task_id,
                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
            ),
            transient_task_id_factory: IdFactoryWithReuse::new(
                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                TaskId::MAX,
            ),
            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
            task_cache: BiMap::new(),
            transient_tasks: FxDashMap::default(),
            local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
            storage: Storage::new(shard_amount, small_preallocation),
            in_progress_operations: AtomicUsize::new(0),
            snapshot_request: Mutex::new(SnapshotRequest::new()),
            operations_suspended: Condvar::new(),
            snapshot_completed: Condvar::new(),
            last_snapshot: AtomicU64::new(0),
            stopping: AtomicBool::new(false),
            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
            #[cfg(feature = "verify_aggregation_graph")]
            is_idle: AtomicBool::new(false),
            task_statistics: TaskStatisticsApi::default(),
            backing_storage,
            #[cfg(feature = "verify_aggregation_graph")]
            root_tasks: Default::default(),
        }
    }

    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }

    fn session_id(&self) -> SessionId {
        self.session_id
    }

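    /// # Safety
    ///
    /// `tx` must be `None` or a valid read transaction obtained from this backend's
    /// `backing_storage`.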
    unsafe fn execute_context_with_tx<'e, 'tx>(
        &'e self,
        tx: Option<&'e B::ReadTransaction<'tx>>,
        turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
    where
        'tx: 'e,
    {
        unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
    }

    fn suspending_requested(&self) -> bool {
        self.should_persist()
            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
    }

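    /// Cooperative suspension point for long-running operations: when a snapshot has been
    /// requested, the operation stores its persistable state via `suspend` and parks until the
    /// snapshot has completed.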
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            if snapshot_request.snapshot_requested {
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                if value == SNAPSHOT_REQUESTED_BIT {
                    this.operations_suspended.notify_all();
                }
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }

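    /// Registers a running operation. If a snapshot is currently requested, this waits until
    /// the snapshot has completed before the operation starts. The returned guard keeps the
    /// operation counted until it is dropped.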
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        if !self.should_persist() {
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            let mut snapshot_request = self.snapshot_request.lock();
            if snapshot_request.snapshot_requested {
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }

    fn should_persist(&self) -> bool {
        matches!(self.options.storage_mode, Some(StorageMode::ReadWrite))
    }

    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }

    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }

    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }

    fn track_cache_hit(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
    }

    fn track_cache_miss(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
    }
}

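/// RAII guard for a running operation. Dropping it decrements the operation counter and wakes
/// a waiting snapshot when this was the last active operation.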
pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    backend: Option<&'a TurboTasksBackendInner<B>>,
}

impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
    fn drop(&mut self) {
        if let Some(backend) = self.backend {
            let fetch_sub = backend
                .in_progress_operations
                .fetch_sub(1, Ordering::AcqRel);
            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
                backend.operations_suspended.notify_all();
            }
        }
    }
}

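/// Data collected by `task_execution_completed_prepare` and consumed by the later phases of
/// `task_execution_completed`.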
struct TaskExecutionCompletePrepareResult {
    pub new_children: FxHashSet<TaskId>,
    pub removed_data: Vec<CachedDataItem>,
    pub is_now_immutable: bool,
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    pub new_output: Option<OutputValue>,
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
}

impl<B: BackingStorage> TurboTasksBackendInner<B> {
    /// # Safety
    ///
    /// `tx` must be `None` or a valid read transaction obtained from this backend's
    /// `backing_storage`.
    unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
        &'l self,
        tx: Option<&'l B::ReadTransaction<'tx>>,
        parent_task: Option<TaskId>,
        child_task: TaskId,
        turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::ConnectChildOperation::run(parent_task, child_task, unsafe {
            self.execute_context_with_tx(tx, turbo_tasks)
        });
    }

    fn connect_child(
        &self,
        parent_task: Option<TaskId>,
        child_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::ConnectChildOperation::run(
            parent_task,
            child_task,
            self.execute_context(turbo_tasks),
        );
    }

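    /// Tries to read the output of a task. Returns `Ok(Err(listener))` when the output is not
    /// yet available and the caller should wait for the event; schedules the task if it needs
    /// to be (re)computed.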
    fn try_read_task_output(
        self: &Arc<Self>,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, None);

        let mut ctx = self.execute_context(turbo_tasks);
        let need_reader_task = if self.should_track_dependencies()
            && !matches!(options.tracking, ReadTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            Some(reader_id)
        } else {
            None
        };
        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        fn listen_to_done_event<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            reader: Option<TaskId>,
            tracking: ReadTracking,
            done_event: &Event,
        ) -> EventListener {
            done_event.listen_with_note(move || {
                let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
                move || {
                    if let Some(reader_desc) = reader_desc.as_ref() {
                        format!("try_read_task_output from {} ({})", reader_desc(), tracking)
                    } else {
                        format!("try_read_task_output ({})", tracking)
                    }
                }
            })
        }

        fn check_in_progress<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            task: &impl TaskGuard,
            reader: Option<TaskId>,
            tracking: ReadTracking,
            ctx: &impl ExecuteContext<'_>,
        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
        {
            match get!(task, InProgress) {
                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
                    listen_to_done_event(this, reader, tracking, done_event),
                ))),
                Some(InProgressState::InProgress(box InProgressStateInner {
                    done_event, ..
                })) => Some(Ok(Err(listen_to_done_event(
                    this, reader, tracking, done_event,
                )))),
                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
                    "{} was canceled",
                    ctx.get_task_description(task.id())
                ))),
                None => None,
            }
        }

        if matches!(options.consistency, ReadConsistency::Strong) {
            loop {
                // Make sure the task is a root node, so its aggregated state covers the whole
                // subgraph.
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                drop(reader_task);
                {
                    let _span = tracing::trace_span!(
                        "make root node for strongly consistent read",
                        %task_id
                    )
                    .entered();
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: u32::MAX,
                            distance: None,
                        },
                        &mut ctx,
                    );
                }
                (task, reader_task) = if let Some(reader_id) = need_reader_task {
                    let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
                    (task, Some(reader))
                } else {
                    (ctx.task(task_id, TaskDataCategory::All), None)
                };
            }

            let is_dirty =
                get!(task, Dirty).map_or(false, |dirty_state| dirty_state.get(self.session_id));

            let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
                .cloned()
                .unwrap_or_default()
                .get(self.session_id);
            if dirty_tasks > 0 || is_dirty {
                let activeness = get_mut!(task, Activeness);
                let mut task_ids_to_schedule: Vec<_> = Vec::new();
                // Mark the task as active until it is clean again, so the dirty subgraph will
                // be processed.
                let activeness = if let Some(activeness) = activeness {
                    activeness.set_active_until_clean();
                    activeness
                } else {
                    get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id))
                        .set_active_until_clean();
                    if ctx.should_track_activeness() {
                        // The task was not active before, so the aggregated dirty tasks might
                        // not be scheduled yet.
                        task_ids_to_schedule = get_many!(
                            task,
                            AggregatedDirtyContainer {
                                task
                            } count if count.get(self.session_id) > 0 => {
                                task
                            }
                        );
                        task_ids_to_schedule.push(task_id);
                    }
                    get!(task, Activeness).unwrap()
                };
                let listener = activeness.all_clean_event.listen_with_note(move || {
                    let this = self.clone();
                    let tt = turbo_tasks.pin();
                    move || {
                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
                        let mut ctx = this.execute_context(tt);
                        let mut visited = FxHashSet::default();
                        fn indent(s: &str) -> String {
                            s.split_inclusive('\n')
                                .flat_map(|line: &str| [" ", line].into_iter())
                                .collect::<String>()
                        }
                        fn get_info(
                            ctx: &mut impl ExecuteContext<'_>,
                            task_id: TaskId,
                            parent_and_count: Option<(TaskId, i32)>,
                            visited: &mut FxHashSet<TaskId>,
                        ) -> String {
                            let task = ctx.task(task_id, TaskDataCategory::Data);
                            let is_dirty = get!(task, Dirty)
                                .map_or(false, |dirty_state| dirty_state.get(ctx.session_id()));
                            let in_progress =
                                get!(task, InProgress).map_or("not in progress", |p| match p {
                                    InProgressState::InProgress(_) => "in progress",
                                    InProgressState::Scheduled { .. } => "scheduled",
                                    InProgressState::Canceled => "canceled",
                                });
                            let activeness = get!(task, Activeness).map_or_else(
                                || "not active".to_string(),
                                |activeness| format!("{activeness:?}"),
                            );
                            let aggregation_number = get_aggregation_number(&task);
                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
                            {
                                let uppers = get_uppers(&task);
                                !uppers.contains(&parent_task_id)
                            } else {
                                false
                            };

                            let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
                                .cloned()
                                .unwrap_or_default()
                                .get(ctx.session_id());

                            let task_description = ctx.get_task_description(task_id);
                            let is_dirty = if is_dirty { ", dirty" } else { "" };
                            let count = if let Some((_, count)) = parent_and_count {
                                format!(" {count}")
                            } else {
                                String::new()
                            };
                            let mut info = format!(
                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
                                 {in_progress}, {activeness}{is_dirty})",
                            );
                            let children: Vec<_> = iter_many!(
                                task,
                                AggregatedDirtyContainer {
                                    task
                                } count => {
                                    (task, count.get(ctx.session_id()))
                                }
                            )
                            .filter(|(_, count)| *count > 0)
                            .collect();
                            drop(task);

                            if missing_upper {
                                info.push_str("\n ERROR: missing upper connection");
                            }

                            if dirty_tasks > 0 || !children.is_empty() {
                                writeln!(info, "\n {dirty_tasks} dirty tasks:").unwrap();

                                for (child_task_id, count) in children {
                                    let task_description = ctx.get_task_description(child_task_id);
                                    if visited.insert(child_task_id) {
                                        let child_info = get_info(
                                            ctx,
                                            child_task_id,
                                            Some((task_id, count)),
                                            visited,
                                        );
                                        info.push_str(&indent(&child_info));
                                        if !info.ends_with('\n') {
                                            info.push('\n');
                                        }
                                    } else {
                                        writeln!(
                                            info,
                                            " {child_task_id} {task_description} {count} \
                                             (already visited)"
                                        )
                                        .unwrap();
                                    }
                                }
                            }
                            info
                        }
                        let info = get_info(&mut ctx, task_id, None, &mut visited);
                        format!(
                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
                        )
                    }
                });
                drop(reader_task);
                drop(task);
                if !task_ids_to_schedule.is_empty() {
                    let mut queue = AggregationUpdateQueue::new();
                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
                    queue.execute(&mut ctx);
                }

                return Ok(Err(listener));
            }
        }

        if let Some(value) = check_in_progress(self, &task, reader, options.tracking, &ctx) {
            return value;
        }

        if let Some(output) = get!(task, Output) {
            let result = match output {
                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
                OutputValue::Error(error) => Err(error
                    .with_task_context(ctx.get_task_description(task_id), Some(task_id))
                    .into()),
            };
            if let Some(mut reader_task) = reader_task
                && options.tracking.should_track(result.is_err())
                && (!task.is_immutable() || cfg!(feature = "verify_immutable"))
            {
                #[cfg(feature = "trace_task_output_dependencies")]
                let _span = tracing::trace_span!(
                    "add output dependency",
                    task = %task_id,
                    dependent_task = ?reader
                )
                .entered();
                let _ = task.add(CachedDataItem::OutputDependent {
                    task: reader.unwrap(),
                    value: (),
                });
                drop(task);

                if reader_task
                    .remove(&CachedDataItemKey::OutdatedOutputDependency { target: task_id })
                    .is_none()
                {
                    let _ = reader_task.add(CachedDataItem::OutputDependency {
                        target: task_id,
                        value: (),
                    });
                }
            }

            return result;
        }
        drop(reader_task);

        let note = move || {
            let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
            move || {
                if let Some(reader_desc) = reader_desc.as_ref() {
                    format!("try_read_task_output (recompute) from {}", (reader_desc)())
                } else {
                    "try_read_task_output (recompute, untracked)".to_string()
                }
            }
        };

        // The output doesn't exist yet; schedule the task to compute it.
        let (item, listener) = CachedDataItem::new_scheduled_with_listener(
            TaskExecutionReason::OutputNotAvailable,
            || self.get_task_desc_fn(task_id),
            note,
        );
        task.add_new(item);
        ctx.schedule_task(task);

        Ok(Err(listener))
    }

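    /// Tries to read a cell of a task, optionally registering a cell dependency for the
    /// reader. Returns `Ok(Err(listener))` when the cell content is not available yet and the
    /// task had to be (re)scheduled.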
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
        ) {
            if let Some(mut reader_task) = reader_task
                && (!task.is_immutable() || cfg!(feature = "verify_immutable"))
            {
                let _ = task.add(CachedDataItem::CellDependent {
                    cell,
                    task: reader.unwrap(),
                    value: (),
                });
                drop(task);

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                if reader_task
                    .remove(&CachedDataItemKey::OutdatedCellDependency { target })
                    .is_none()
                {
                    let _ = reader_task.add(CachedDataItem::CellDependency { target, value: () });
                }
            }
        }

        let mut ctx = self.execute_context(turbo_tasks);
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(options.tracking, ReadTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::Data);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::Data), None)
        };

        let content = if options.final_read_hint {
            remove!(task, CellData { cell })
        } else if let Some(content) = get!(task, CellData { cell }) {
            let content = content.clone();
            Some(content)
        } else {
            None
        };
        if let Some(content) = content {
            if options.tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell);
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.reference)),
            )));
        }

        let in_progress = get!(task, InProgress);
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        // Check the cell index against the current max index; the cell might not exist at all.
        let max_id = get!(
            task,
            CellTypeMaxIndex {
                cell_type: cell.type_id
            }
        )
        .copied();
        let Some(max_id) = max_id else {
            if options.tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell);
            }
            bail!(
                "Cell {cell:?} no longer exists in task {} (no cell of this type exists)",
                ctx.get_task_description(task_id)
            );
        };
        if cell.index >= max_id {
            if options.tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell);
            }
            bail!(
                "Cell {cell:?} no longer exists in task {} (index out of bounds)",
                ctx.get_task_description(task_id)
            );
        }
        drop(reader_task);

        // The cell should exist, but its data is missing (e.g. dropped or not restored), so
        // the task needs to be recomputed to produce the cell content.
        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, reader, cell);
        if !new_listener {
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).global_name,
            cell_index = cell.index
        )
        .entered();

        // Schedule the task to recompute the cell.
        if is_cancelled {
            bail!("{} was canceled", ctx.get_task_description(task_id));
        }
        task.add_new(CachedDataItem::new_scheduled(
            TaskExecutionReason::CellNotAvailable,
            || self.get_task_desc_fn(task_id),
        ));
        ctx.schedule_task(task);

        Ok(Err(listener))
    }

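    /// Returns an event listener for the given cell, creating an `InProgressCell` entry when
    /// none exists yet. The boolean is `true` when a new entry (and with it the first
    /// listener) was created.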
    fn listen_to_cell(
        &self,
        task: &mut impl TaskGuard,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
    ) -> (EventListener, bool) {
        let note = move || {
            let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
            move || {
                if let Some(reader_desc) = reader_desc.as_ref() {
                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
                } else {
                    "try_read_task_cell (in progress, untracked)".to_string()
                }
            }
        };
        if let Some(in_progress) = get!(task, InProgressCell { cell }) {
            // Someone else is already computing the cell.
            let listener = in_progress.event.listen_with_note(note);
            return (listener, false);
        }
        let in_progress = InProgressCellState::new(task_id, cell);
        let listener = in_progress.event.listen_with_note(note);
        task.add_new(CachedDataItem::InProgressCell {
            cell,
            value: in_progress,
        });
        (listener, true)
    }

    fn lookup_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
        if let Some(task_type) = self.task_cache.lookup_reverse(&task_id) {
            return Some(task_type);
        }
        if self.should_restore()
            && self.local_is_partial.load(Ordering::Acquire)
            && !task_id.is_transient()
            && let Some(task_type) = unsafe {
                self.backing_storage
                    .reverse_lookup_task_cache(None, task_id)
                    .expect("Failed to lookup task type")
            }
        {
            let _ = self.task_cache.try_insert(task_type.clone(), task_id);
            return Some(task_type);
        }
        None
    }

    fn get_task_desc_fn(&self, task_id: TaskId) -> impl Fn() -> String + Send + Sync + 'static {
        let task_type = self.lookup_task_type(task_id);
        move || {
            task_type.as_ref().map_or_else(
                || format!("{task_id:?} transient"),
                |task_type| format!("{task_id:?} {task_type}"),
            )
        }
    }

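    /// Takes a snapshot of the in-memory state and persists it to the backing storage. All
    /// running operations are suspended while the snapshot is captured. Returns the time the
    /// snapshot was taken and whether new items were persisted, or `None` when persisting
    /// failed.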
    fn snapshot(&self) -> Option<(Instant, bool)> {
        let start = Instant::now();
        debug_assert!(self.should_persist());
        let mut snapshot_request = self.snapshot_request.lock();
        snapshot_request.snapshot_requested = true;
        let active_operations = self
            .in_progress_operations
            .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        if active_operations != 0 {
            self.operations_suspended
                .wait_while(&mut snapshot_request, |_| {
                    self.in_progress_operations.load(Ordering::Relaxed) != SNAPSHOT_REQUESTED_BIT
                });
        }
        let suspended_operations = snapshot_request
            .suspended_operations
            .iter()
            .map(|op| op.arc().clone())
            .collect::<Vec<_>>();
        drop(snapshot_request);
        self.storage.start_snapshot();
        let mut persisted_task_cache_log = self
            .persisted_task_cache_log
            .as_ref()
            .map(|l| l.take(|i| i))
            .unwrap_or_default();
        let mut snapshot_request = self.snapshot_request.lock();
        snapshot_request.snapshot_requested = false;
        self.in_progress_operations
            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        self.snapshot_completed.notify_all();
        let snapshot_time = Instant::now();
        drop(snapshot_request);

        let preprocess = |task_id: TaskId, inner: &storage::InnerStorage| {
            if task_id.is_transient() {
                return (None, None);
            }
            let len = inner.len();

            let meta_restored = inner.state().meta_restored();
            let data_restored = inner.state().data_restored();

            let mut meta = meta_restored.then(|| Vec::with_capacity(len));
            let mut data = data_restored.then(|| Vec::with_capacity(len));
            for (key, value) in inner.iter_all() {
                if key.is_persistent() && value.is_persistent() {
                    match key.category() {
                        TaskDataCategory::Meta => {
                            if let Some(meta) = &mut meta {
                                meta.push(CachedDataItem::from_key_and_value_ref(key, value))
                            }
                        }
                        TaskDataCategory::Data => {
                            if let Some(data) = &mut data {
                                data.push(CachedDataItem::from_key_and_value_ref(key, value))
                            }
                        }
                        _ => {}
                    }
                }
            }

            (meta, data)
        };
        let process = |task_id: TaskId, (meta, data): (Option<Vec<_>>, Option<Vec<_>>)| {
            (
                task_id,
                meta.map(|d| self.backing_storage.serialize(task_id, &d)),
                data.map(|d| self.backing_storage.serialize(task_id, &d)),
            )
        };
        let process_snapshot = |task_id: TaskId, inner: Box<InnerStorageSnapshot>| {
            if task_id.is_transient() {
                return (task_id, None, None);
            }
            let len = inner.len();
            let mut meta = inner.meta_modified.then(|| Vec::with_capacity(len));
            let mut data = inner.data_modified.then(|| Vec::with_capacity(len));
            for (key, value) in inner.iter_all() {
                if key.is_persistent() && value.is_persistent() {
                    match key.category() {
                        TaskDataCategory::Meta => {
                            if let Some(meta) = &mut meta {
                                meta.push(CachedDataItem::from_key_and_value_ref(key, value));
                            }
                        }
                        TaskDataCategory::Data => {
                            if let Some(data) = &mut data {
                                data.push(CachedDataItem::from_key_and_value_ref(key, value));
                            }
                        }
                        _ => {}
                    }
                }
            }
            (
                task_id,
                meta.map(|meta| self.backing_storage.serialize(task_id, &meta)),
                data.map(|data| self.backing_storage.serialize(task_id, &data)),
            )
        };

        let snapshot = {
            let _span = tracing::trace_span!("take snapshot").entered();
            self.storage
                .take_snapshot(&preprocess, &process, &process_snapshot)
        };

        #[cfg(feature = "print_cache_item_size")]
        #[derive(Default)]
        struct TaskCacheStats {
            data: usize,
            data_count: usize,
            meta: usize,
            meta_count: usize,
        }
        #[cfg(feature = "print_cache_item_size")]
        impl TaskCacheStats {
            fn add_data(&mut self, len: usize) {
                self.data += len;
                self.data_count += 1;
            }

            fn add_meta(&mut self, len: usize) {
                self.meta += len;
                self.meta_count += 1;
            }
        }
        #[cfg(feature = "print_cache_item_size")]
        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
            Mutex::new(FxHashMap::default());

        let task_snapshots = snapshot
            .into_iter()
            .filter_map(|iter| {
                let mut iter = iter
                    .filter_map(
                        |(task_id, meta, data): (
                            _,
                            Option<Result<SmallVec<_>>>,
                            Option<Result<SmallVec<_>>>,
                        )| {
                            let meta = match meta {
                                Some(Ok(meta)) => {
                                    #[cfg(feature = "print_cache_item_size")]
                                    task_cache_stats
                                        .lock()
                                        .entry(self.get_task_description(task_id))
                                        .or_default()
                                        .add_meta(meta.len());
                                    Some(meta)
                                }
                                None => None,
                                Some(Err(err)) => {
                                    println!(
                                        "Serializing task {} failed (meta): {:?}",
                                        self.get_task_description(task_id),
                                        err
                                    );
                                    None
                                }
                            };
                            let data = match data {
                                Some(Ok(data)) => {
                                    #[cfg(feature = "print_cache_item_size")]
                                    task_cache_stats
                                        .lock()
                                        .entry(self.get_task_description(task_id))
                                        .or_default()
                                        .add_data(data.len());
                                    Some(data)
                                }
                                None => None,
                                Some(Err(err)) => {
                                    println!(
                                        "Serializing task {} failed (data): {:?}",
                                        self.get_task_description(task_id),
                                        err
                                    );
                                    None
                                }
                            };
                            (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
                        },
                    )
                    .peekable();
                iter.peek().is_some().then_some(iter)
            })
            .collect::<Vec<_>>();

        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());

        let mut new_items = false;

        if !persisted_task_cache_log.is_empty() || !task_snapshots.is_empty() {
            new_items = true;
            if let Err(err) = self.backing_storage.save_snapshot(
                self.session_id,
                suspended_operations,
                persisted_task_cache_log,
                task_snapshots,
            ) {
                println!("Persisting failed: {err:?}");
                return None;
            }
            #[cfg(feature = "print_cache_item_size")]
            {
                let mut task_cache_stats = task_cache_stats
                    .into_inner()
                    .into_iter()
                    .collect::<Vec<_>>();
                if !task_cache_stats.is_empty() {
                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
                        (stats_b.data + stats_b.meta, key_b)
                            .cmp(&(stats_a.data + stats_a.meta, key_a))
                    });
                    println!("Task cache stats:");
                    for (task_desc, stats) in task_cache_stats {
                        use std::ops::Div;

                        use turbo_tasks::util::FormatBytes;

                        println!(
                            " {} {task_desc} = {} meta ({} x {}), {} data ({} x {})",
                            FormatBytes(stats.data + stats.meta),
                            FormatBytes(stats.meta),
                            stats.meta_count,
                            FormatBytes(stats.meta.checked_div(stats.meta_count).unwrap_or(0)),
                            FormatBytes(stats.data),
                            stats.data_count,
                            FormatBytes(stats.data.checked_div(stats.data_count).unwrap_or(0)),
                        );
                    }
                }
            }
        }

        if new_items {
            let elapsed = start.elapsed();
            // Only send a compilation event when writing took noticeably long.
            if elapsed > Duration::from_secs(10) {
                turbo_tasks().send_compilation_event(Arc::new(TimingEvent::new(
                    "Finished writing to filesystem cache".to_string(),
                    elapsed,
                )));
            }
        }

        Some((snapshot_time, new_items))
    }

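    /// Called once at startup: replays uncompleted operations from the backing storage and
    /// schedules the initial snapshot job.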
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        if self.should_restore() {
            // Continue all uncompleted operations. They can't be interrupted by a snapshot
            // since the snapshotting job has not been scheduled yet.
            let uncompleted_operations = self
                .backing_storage
                .uncompleted_operations()
                .expect("Failed to get uncompleted operations");
            if !uncompleted_operations.is_empty() {
                let mut ctx = self.execute_context(turbo_tasks);
                for op in uncompleted_operations {
                    op.execute(&mut ctx);
                }
            }
        }

        if self.should_persist() {
            // Schedule the initial snapshot job; enter an empty span so the background job is
            // not attached to the startup span.
            let _span = Span::none().entered();
            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
        }
    }

    fn stopping(&self) {
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }

    #[allow(unused_variables)]
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        #[cfg(feature = "verify_aggregation_graph")]
        {
            self.is_idle.store(false, Ordering::Release);
            self.verify_aggregation_graph(turbo_tasks, false);
        }
        if self.should_persist() {
            let _span = tracing::info_span!("persist on stop").entered();
            self.snapshot();
        }
        self.task_cache.drop_contents();
        drop_contents(&self.transient_tasks);
        self.storage.drop_contents();
        if let Err(err) = self.backing_storage.shutdown() {
            println!("Shutting down failed: {err}");
        }
    }

    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                select! {
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {}
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }

    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }

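    /// Looks up or creates a persistent task for `task_type`, consulting the backing storage
    /// when the local cache may be incomplete, and connects it to `parent_task`.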
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
            self.track_cache_hit(&task_type);
            self.connect_child(parent_task, task_id, turbo_tasks);
            return task_id;
        }

        let check_backing_storage =
            self.should_restore() && self.local_is_partial.load(Ordering::Acquire);
        let tx = check_backing_storage
            .then(|| self.backing_storage.start_read_transaction())
            .flatten();
        let task_id = {
            if let Some(task_id) = unsafe {
                check_backing_storage
                    .then(|| {
                        self.backing_storage
                            .forward_lookup_task_cache(tx.as_ref(), &task_type)
                            .expect("Failed to lookup task id")
                    })
                    .flatten()
            } {
                self.track_cache_hit(&task_type);
                let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
                task_id
            } else {
                let task_type = Arc::new(task_type);
                let task_id = self.persisted_task_id_factory.get();
                let task_id = if let Err(existing_task_id) =
                    self.task_cache.try_insert(task_type.clone(), task_id)
                {
                    self.track_cache_hit(&task_type);
                    // Safety: The id was just created and lost the race to be inserted into
                    // the cache, so it is not used by anything else.
                    unsafe {
                        self.persisted_task_id_factory.reuse(task_id);
                    }
                    existing_task_id
                } else {
                    self.track_cache_miss(&task_type);
                    task_id
                };
                if let Some(log) = &self.persisted_task_cache_log {
                    log.lock(task_id).push((task_type, task_id));
                }
                task_id
            }
        };

        // Safety: `tx` is a valid read transaction from `self.backing_storage`.
        unsafe { self.connect_child_with_tx(tx.as_ref(), parent_task, task_id, turbo_tasks) };

        task_id
    }

    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        if let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            self.panic_persistent_calling_transient(
                self.lookup_task_type(parent_task).as_deref(),
                Some(&task_type),
                None,
            );
        }
        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
            self.track_cache_hit(&task_type);
            self.connect_child(parent_task, task_id, turbo_tasks);
            return task_id;
        }

        let task_type = Arc::new(task_type);
        let task_id = self.transient_task_id_factory.get();
        if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) {
            self.track_cache_hit(&task_type);
            // Safety: The id was just created and lost the race to be inserted into the
            // cache, so it is not used by anything else.
            unsafe {
                self.transient_task_id_factory.reuse(task_id);
            }
            self.connect_child(parent_task, existing_task_id, turbo_tasks);
            return existing_task_id;
        }

        self.track_cache_miss(&task_type);
        self.connect_child(parent_task, task_id, turbo_tasks);

        task_id
    }

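    /// Builds a debug trace that explains why a task is transient by walking the transient
    /// `this` and argument `Vc`s that caused it.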
    fn debug_trace_transient_task(
        &self,
        task_type: &CachedTaskType,
        cell_id: Option<CellId>,
    ) -> DebugTraceTransientTask {
        fn inner_id(
            backend: &TurboTasksBackendInner<impl BackingStorage>,
            task_id: TaskId,
            cell_type_id: Option<ValueTypeId>,
            visited_set: &mut FxHashSet<TaskId>,
        ) -> DebugTraceTransientTask {
            if let Some(task_type) = backend.lookup_task_type(task_id) {
                if visited_set.contains(&task_id) {
                    let task_name = task_type.get_name();
                    DebugTraceTransientTask::Collapsed {
                        task_name,
                        cell_type_id,
                    }
                } else {
                    inner_cached(backend, &task_type, cell_type_id, visited_set)
                }
            } else {
                DebugTraceTransientTask::Uncached { cell_type_id }
            }
        }
        fn inner_cached(
            backend: &TurboTasksBackendInner<impl BackingStorage>,
            task_type: &CachedTaskType,
            cell_type_id: Option<ValueTypeId>,
            visited_set: &mut FxHashSet<TaskId>,
        ) -> DebugTraceTransientTask {
            let task_name = task_type.get_name();

            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
                    return None;
                };
                if task_id.is_transient() {
                    Some(Box::new(inner_id(
                        backend,
                        task_id,
                        cause_self_raw_vc.try_get_type_id(),
                        visited_set,
                    )))
                } else {
                    None
                }
            });
            let cause_args = task_type
                .arg
                .get_raw_vcs()
                .into_iter()
                .filter_map(|raw_vc| {
                    let Some(task_id) = raw_vc.try_get_task_id() else {
                        return None;
                    };
                    if !task_id.is_transient() {
                        return None;
                    }
                    Some((task_id, raw_vc.try_get_type_id()))
                })
                .collect::<IndexSet<_>>()
                .into_iter()
                .map(|(task_id, cell_type_id)| {
                    inner_id(backend, task_id, cell_type_id, visited_set)
                })
                .collect();

            DebugTraceTransientTask::Cached {
                task_name,
                cell_type_id,
                cause_self,
                cause_args,
            }
        }
        inner_cached(
            self,
            task_type,
            cell_id.map(|c| c.type_id),
            &mut FxHashSet::default(),
        )
    }

    fn invalidate_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            panic!("Dependency tracking is disabled so invalidation is not allowed");
        }
        operation::InvalidateOperation::run(
            smallvec![task_id],
            #[cfg(feature = "trace_task_dirty")]
            TaskDirtyCause::Invalidator,
            self.execute_context(turbo_tasks),
        );
    }

    fn invalidate_tasks(
        &self,
        tasks: &[TaskId],
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            panic!("Dependency tracking is disabled so invalidation is not allowed");
        }
        operation::InvalidateOperation::run(
            tasks.iter().copied().collect(),
            #[cfg(feature = "trace_task_dirty")]
            TaskDirtyCause::Unknown,
            self.execute_context(turbo_tasks),
        );
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            panic!("Dependency tracking is disabled so invalidation is not allowed");
        }
        operation::InvalidateOperation::run(
            tasks.iter().copied().collect(),
            #[cfg(feature = "trace_task_dirty")]
            TaskDirtyCause::Unknown,
            self.execute_context(turbo_tasks),
        );
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if task_id.is_transient() {
            return;
        }
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        task.invalidate_serialization();
    }

    fn get_task_description(&self, task_id: TaskId) -> String {
        self.lookup_task_type(task_id).map_or_else(
            || format!("{task_id:?} transient"),
            |task_type| task_type.to_string(),
        )
    }

    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        if let Some(in_progress) = remove!(task, InProgress) {
            match in_progress {
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        task.add_new(CachedDataItem::InProgress {
            value: InProgressState::Canceled,
        });
    }

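    /// Transitions a scheduled task into the in-progress state and builds its execution spec
    /// (tracing span and future). Returns `None` when the task is not in the scheduled state,
    /// e.g. because it is already running.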
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        enum TaskType {
            Cached(Arc<CachedTaskType>),
            Transient(Arc<TransientTask>),
        }
        let (task_type, once_task) = if let Some(task_type) = self.lookup_task_type(task_id) {
            (TaskType::Cached(task_type), false)
        } else if let Some(task_type) = self.transient_tasks.get(&task_id) {
            (
                TaskType::Transient(task_type.clone()),
                matches!(**task_type, TransientTask::Once(_)),
            )
        } else {
            return None;
        };
        let execution_reason;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            let in_progress = remove!(task, InProgress)?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                task.add_new(CachedDataItem::InProgress { value: in_progress });
                return None;
            };
            execution_reason = reason;
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::InProgress(Box::new(InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    new_children: Default::default(),
                })),
            });

            // Mark all current collectibles as outdated and clean up left-over outdated ones.
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            let collectibles = iter_many!(task, Collectible { collectible } value => Collectible::Current(collectible, *value))
                .chain(iter_many!(task, OutdatedCollectible { collectible } => Collectible::Outdated(collectible)))
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ =
                            task.insert(CachedDataItem::OutdatedCollectible { collectible, value });
                    }
                    Collectible::Outdated(collectible) => {
                        if !task.has_key(&CachedDataItemKey::Collectible { collectible }) {
                            task.remove(&CachedDataItemKey::OutdatedCollectible { collectible });
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Mark all current dependencies as outdated and clean up left-over outdated
                // ones.
                enum Dep {
                    CurrentCell(CellRef),
                    CurrentOutput(TaskId),
                    OutdatedCell(CellRef),
                    OutdatedOutput(TaskId),
                }
                let dependencies = iter_many!(task, CellDependency { target } => Dep::CurrentCell(target))
                    .chain(iter_many!(task, OutputDependency { target } => Dep::CurrentOutput(target)))
                    .chain(iter_many!(task, OutdatedCellDependency { target } => Dep::OutdatedCell(target)))
                    .chain(iter_many!(task, OutdatedOutputDependency { target } => Dep::OutdatedOutput(target)))
                    .collect::<Vec<_>>();
                for dep in dependencies {
                    match dep {
                        Dep::CurrentCell(cell) => {
                            let _ = task.add(CachedDataItem::OutdatedCellDependency {
                                target: cell,
                                value: (),
                            });
                        }
                        Dep::CurrentOutput(output) => {
                            let _ = task.add(CachedDataItem::OutdatedOutputDependency {
                                target: output,
                                value: (),
                            });
                        }
                        Dep::OutdatedCell(cell) => {
                            if !task.has_key(&CachedDataItemKey::CellDependency { target: cell }) {
                                task.remove(&CachedDataItemKey::OutdatedCellDependency {
                                    target: cell,
                                });
                            }
                        }
                        Dep::OutdatedOutput(output) => {
                            if !task
                                .has_key(&CachedDataItemKey::OutputDependency { target: output })
                            {
                                task.remove(&CachedDataItemKey::OutdatedOutputDependency {
                                    target: output,
                                });
                            }
                        }
                    }
                }
            }
        }

        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }

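    /// Called when a task execution has completed. Commits the results (output, cells,
    /// children, dependencies) and returns `true` when the task became stale during execution
    /// and has been rescheduled instead.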
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        #[cfg(not(feature = "trace_task_details"))]
        let _span = tracing::trace_span!("task execution completed").entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();
        let mut ctx = self.execute_context(turbo_tasks);

        let Some(TaskExecutionCompletePrepareResult {
            new_children,
            mut removed_data,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        }) = self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            stateful,
            has_invalidator,
        )
        else {
            // The task was stale and has been rescheduled.
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "true");
            return true;
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();

        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        if has_new_children
            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "true");
            return true;
        }

        if self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            &mut removed_data,
            is_now_immutable,
        ) {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "true");
            return true;
        }

        drop(removed_data);

        self.task_execution_completed_cleanup(&mut ctx, task_id);

        false
    }

    fn task_execution_completed_prepare(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        #[cfg(feature = "trace_task_details")] span: &Span,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        stateful: bool,
        has_invalidator: bool,
    ) -> Option<TaskExecutionCompletePrepareResult> {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = get_mut!(task, InProgress) else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return Some(TaskExecutionCompletePrepareResult {
                new_children: Default::default(),
                removed_data: Default::default(),
                is_now_immutable: false,
                #[cfg(feature = "verify_determinism")]
                no_output_set: false,
                new_output: None,
                output_dependent_tasks: Default::default(),
            });
        }
        let &mut InProgressState::InProgress(box InProgressStateInner {
            stale,
            ref mut new_children,
            session_dependent,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // If the task became stale while executing, reschedule it instead of committing the
        // results.
        #[cfg(not(feature = "no_fast_stale"))]
        if stale {
            let Some(InProgressState::InProgress(box InProgressStateInner {
                done_event,
                mut new_children,
                ..
            })) = remove!(task, InProgress)
            else {
                unreachable!();
            };
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled {
                    done_event,
                    reason: TaskExecutionReason::Stale,
                },
            });
            // Children that are already connected stay connected and keep their active count.
            for task in iter_many!(task, Child { task } => task) {
                new_children.remove(&task);
            }
            drop(task);

            // Revert the active count that was added for the remaining new children.
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return None;
        }

        let mut new_children = take(new_children);

        if stateful {
            let _ = task.add(CachedDataItem::Stateful { value: () });
        }

        if has_invalidator {
            let _ = task.add(CachedDataItem::HasInvalidator { value: () });
        }

        // Update the cell type max indices to match the cells produced by this execution.
        let old_counters: FxHashMap<_, _> =
            get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, *max_index));
        let mut counters_to_remove = old_counters.clone();
        for (&cell_type, &max_index) in cell_counters.iter() {
            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                if old_max_index != max_index {
                    task.insert(CachedDataItem::CellTypeMaxIndex {
                        cell_type,
                        value: max_index,
                    });
                }
            } else {
                task.add_new(CachedDataItem::CellTypeMaxIndex {
                    cell_type,
                    value: max_index,
                });
            }
        }
        for (cell_type, _) in counters_to_remove {
            task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type });
        }

        let mut queue = AggregationUpdateQueue::new();

        let mut removed_data = Vec::new();
        let mut old_edges = Vec::new();

        let has_children = !new_children.is_empty();
        let is_immutable = task.is_immutable();
        // The task is a candidate for becoming immutable when it has no invalidator, is not
        // session dependent, and has no collectibles dependencies; in that case collect its
        // dependencies to check them below.
        let task_dependencies_for_immutable = if !is_immutable
            && !session_dependent
            && !task.has_key(&CachedDataItemKey::HasInvalidator {})
            && count!(task, CollectiblesDependency) == 0
        {
            Some(
                iter_many!(task, OutputDependency { target } => target)
                    .chain(iter_many!(task, CellDependency { target } => target.task))
                    .collect::<FxHashSet<_>>(),
            )
        } else {
            None
        };

        if has_children {
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);

            // Collect children that are no longer children of this task as outdated edges.
            old_edges.extend(
                iter_many!(task, Child { task } => task)
                    .filter(|task| !new_children.remove(task))
                    .map(OutdatedEdge::Child),
            );
        } else {
            old_edges.extend(iter_many!(task, Child { task } => task).map(OutdatedEdge::Child));
        }

        // Drop stale cell data: cells at or beyond the index range produced by this execution.
        if task_id.is_transient()
            || iter_many!(task, CellData { cell }
                if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index) => cell
            ).count() > 0
        {
            removed_data.extend(task.extract_if(CachedDataItemType::CellData, |key, _| {
                matches!(key, CachedDataItemKey::CellData { cell } if cell_counters
                    .get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
            }));
        }

        old_edges.extend(
            task.iter(CachedDataItemType::OutdatedCollectible)
                .filter_map(|(key, value)| match (key, value) {
                    (
                        CachedDataItemKey::OutdatedCollectible { collectible },
                        CachedDataItemValueRef::OutdatedCollectible { value },
                    ) => Some(OutdatedEdge::Collectible(collectible, *value)),
                    _ => None,
                }),
        );

        if self.should_track_dependencies() {
            old_edges.extend(iter_many!(task, OutdatedCellDependency { target } => OutdatedEdge::CellDependency(target)));
            old_edges.extend(iter_many!(task, OutdatedOutputDependency { target } => OutdatedEdge::OutputDependency(target)));
            old_edges.extend(
                iter_many!(task, CellDependent { cell, task } => (cell, task)).filter_map(
                    |(cell, task)| {
                        if cell_counters
                            .get(&cell.type_id)
                            .is_none_or(|start_index| cell.index >= *start_index)
                            && let Some(old_counter) = old_counters.get(&cell.type_id)
                            && cell.index < *old_counter
                        {
                            return Some(OutdatedEdge::RemovedCellDependent {
                                task_id: task,
                                #[cfg(feature = "trace_task_dirty")]
                                value_type_id: cell.type_id,
                            });
                        }
                        None
                    },
                ),
            );
        }

2060 let current_output = get!(task, Output);
2062 #[cfg(feature = "verify_determinism")]
2063 let no_output_set = current_output.is_none();
2064 let new_output = match result {
2065 Ok(RawVc::TaskOutput(output_task_id)) => {
2066 if let Some(OutputValue::Output(current_task_id)) = current_output
2067 && *current_task_id == output_task_id
2068 {
2069 None
2070 } else {
2071 Some(OutputValue::Output(output_task_id))
2072 }
2073 }
2074 Ok(RawVc::TaskCell(output_task_id, cell)) => {
2075 if let Some(OutputValue::Cell(CellRef {
2076 task: current_task_id,
2077 cell: current_cell,
2078 })) = current_output
2079 && *current_task_id == output_task_id
2080 && *current_cell == cell
2081 {
2082 None
2083 } else {
2084 Some(OutputValue::Cell(CellRef {
2085 task: output_task_id,
2086 cell,
2087 }))
2088 }
2089 }
2090 Ok(RawVc::LocalOutput(..)) => {
2091 panic!("Non-local tasks must not return a local Vc");
2092 }
2093 Err(err) => {
2094 if let Some(OutputValue::Error(old_error)) = current_output
2095 && old_error == &err
2096 {
2097 None
2098 } else {
2099 Some(OutputValue::Error(err))
2100 }
2101 }
2102 };
2103 let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2104 if new_output.is_some() && ctx.should_track_dependencies() {
2106 output_dependent_tasks = get_many!(task, OutputDependent { task } => task);
2107 }
2108
2109 drop(task);
2110
2111 let mut is_now_immutable = false;
2113 if let Some(dependencies) = task_dependencies_for_immutable
2114 && dependencies
2115 .iter()
2116 .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).is_immutable())
2117 {
2118 is_now_immutable = true;
2119 }
2120 #[cfg(feature = "trace_task_details")]
2121 span.record("immutable", is_immutable || is_now_immutable);
2122
2123 if !queue.is_empty() || !old_edges.is_empty() {
2124 #[cfg(feature = "trace_task_completion")]
2125 let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
2126 CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2130 }
2131
2132 Some(TaskExecutionCompletePrepareResult {
2133 new_children,
2134 removed_data,
2135 is_now_immutable,
2136 #[cfg(feature = "verify_determinism")]
2137 no_output_set,
2138 new_output,
2139 output_dependent_tasks,
2140 })
2141 }
2142
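    /// Invalidates all tasks that depend on this task's output. Once tasks and dependents whose
    /// dependency edge is already outdated are skipped.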
    fn task_execution_completed_invalidate_output_dependent(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        output_dependent_tasks: SmallVec<[TaskId; 4]>,
    ) {
        debug_assert!(!output_dependent_tasks.is_empty());

        let mut queue = AggregationUpdateQueue::new();
        for dependent_task_id in output_dependent_tasks {
            #[cfg(feature = "trace_task_output_dependencies")]
            let span = tracing::trace_span!(
                "invalidate output dependency",
                task = %task_id,
                dependent_task = %dependent_task_id,
                result = tracing::field::Empty,
            )
            .entered();
            if ctx.is_once_task(dependent_task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "once task");
                continue;
            }
            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
            if dependent.has_key(&CachedDataItemKey::OutdatedOutputDependency { target: task_id }) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "outdated dependency");
                continue;
            }
            if !dependent.has_key(&CachedDataItemKey::OutputDependency { target: task_id }) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "no backward dependency");
                continue;
            }
            make_task_dirty_internal(
                dependent,
                dependent_task_id,
                true,
                #[cfg(feature = "trace_task_dirty")]
                TaskDirtyCause::OutputChange { task_id },
                &mut queue,
                ctx,
            );
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "marked dirty");
        }

        queue.execute(ctx);
    }

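    /// Marks new children that have not produced an output yet as dirty.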
    fn task_execution_completed_unfinished_children_dirty(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        new_children: &FxHashSet<TaskId>,
    ) {
        debug_assert!(!new_children.is_empty());

        let mut queue = AggregationUpdateQueue::new();
        for &child_id in new_children {
            let child_task = ctx.task(child_id, TaskDataCategory::Meta);
            if !child_task.has_key(&CachedDataItemKey::Output {}) {
                make_task_dirty_internal(
                    child_task,
                    child_id,
                    false,
                    #[cfg(feature = "trace_task_dirty")]
                    TaskDirtyCause::InitialDirty,
                    &mut queue,
                    ctx,
                );
            }
        }

        queue.execute(ctx);
    }

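    /// Connects the new children to the completed task. If the task became stale during
    /// execution, it is rescheduled instead and `true` is returned.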
    fn task_execution_completed_connect(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        new_children: FxHashSet<TaskId>,
    ) -> bool {
        debug_assert!(!new_children.is_empty());

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = get!(task, InProgress) else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            #[cfg(not(feature = "no_fast_stale"))]
            stale,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        #[cfg(not(feature = "no_fast_stale"))]
        if *stale {
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                remove!(task, InProgress)
            else {
                unreachable!();
            };
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled {
                    done_event,
                    reason: TaskExecutionReason::Stale,
                },
            });
            drop(task);

            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return true;
        }

        let has_active_count = ctx.should_track_activeness()
            && get!(task, Activeness).map_or(false, |activeness| activeness.active_counter > 0);
        connect_children(
            ctx,
            task_id,
            task,
            new_children,
            has_active_count,
            ctx.should_track_activeness(),
        );

        false
    }

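    /// Final step of task execution completion: stores the new output, updates the dirty state
    /// and the aggregated dirty container counts, and notifies the done event. Returns `true`
    /// if the task needs to be rescheduled.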
    fn task_execution_completed_finish(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "verify_determinism")] no_output_set: bool,
        new_output: Option<OutputValue>,
        removed_data: &mut Vec<CachedDataItem>,
        is_now_immutable: bool,
    ) -> bool {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = remove!(task, InProgress) else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: _,
            stale,
            session_dependent,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        debug_assert!(new_children.is_empty());

        if stale {
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled {
                    done_event,
                    reason: TaskExecutionReason::Stale,
                },
            });
            return true;
        }

        let mut old_content = None;
        if let Some(value) = new_output {
            old_content = task.insert(CachedDataItem::Output { value });
        }

        if is_now_immutable {
            let _ = task.add(CachedDataItem::Immutable { value: () });
        }

        removed_data.extend(task.extract_if(
            CachedDataItemType::InProgressCell,
            |key, value| match (key, value) {
                (
                    CachedDataItemKey::InProgressCell { .. },
                    CachedDataItemValueRef::InProgressCell { value },
                ) => {
                    value.event.notify(usize::MAX);
                    true
                }
                _ => false,
            },
        ));

        let old_dirty_state = get!(task, Dirty).copied();

        let new_dirty_state = if session_dependent {
            Some(DirtyState {
                clean_in_session: Some(self.session_id),
            })
        } else {
            None
        };

        let dirty_changed = old_dirty_state != new_dirty_state;
        let data_update = if dirty_changed {
            if let Some(new_dirty_state) = new_dirty_state {
                task.insert(CachedDataItem::Dirty {
                    value: new_dirty_state,
                });
            } else {
                task.remove(&CachedDataItemKey::Dirty {});
            }

            if old_dirty_state.is_some() || new_dirty_state.is_some() {
                let mut dirty_containers = get!(task, AggregatedDirtyContainerCount)
                    .cloned()
                    .unwrap_or_default();
                if let Some(old_dirty_state) = old_dirty_state {
                    dirty_containers.update_with_dirty_state(&old_dirty_state);
                }
                let aggregated_update = match (old_dirty_state, new_dirty_state) {
                    (None, None) => unreachable!(),
                    (Some(old), None) => dirty_containers.undo_update_with_dirty_state(&old),
                    (None, Some(new)) => dirty_containers.update_with_dirty_state(&new),
                    (Some(old), Some(new)) => dirty_containers.replace_dirty_state(&old, &new),
                };
                if !aggregated_update.is_zero() {
                    if aggregated_update.get(self.session_id) < 0
                        && let Some(activeness_state) = get_mut!(task, Activeness)
                    {
                        activeness_state.all_clean_event.notify(usize::MAX);
                        activeness_state.unset_active_until_clean();
                        if activeness_state.is_empty() {
                            task.remove(&CachedDataItemKey::Activeness {});
                        }
                    }
                    AggregationUpdateJob::data_update(
                        &mut task,
                        AggregatedDataUpdate::new()
                            .dirty_container_update(task_id, aggregated_update),
                    )
                } else {
                    None
                }
            } else {
                None
            }
        } else {
            None
        };

        #[cfg(feature = "verify_determinism")]
        let reschedule = (dirty_changed || no_output_set) && !task_id.is_transient();
        #[cfg(not(feature = "verify_determinism"))]
        let reschedule = false;
        if reschedule {
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled {
                    done_event,
                    reason: TaskExecutionReason::Stale,
                },
            });
            drop(task);
        } else {
            drop(task);

            done_event.notify(usize::MAX);
        }

        drop(old_content);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, ctx);
        }

        reschedule
    }

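    /// Shrinks the per-type storage maps of the task back to their minimal size after
    /// execution.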
    fn task_execution_completed_cleanup(&self, ctx: &mut impl ExecuteContext<'_>, task_id: TaskId) {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        task.shrink_to_fit(CachedDataItemType::CellData);
        task.shrink_to_fit(CachedDataItemType::CellTypeMaxIndex);
        task.shrink_to_fit(CachedDataItemType::CellDependency);
        task.shrink_to_fit(CachedDataItemType::OutputDependency);
        task.shrink_to_fit(CachedDataItemType::CollectiblesDependency);
        drop(task);
    }

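    /// Executes background jobs: snapshot jobs wait for the snapshot interval (or an idle
    /// period) before persisting, and prefetch jobs warm the task storage, splitting large
    /// batches into chunked follow-up jobs.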
    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
                    loop {
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        const IDLE_TIMEOUT: Duration = Duration::from_secs(2);

                        let (time, mut reason) =
                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                            } else {
                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
                            };

                        let until = last_snapshot + time;
                        if until > Instant::now() {
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            let mut idle_start_listener = self.idle_start_event.listen();
                            let mut idle_end_listener = self.idle_end_event.listen();
                            let mut idle_time = if turbo_tasks.is_idle() {
                                Instant::now() + IDLE_TIMEOUT
                            } else {
                                far_future()
                            };
                            loop {
                                tokio::select! {
                                    _ = &mut stop_listener => {
                                        return;
                                    },
                                    _ = &mut idle_start_listener => {
                                        idle_time = Instant::now() + IDLE_TIMEOUT;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    _ = &mut idle_end_listener => {
                                        idle_time = until + IDLE_TIMEOUT;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        if turbo_tasks.is_idle() {
                                            reason = "idle timeout";
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let _span = info_span!("persist", reason = reason).entered();
                        let this = self.clone();
                        let snapshot = this.snapshot();
                        if let Some((snapshot_start, new_data)) = snapshot {
                            last_snapshot = snapshot_start;
                            if new_data {
                                continue;
                            }
                            let last_snapshot = last_snapshot.duration_since(self.start_time);
                            self.last_snapshot.store(
                                last_snapshot.as_millis().try_into().unwrap(),
                                Ordering::Relaxed,
                            );

                            let _span = Span::none().entered();
                            turbo_tasks.schedule_backend_background_job(
                                TurboTasksBackendJob::FollowUpSnapshot,
                            );
                            return;
                        }
                    }
                }
                TurboTasksBackendJob::Prefetch { data, range } => {
                    let range = if let Some(range) = range {
                        range
                    } else {
                        if data.len() > 128 {
                            let chunk_size = good_chunk_size(data.len());
                            let chunks = data.len().div_ceil(chunk_size);
                            for i in 0..chunks {
                                turbo_tasks.schedule_backend_background_job(
                                    TurboTasksBackendJob::Prefetch {
                                        data: data.clone(),
                                        range: Some(
                                            (i * chunk_size)..min(data.len(), (i + 1) * chunk_size),
                                        ),
                                    },
                                );
                            }
                            return;
                        }
                        0..data.len()
                    };

                    let _span = trace_span!("prefetching").entered();
                    let mut ctx = self.execute_context(turbo_tasks);
                    for i in range {
                        let (&task, &with_data) = data.get_index(i).unwrap();
                        let category = if with_data {
                            TaskDataCategory::All
                        } else {
                            TaskDataCategory::Meta
                        };
                        drop(ctx.task(task, category));
                    }
                }
            }
        })
    }

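    /// Reads a cell of the currently executing task. No dependency edge is registered for this
    /// read.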
    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        _options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<TypedCellContent> {
        let mut ctx = self.execute_context(turbo_tasks);
        let task = ctx.task(task_id, TaskDataCategory::Data);
        if let Some(content) = get!(task, CellData { cell }) {
            Ok(CellContent(Some(content.reference.clone())).into_typed(cell.type_id))
        } else {
            Ok(CellContent(None).into_typed(cell.type_id))
        }
    }

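    /// Collects all collectibles of the given trait type that are visible from `task_id`. The
    /// task is promoted to a root aggregation node first so the aggregated collectibles are
    /// complete. If a reader is given, dependency edges are registered in both directions.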
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            for collectible in iter_many!(
                task,
                AggregatedCollectible {
                    collectible
                } count if collectible.collectible_type == collectible_type && *count > 0 => {
                    collectible.cell
                }
            ) {
                *collectibles
                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
                    .or_insert(0) += 1;
            }
            for (collectible, count) in iter_many!(
                task,
                Collectible {
                    collectible
                } count if collectible.collectible_type == collectible_type => {
                    (collectible.cell, *count)
                }
            ) {
                *collectibles
                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
                    .or_insert(0) += count;
            }
            if let Some(reader_id) = reader_id {
                let _ = task.add(CachedDataItem::CollectiblesDependent {
                    collectible_type,
                    task: reader_id,
                    value: (),
                });
            }
        }
        if let Some(reader_id) = reader_id {
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            if reader
                .remove(&CachedDataItemKey::OutdatedCollectiblesDependency { target })
                .is_none()
            {
                let _ = reader.add(CachedDataItem::CollectiblesDependency { target, value: () });
            }
        }
        collectibles
    }

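    /// Emits a collectible from `task_id`. The collectible must be a resolved cell reference.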
    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_valid_collectible(task_id, collectible);

        let RawVc::TaskCell(collectible_task, cell) = collectible else {
            panic!("Collectibles need to be resolved");
        };
        let cell = CellRef {
            task: collectible_task,
            cell,
        };
        operation::UpdateCollectibleOperation::run(
            task_id,
            CollectibleRef {
                collectible_type,
                cell,
            },
            1,
            self.execute_context(turbo_tasks),
        );
    }

    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_valid_collectible(task_id, collectible);

        let RawVc::TaskCell(collectible_task, cell) = collectible else {
            panic!("Collectibles need to be resolved");
        };
        let cell = CellRef {
            task: collectible_task,
            cell,
        };
        operation::UpdateCollectibleOperation::run(
            task_id,
            CollectibleRef {
                collectible_type,
                cell,
            },
            -(i32::try_from(count).unwrap()),
            self.execute_context(turbo_tasks),
        );
    }

    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        content: CellContent,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateCellOperation::run(
            task_id,
            cell,
            content,
            verification_mode,
            self.execute_context(turbo_tasks),
        );
    }

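    /// Marks the currently executing task as session dependent, first raising its aggregation
    /// number to `SESSION_DEPENDENT_AGGREGATION_NUMBER` if necessary.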
    fn mark_own_task_as_session_dependent(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            return;
        }
        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
        let aggregation_number = get_aggregation_number(&task);
        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
            drop(task);
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                    distance: None,
                },
                &mut ctx,
            );
            task = ctx.task(task_id, TaskDataCategory::Meta);
        }
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            session_dependent,
            ..
        })) = get_mut!(task, InProgress)
        {
            *session_dependent = true;
        }
    }

    fn mark_own_task_as_finished(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task, TaskDataCategory::Data);
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            marked_as_completed,
            ..
        })) = get_mut!(task, InProgress)
        {
            *marked_as_completed = true;
        }
    }

    fn set_own_task_aggregation_number(
        &self,
        task: TaskId,
        aggregation_number: u32,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        AggregationUpdateQueue::run(
            AggregationUpdateJob::UpdateAggregationNumber {
                task_id: task,
                base_aggregation_number: aggregation_number,
                distance: None,
            },
            &mut ctx,
        );
    }

    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_not_persistent_calling_transient(parent_task, task, None);
        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
    }

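    /// Creates a new root or once task, marks it as a root aggregation node, and schedules its
    /// initial execution.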
    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
        let task_id = self.transient_task_id_factory.get();
        let root_type = match task_type {
            TransientTaskType::Root(_) => RootType::RootTask,
            TransientTaskType::Once(_) => RootType::OnceTask,
        };
        self.transient_tasks.insert(
            task_id,
            Arc::new(match task_type {
                TransientTaskType::Root(f) => TransientTask::Root(f),
                TransientTaskType::Once(f) => TransientTask::Once(Mutex::new(Some(f))),
            }),
        );
        {
            let mut task = self.storage.access_mut(task_id);
            task.add(CachedDataItem::AggregationNumber {
                value: AggregationNumber {
                    base: u32::MAX,
                    distance: 0,
                    effective: u32::MAX,
                },
            });
            if self.should_track_activeness() {
                task.add(CachedDataItem::Activeness {
                    value: ActivenessState::new_root(root_type, task_id),
                });
            }
            task.add(CachedDataItem::new_scheduled(
                TaskExecutionReason::Initial,
                move || match root_type {
                    RootType::RootTask => "Root Task".to_string(),
                    RootType::OnceTask => "Once Task".to_string(),
                },
            ));
        }
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().insert(task_id);
        task_id
    }

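    /// Removes the root marker from a root task. While the task is still dirty it stays active
    /// until it becomes clean; otherwise its activeness state is dropped right away.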
    fn dispose_root_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().remove(&task_id);

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_dirty = get!(task, Dirty).map_or(false, |dirty| dirty.get(self.session_id));
        let has_dirty_containers = get!(task, AggregatedDirtyContainerCount)
            .map_or(false, |dirty_containers| {
                dirty_containers.get(self.session_id) > 0
            });
        if is_dirty || has_dirty_containers {
            if let Some(activeness_state) = get_mut!(task, Activeness) {
                activeness_state.unset_root_type();
                activeness_state.set_active_until_clean();
            };
        } else if let Some(activeness_state) = remove!(task, Activeness) {
            activeness_state.all_clean_event.notify(usize::MAX);
        }
    }

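    /// Debug check (`verify_aggregation_graph` feature): walks the task graph from all root
    /// tasks and verifies the aggregation invariants, e.g. that dirty tasks are listed as dirty
    /// containers in their upper tasks and that aggregated collectibles are backed by an
    /// emitting task. Panics with diagnostics on the first violation.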
    #[cfg(feature = "verify_aggregation_graph")]
    fn verify_aggregation_graph(
        &self,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
        idle: bool,
    ) {
        use std::{collections::VecDeque, env, io::stdout};

        use crate::backend::operation::{get_uppers, is_aggregating_node};

        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
            return;
        }

        let mut ctx = self.execute_context(turbo_tasks);
        let root_tasks = self.root_tasks.lock().clone();

        for task_id in root_tasks.into_iter() {
            let mut queue = VecDeque::new();
            let mut visited = FxHashSet::default();
            let mut aggregated_nodes = FxHashSet::default();
            let mut collectibles = FxHashMap::default();
            let root_task_id = task_id;
            visited.insert(task_id);
            aggregated_nodes.insert(task_id);
            queue.push_back(task_id);
            let mut counter = 0;
            while let Some(task_id) = queue.pop_front() {
                counter += 1;
                if counter % 100000 == 0 {
                    println!(
                        "queue={}, visited={}, aggregated_nodes={}",
                        queue.len(),
                        visited.len(),
                        aggregated_nodes.len()
                    );
                }
                let task = ctx.task(task_id, TaskDataCategory::All);
                if idle && !self.is_idle.load(Ordering::Relaxed) {
                    return;
                }

                let uppers = get_uppers(&task);
                if task_id != root_task_id
                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
                {
                    panic!(
                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
                         {:?})",
                        task_id,
                        ctx.get_task_description(task_id),
                        uppers
                    );
                }

                let aggregated_collectibles: Vec<_> = get_many!(task, AggregatedCollectible { collectible } value if *value > 0 => {collectible});
                for collectible in aggregated_collectibles {
                    collectibles
                        .entry(collectible)
                        .or_insert_with(|| (false, Vec::new()))
                        .1
                        .push(task_id);
                }

                let own_collectibles: Vec<_> = get_many!(task, Collectible { collectible } value if *value > 0 => {collectible});
                for collectible in own_collectibles {
                    if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                        *flag = true
                    } else {
                        panic!(
                            "Task {} has a collectible {:?} that is not in any upper task",
                            task_id, collectible
                        );
                    }
                }

                let is_dirty = get!(task, Dirty).is_some_and(|dirty| dirty.get(self.session_id));
                let has_dirty_container = get!(task, AggregatedDirtyContainerCount)
                    .is_some_and(|count| count.get(self.session_id) > 0);
                let should_be_in_upper = is_dirty || has_dirty_container;

                let aggregation_number = get_aggregation_number(&task);
                if is_aggregating_node(aggregation_number) {
                    aggregated_nodes.insert(task_id);
                }
                for child_id in iter_many!(task, Child { task } => task) {
                    if visited.insert(child_id) {
                        queue.push_back(child_id);
                    }
                }
                drop(task);

                if should_be_in_upper {
                    for upper_id in uppers {
                        let task = ctx.task(upper_id, TaskDataCategory::All);
                        let in_upper = get!(task, AggregatedDirtyContainer { task: task_id })
                            .is_some_and(|dirty| dirty.get(self.session_id) > 0);
                        if !in_upper {
                            panic!(
                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
                                 ({})",
                                task_id,
                                ctx.get_task_description(task_id),
                                upper_id,
                                ctx.get_task_description(upper_id)
                            );
                        }
                    }
                }
            }

            for (collectible, (flag, task_ids)) in collectibles {
                if !flag {
                    use std::io::Write;
                    let mut stdout = stdout().lock();
                    writeln!(
                        stdout,
                        "{:?} that is not emitted in any child task but in these aggregated \
                         tasks: {:#?}",
                        collectible,
                        task_ids
                            .iter()
                            .map(|t| format!("{t} {}", ctx.get_task_description(*t)))
                            .collect::<Vec<_>>()
                    )
                    .unwrap();

                    let task_id = collectible.cell.task;
                    let mut queue = {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        get_uppers(&task)
                    };
                    let mut visited = FxHashSet::default();
                    for &upper_id in queue.iter() {
                        visited.insert(upper_id);
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                    }
                    while let Some(task_id) = queue.pop() {
                        let desc = ctx.get_task_description(task_id);
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        let aggregated_collectible =
                            get!(task, AggregatedCollectible { collectible })
                                .copied()
                                .unwrap_or_default();
                        let uppers = get_uppers(&task);
                        drop(task);
                        writeln!(
                            stdout,
                            "upper {task_id} {desc} collectible={aggregated_collectible}"
                        )
                        .unwrap();
                        if task_ids.contains(&task_id) {
                            writeln!(
                                stdout,
                                "Task has an upper connection to an aggregated task that doesn't \
                                 reference it. Upper connection is invalid!"
                            )
                            .unwrap();
                        }
                        for upper_id in uppers {
                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                            if !visited.contains(&upper_id) {
                                queue.push(upper_id);
                            }
                        }
                    }
                    panic!("See stdout for more details");
                }
            }
        }
    }

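    /// Asserts that a persistent parent task does not call, read, or connect to a transient
    /// task.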
    fn assert_not_persistent_calling_transient(
        &self,
        parent_id: Option<TaskId>,
        child_id: TaskId,
        cell_id: Option<CellId>,
    ) {
        if !parent_id.is_none_or(|id| id.is_transient()) && child_id.is_transient() {
            self.panic_persistent_calling_transient(
                parent_id
                    .and_then(|id| self.lookup_task_type(id))
                    .as_deref(),
                self.lookup_task_type(child_id).as_deref(),
                cell_id,
            );
        }
    }

    fn panic_persistent_calling_transient(
        &self,
        parent: Option<&CachedTaskType>,
        child: Option<&CachedTaskType>,
        cell_id: Option<CellId>,
    ) {
        let transient_reason = if let Some(child) = child {
            Cow::Owned(format!(
                " The callee is transient because it depends on:\n{}",
                self.debug_trace_transient_task(child, cell_id),
            ))
        } else {
            Cow::Borrowed("")
        };
        panic!(
            "Persistent task {} is not allowed to call, read, or connect to transient task {}.{}",
            parent.map_or("unknown", |t| t.get_name()),
            child.map_or("unknown", |t| t.get_name()),
            transient_reason,
        );
    }

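    /// Asserts that `collectible` is a resolved cell reference and that transient collectibles
    /// are only emitted from transient tasks.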
    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
            let task_info = if let Some(col_task_ty) = collectible
                .try_get_task_id()
                .and_then(|t| self.lookup_task_type(t))
            {
                Cow::Owned(format!(" (return type of {col_task_ty})"))
            } else {
                Cow::Borrowed("")
            };
            panic!("Collectible{task_info} must be a ResolvedVc")
        };
        if col_task_id.is_transient() && !task_id.is_transient() {
            let transient_reason = if let Some(col_task_ty) = self.lookup_task_type(col_task_id) {
                Cow::Owned(format!(
                    ". The collectible is transient because it depends on:\n{}",
                    self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
                ))
            } else {
                Cow::Borrowed("")
            };
            panic!(
                "Collectible is transient; transient collectibles cannot be emitted from \
                 persistent tasks{transient_reason}",
            )
        }
    }
}

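// The public `Backend` implementation delegates every call to the inner backend in `self.0`.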
impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }

    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }

    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }

    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }

    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }

    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
    }

    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
    }

    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }

    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }

    fn get_task_description(&self, task: TaskId) -> String {
        self.0.get_task_description(task)
    }

    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }

    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0.try_start_task_execution(task_id, turbo_tasks)
    }

    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            result,
            cell_counters,
            stateful,
            has_invalidator,
            turbo_tasks,
        )
    }

    type BackendJob = TurboTasksBackendJob;

    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(job, turbo_tasks)
    }

    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, reader, options, turbo_tasks)
    }

    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }

    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0
            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
    }

    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }

    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }

    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }

    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        content: CellContent,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .update_task_cell(task_id, cell, content, verification_mode, turbo_tasks);
    }

    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }

    fn set_own_task_aggregation_number(
        &self,
        task: TaskId,
        aggregation_number: u32,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .set_own_task_aggregation_number(task, aggregation_number, turbo_tasks);
    }

    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }

    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }

    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }

    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }

    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }

    fn is_tracking_dependencies(&self) -> bool {
        self.0.options.dependency_tracking
    }
}

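/// Debug representation of why a task is transient, used to build the panic message when a
/// persistent task calls into a transient one.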
enum DebugTraceTransientTask {
    Cached {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
        cause_self: Option<Box<DebugTraceTransientTask>>,
        cause_args: Vec<DebugTraceTransientTask>,
    },
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}

impl DebugTraceTransientTask {
    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
        let indent = " ".repeat(level);
        f.write_str(&indent)?;

        fn fmt_cell_type_id(
            f: &mut fmt::Formatter<'_>,
            cell_type_id: Option<ValueTypeId>,
        ) -> fmt::Result {
            if let Some(ty) = cell_type_id {
                write!(f, " (read cell of type {})", get_value_type(ty).global_name)
            } else {
                Ok(())
            }
        }

        match self {
            Self::Cached {
                task_name,
                cell_type_id,
                ..
            }
            | Self::Collapsed {
                task_name,
                cell_type_id,
                ..
            } => {
                f.write_str(task_name)?;
                fmt_cell_type_id(f, *cell_type_id)?;
                if matches!(self, Self::Collapsed { .. }) {
                    f.write_str(" (collapsed)")?;
                }
            }
            Self::Uncached { cell_type_id } => {
                f.write_str("unknown transient task")?;
                fmt_cell_type_id(f, *cell_type_id)?;
            }
        }
        f.write_char('\n')?;

        if let Self::Cached {
            cause_self,
            cause_args,
            ..
        } = self
        {
            if let Some(c) = cause_self {
                writeln!(f, "{indent} self:")?;
                c.fmt_indented(f, level + 1)?;
            }
            if !cause_args.is_empty() {
                writeln!(f, "{indent} args:")?;
                for c in cause_args {
                    c.fmt_indented(f, level + 1)?;
                }
            }
        }
        Ok(())
    }
}

impl fmt::Display for DebugTraceTransientTask {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.fmt_indented(f, 0)
    }
}

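/// An `Instant` far enough in the future that it is effectively never reached. Note that much
/// larger offsets can overflow `Instant` arithmetic on some platforms, hence the 30-year bound.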
fn far_future() -> Instant {
    Instant::now() + Duration::from_secs(86400 * 365 * 30)
}