mod dynamic_storage;
mod operation;
mod storage;

use std::{
    borrow::Cow,
    cmp::min,
    fmt::{self, Write},
    future::Future,
    hash::BuildHasherDefault,
    mem::take,
    ops::Range,
    pin::Pin,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
    },
};

use anyhow::{Result, bail};
use auto_hash_map::{AutoMap, AutoSet};
use indexmap::IndexSet;
use parking_lot::{Condvar, Mutex};
use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
use smallvec::{SmallVec, smallvec};
use tokio::time::{Duration, Instant};
use tracing::{Span, field::Empty, info_span, trace_span};
use turbo_tasks::{
    CellId, FxDashMap, FxIndexMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency,
    ReadOutputOptions, ReadTracking, SessionId, TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId,
    TraitTypeId, TurboTasksBackendApi, ValueTypeId,
    backend::{
        Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
        TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
    },
    event::{Event, EventListener},
    message_queue::TimingEvent,
    registry::get_value_type,
    task_statistics::TaskStatisticsApi,
    trace::TraceRawVcs,
    turbo_tasks,
    util::{IdFactoryWithReuse, good_chunk_size},
};

pub use self::{operation::AnyOperation, storage::TaskDataCategory};
#[cfg(feature = "trace_task_dirty")]
use crate::backend::operation::TaskDirtyCause;
use crate::{
    backend::{
        operation::{
            AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
            Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number,
            get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children,
        },
        storage::{
            InnerStorageSnapshot, Storage, count, get, get_many, get_mut, get_mut_or_insert_with,
            iter_many, remove,
        },
    },
    backing_storage::BackingStorage,
    data::{
        ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
        CachedDataItemValueRef, CellRef, CollectibleRef, CollectiblesRef, DirtyState,
        InProgressCellState, InProgressState, InProgressStateInner, OutputValue, RootType,
    },
    utils::{
        bi_map::BiMap, chunked_vec::ChunkedVec, dash_map_drop_contents::drop_contents,
        ptr_eq_arc::PtrEqArc, shard_amount::compute_shard_amount, sharded::Sharded, swap_retain,
    },
};

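/// Flag in the highest bit of `in_progress_operations`: while set, a snapshot has
/// been requested and operations must suspend (or wait before starting) until the
/// snapshot has completed.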
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);

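/// Bookkeeping for a pending snapshot: whether a snapshot is currently requested
/// and the set of operations that have suspended themselves while it is taken.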
struct SnapshotRequest {
    snapshot_requested: bool,
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}

impl SnapshotRequest {
    fn new() -> Self {
        Self {
            snapshot_requested: false,
            suspended_operations: FxHashSet::default(),
        }
    }
}

type TransientTaskOnce =
    Mutex<Option<Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>>>;

pub enum TransientTask {
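    /// A root task whose future is recreated via the stored function each time the
    /// task is executed.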
    Root(TransientTaskRoot),

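    /// A task that can execute only once: the future is taken out of the `Mutex`
    /// on first execution (see `try_start_task_execution`).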
    Once(TransientTaskOnce),
}

pub enum StorageMode {
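    /// Restores state from the backing storage, but never persists changes back to
    /// it.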
    ReadOnly,
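    /// Restores state from the backing storage and persists snapshots of changes
    /// back to it.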
    ReadWrite,
}

pub struct BackendOptions {
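    /// Enables dependency tracking. When disabled, tasks are never invalidated and
    /// active tracking is forced off (see `TurboTasksBackendInner::new`).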
    pub dependency_tracking: bool,

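    /// Enables tracking of which tasks are active. Has no effect when dependency
    /// tracking is disabled.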
    pub active_tracking: bool,

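    /// Controls use of the backing storage: `None` disables it entirely,
    /// `StorageMode::ReadOnly` only restores, and `StorageMode::ReadWrite` also
    /// persists snapshots.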
    pub storage_mode: Option<StorageMode>,

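    /// Number of workers, used to compute the shard amount of the sharded data
    /// structures.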
    pub num_workers: Option<usize>,

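    /// Use smaller initial allocations for the storage; also factored into the
    /// shard amount computation.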
    pub small_preallocation: bool,
}

impl Default for BackendOptions {
    fn default() -> Self {
        Self {
            dependency_tracking: true,
            active_tracking: true,
            storage_mode: Some(StorageMode::ReadWrite),
            num_workers: None,
            small_preallocation: false,
        }
    }
}

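/// Background jobs that can be scheduled on the backend: the initial and follow-up
/// persistence snapshots as well as prefetching data for a set of tasks.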
pub enum TurboTasksBackendJob {
    InitialSnapshot,
    FollowUpSnapshot,
    Prefetch {
        data: Arc<FxIndexMap<TaskId, bool>>,
        range: Option<Range<usize>>,
    },
}

pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);

type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;

struct TurboTasksBackendInner<B: BackingStorage> {
    options: BackendOptions,

    start_time: Instant,
    session_id: SessionId,

    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    persisted_task_cache_log: Option<TaskCacheLog>,
    task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
    transient_tasks: FxDashMap<TaskId, Arc<TransientTask>>,

    storage: Storage,

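    /// `true` if the in-memory task cache may be missing entries that exist in the
    /// backing storage (initialized from whether task ids were already persisted).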
    local_is_partial: AtomicBool,

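    /// Number of currently running operations. The highest bit is
    /// [`SNAPSHOT_REQUESTED_BIT`], signalling that a snapshot has been requested.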
    in_progress_operations: AtomicUsize,

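    /// Snapshot synchronization: the pending request state, a condvar notified once
    /// all operations have suspended, a condvar notified once the snapshot has
    /// completed, and the time of the last snapshot.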
    snapshot_request: Mutex<SnapshotRequest>,
    operations_suspended: Condvar,
    snapshot_completed: Condvar,
    last_snapshot: AtomicU64,

    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    task_statistics: TaskStatisticsApi,

    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}

impl<B: BackingStorage> TurboTasksBackend<B> {
    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
        Self(Arc::new(TurboTasksBackendInner::new(
            options,
            backing_storage,
        )))
    }

    pub fn backing_storage(&self) -> &B {
        &self.0.backing_storage
    }
}

impl<B: BackingStorage> TurboTasksBackendInner<B> {
    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
        let need_log = matches!(options.storage_mode, Some(StorageMode::ReadWrite));
        if !options.dependency_tracking {
            options.active_tracking = false;
        }
        let small_preallocation = options.small_preallocation;
        let next_task_id = backing_storage
            .next_free_task_id()
            .expect("Failed to get task id");
        Self {
            options,
            start_time: Instant::now(),
            session_id: backing_storage
                .next_session_id()
                .expect("Failed to get session id"),
            persisted_task_id_factory: IdFactoryWithReuse::new(
                next_task_id,
                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
            ),
            transient_task_id_factory: IdFactoryWithReuse::new(
                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                TaskId::MAX,
            ),
            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
            task_cache: BiMap::new(),
            transient_tasks: FxDashMap::default(),
            local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
            storage: Storage::new(shard_amount, small_preallocation),
            in_progress_operations: AtomicUsize::new(0),
            snapshot_request: Mutex::new(SnapshotRequest::new()),
            operations_suspended: Condvar::new(),
            snapshot_completed: Condvar::new(),
            last_snapshot: AtomicU64::new(0),
            stopping: AtomicBool::new(false),
            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
            #[cfg(feature = "verify_aggregation_graph")]
            is_idle: AtomicBool::new(false),
            task_statistics: TaskStatisticsApi::default(),
            backing_storage,
            #[cfg(feature = "verify_aggregation_graph")]
            root_tasks: Default::default(),
        }
    }

    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }

    fn session_id(&self) -> SessionId {
        self.session_id
    }

    unsafe fn execute_context_with_tx<'e, 'tx>(
        &'e self,
        tx: Option<&'e B::ReadTransaction<'tx>>,
        turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
    where
        'tx: 'e,
    {
        unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
    }

    fn suspending_requested(&self) -> bool {
        self.should_persist()
            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
    }

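    /// Suspend point for operations: if a snapshot has been requested, the
    /// operation registers itself as suspended (so its state can be persisted) and
    /// parks until the snapshot has completed.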
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            if snapshot_request.snapshot_requested {
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                if value == SNAPSHOT_REQUESTED_BIT {
                    this.operations_suspended.notify_all();
                }
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }

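    /// Registers a running operation and returns a guard that unregisters it on
    /// drop. If a snapshot is currently requested, this waits until the snapshot
    /// has completed before the operation starts.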
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        if !self.should_persist() {
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            let mut snapshot_request = self.snapshot_request.lock();
            if snapshot_request.snapshot_requested {
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }

    fn should_persist(&self) -> bool {
        matches!(self.options.storage_mode, Some(StorageMode::ReadWrite))
    }

    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }

    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }

    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }

    fn track_cache_hit(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
    }

    fn track_cache_miss(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
    }
}

pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    backend: Option<&'a TurboTasksBackendInner<B>>,
}

impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
    fn drop(&mut self) {
        if let Some(backend) = self.backend {
            let fetch_sub = backend
                .in_progress_operations
                .fetch_sub(1, Ordering::AcqRel);
            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
                backend.operations_suspended.notify_all();
            }
        }
    }
}

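/// Data collected by `task_execution_completed_prepare` and consumed by the later
/// steps of `task_execution_completed`.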
struct TaskExecutionCompletePrepareResult {
    pub new_children: FxHashSet<TaskId>,
    pub removed_data: Vec<CachedDataItem>,
    pub is_now_immutable: bool,
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    pub new_output: Option<OutputValue>,
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
}

impl<B: BackingStorage> TurboTasksBackendInner<B> {
    unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
        &'l self,
        tx: Option<&'l B::ReadTransaction<'tx>>,
        parent_task: Option<TaskId>,
        child_task: TaskId,
        turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::ConnectChildOperation::run(parent_task, child_task, unsafe {
            self.execute_context_with_tx(tx, turbo_tasks)
        });
    }

    fn connect_child(
        &self,
        parent_task: Option<TaskId>,
        child_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::ConnectChildOperation::run(
            parent_task,
            child_task,
            self.execute_context(turbo_tasks),
        );
    }

    fn try_read_task_output(
        self: &Arc<Self>,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, None);

        let mut ctx = self.execute_context(turbo_tasks);
        let need_reader_task = if self.should_track_dependencies()
            && !matches!(options.tracking, ReadTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            Some(reader_id)
        } else {
            None
        };
        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        fn listen_to_done_event<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            reader: Option<TaskId>,
            tracking: ReadTracking,
            done_event: &Event,
        ) -> EventListener {
            done_event.listen_with_note(move || {
                let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
                move || {
                    if let Some(reader_desc) = reader_desc.as_ref() {
                        format!("try_read_task_output from {} ({})", reader_desc(), tracking)
                    } else {
                        format!("try_read_task_output ({})", tracking)
                    }
                }
            })
        }

        fn check_in_progress<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            task: &impl TaskGuard,
            reader: Option<TaskId>,
            tracking: ReadTracking,
            ctx: &impl ExecuteContext<'_>,
        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
        {
            match get!(task, InProgress) {
                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
                    listen_to_done_event(this, reader, tracking, done_event),
                ))),
                Some(InProgressState::InProgress(box InProgressStateInner {
                    done_event, ..
                })) => Some(Ok(Err(listen_to_done_event(
                    this, reader, tracking, done_event,
                )))),
                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
                    "{} was canceled",
                    ctx.get_task_description(task.id())
                ))),
                None => None,
            }
        }

        if matches!(options.consistency, ReadConsistency::Strong) {
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                drop(reader_task);
                {
                    let _span = tracing::trace_span!(
                        "make root node for strongly consistent read",
                        %task_id
                    )
                    .entered();
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: u32::MAX,
                            distance: None,
                        },
                        &mut ctx,
                    );
                }
                (task, reader_task) = if let Some(reader_id) = need_reader_task {
                    let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
                    (task, Some(reader))
                } else {
                    (ctx.task(task_id, TaskDataCategory::All), None)
                };
            }

            let is_dirty =
                get!(task, Dirty).map_or(false, |dirty_state| dirty_state.get(self.session_id));

            let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
                .cloned()
                .unwrap_or_default()
                .get(self.session_id);
            if dirty_tasks > 0 || is_dirty {
                let activeness = get_mut!(task, Activeness);
                let mut task_ids_to_schedule: Vec<_> = Vec::new();
                let activeness = if let Some(activeness) = activeness {
                    activeness.set_active_until_clean();
                    activeness
                } else {
                    get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id))
                        .set_active_until_clean();
                    if ctx.should_track_activeness() {
                        task_ids_to_schedule = get_many!(
                            task,
                            AggregatedDirtyContainer {
                                task
                            } count if count.get(self.session_id) > 0 => {
                                task
                            }
                        );
                        task_ids_to_schedule.push(task_id);
                    }
                    get!(task, Activeness).unwrap()
                };
                let listener = activeness.all_clean_event.listen_with_note(move || {
                    let this = self.clone();
                    let tt = turbo_tasks.pin();
                    move || {
                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
                        let mut ctx = this.execute_context(tt);
                        let mut visited = FxHashSet::default();
                        fn indent(s: &str) -> String {
                            s.split_inclusive('\n')
                                .flat_map(|line: &str| [" ", line].into_iter())
                                .collect::<String>()
                        }
                        fn get_info(
                            ctx: &mut impl ExecuteContext<'_>,
                            task_id: TaskId,
                            parent_and_count: Option<(TaskId, i32)>,
                            visited: &mut FxHashSet<TaskId>,
                        ) -> String {
                            let task = ctx.task(task_id, TaskDataCategory::Data);
                            let is_dirty = get!(task, Dirty)
                                .map_or(false, |dirty_state| dirty_state.get(ctx.session_id()));
                            let in_progress =
                                get!(task, InProgress).map_or("not in progress", |p| match p {
                                    InProgressState::InProgress(_) => "in progress",
                                    InProgressState::Scheduled { .. } => "scheduled",
                                    InProgressState::Canceled => "canceled",
                                });
                            let activeness = get!(task, Activeness).map_or_else(
                                || "not active".to_string(),
                                |activeness| format!("{activeness:?}"),
                            );
                            let aggregation_number = get_aggregation_number(&task);
                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
                            {
                                let uppers = get_uppers(&task);
                                !uppers.contains(&parent_task_id)
                            } else {
                                false
                            };

                            let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
                                .cloned()
                                .unwrap_or_default()
                                .get(ctx.session_id());

                            let task_description = ctx.get_task_description(task_id);
                            let is_dirty = if is_dirty { ", dirty" } else { "" };
                            let count = if let Some((_, count)) = parent_and_count {
                                format!(" {count}")
                            } else {
                                String::new()
                            };
                            let mut info = format!(
                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
                                 {in_progress}, {activeness}{is_dirty})",
                            );
                            let children: Vec<_> = iter_many!(
                                task,
                                AggregatedDirtyContainer {
                                    task
                                } count => {
                                    (task, count.get(ctx.session_id()))
                                }
                            )
                            .filter(|(_, count)| *count > 0)
                            .collect();
                            drop(task);

                            if missing_upper {
                                info.push_str("\n ERROR: missing upper connection");
                            }

                            if dirty_tasks > 0 || !children.is_empty() {
                                writeln!(info, "\n {dirty_tasks} dirty tasks:").unwrap();

                                for (child_task_id, count) in children {
                                    let task_description = ctx.get_task_description(child_task_id);
                                    if visited.insert(child_task_id) {
                                        let child_info = get_info(
                                            ctx,
                                            child_task_id,
                                            Some((task_id, count)),
                                            visited,
                                        );
                                        info.push_str(&indent(&child_info));
                                        if !info.ends_with('\n') {
                                            info.push('\n');
                                        }
                                    } else {
                                        writeln!(
                                            info,
                                            " {child_task_id} {task_description} {count} \
                                             (already visited)"
                                        )
                                        .unwrap();
                                    }
                                }
                            }
                            info
                        }
                        let info = get_info(&mut ctx, task_id, None, &mut visited);
                        format!(
                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
                        )
                    }
                });
                drop(reader_task);
                drop(task);
                if !task_ids_to_schedule.is_empty() {
                    let mut queue = AggregationUpdateQueue::new();
                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
                    queue.execute(&mut ctx);
                }

                return Ok(Err(listener));
            }
        }

        if let Some(value) = check_in_progress(self, &task, reader, options.tracking, &ctx) {
            return value;
        }

        if let Some(output) = get!(task, Output) {
            let result = match output {
                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
                OutputValue::Error(error) => {
                    let err: anyhow::Error = error.clone().into();
                    Err(err.context(format!(
                        "Execution of {} failed",
                        ctx.get_task_description(task_id)
                    )))
                }
            };
            if let Some(mut reader_task) = reader_task
                && options.tracking.should_track(result.is_err())
                && (!task.is_immutable() || cfg!(feature = "verify_immutable"))
            {
                let reader = reader.unwrap();
                let _ = task.add(CachedDataItem::OutputDependent {
                    task: reader,
                    value: (),
                });
                drop(task);

                if reader_task
                    .remove(&CachedDataItemKey::OutdatedOutputDependency { target: task_id })
                    .is_none()
                {
                    let _ = reader_task.add(CachedDataItem::OutputDependency {
                        target: task_id,
                        value: (),
                    });
                }
            }

            return result;
        }
        drop(reader_task);

        let note = move || {
            let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
            move || {
                if let Some(reader_desc) = reader_desc.as_ref() {
                    format!("try_read_task_output (recompute) from {}", (reader_desc)())
                } else {
                    "try_read_task_output (recompute, untracked)".to_string()
                }
            }
        };

        let (item, listener) = CachedDataItem::new_scheduled_with_listener(
            TaskExecutionReason::OutputNotAvailable,
            || self.get_task_desc_fn(task_id),
            note,
        );
        task.add_new(item);
        ctx.schedule_task(task);

        Ok(Err(listener))
    }

    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
        ) {
            if let Some(mut reader_task) = reader_task
                && (!task.is_immutable() || cfg!(feature = "verify_immutable"))
            {
                let _ = task.add(CachedDataItem::CellDependent {
                    cell,
                    task: reader.unwrap(),
                    value: (),
                });
                drop(task);

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                if reader_task
                    .remove(&CachedDataItemKey::OutdatedCellDependency { target })
                    .is_none()
                {
                    let _ = reader_task.add(CachedDataItem::CellDependency { target, value: () });
                }
            }
        }

        let mut ctx = self.execute_context(turbo_tasks);
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(options.tracking, ReadTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::Data);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::Data), None)
        };

        let content = if options.final_read_hint {
            remove!(task, CellData { cell })
        } else if let Some(content) = get!(task, CellData { cell }) {
            let content = content.clone();
            Some(content)
        } else {
            None
        };
        if let Some(content) = content {
            if options.tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell);
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.reference)),
            )));
        }

        let in_progress = get!(task, InProgress);
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        let max_id = get!(
            task,
            CellTypeMaxIndex {
                cell_type: cell.type_id
            }
        )
        .copied();
        let Some(max_id) = max_id else {
            if options.tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell);
            }
            bail!(
                "Cell {cell:?} no longer exists in task {} (no cell of this type exists)",
                ctx.get_task_description(task_id)
            );
        };
        if cell.index >= max_id {
            if options.tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell);
            }
            bail!(
                "Cell {cell:?} no longer exists in task {} (index out of bounds)",
                ctx.get_task_description(task_id)
            );
        }
        drop(reader_task);

        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, reader, cell);
        if !new_listener {
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).global_name,
            cell_index = cell.index
        )
        .entered();

        if is_cancelled {
            bail!("{} was canceled", ctx.get_task_description(task_id));
        }
        task.add_new(CachedDataItem::new_scheduled(
            TaskExecutionReason::CellNotAvailable,
            || self.get_task_desc_fn(task_id),
        ));
        ctx.schedule_task(task);

        Ok(Err(listener))
    }

    fn listen_to_cell(
        &self,
        task: &mut impl TaskGuard,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
    ) -> (EventListener, bool) {
        let note = move || {
            let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
            move || {
                if let Some(reader_desc) = reader_desc.as_ref() {
                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
                } else {
                    "try_read_task_cell (in progress, untracked)".to_string()
                }
            }
        };
        if let Some(in_progress) = get!(task, InProgressCell { cell }) {
            let listener = in_progress.event.listen_with_note(note);
            return (listener, false);
        }
        let in_progress = InProgressCellState::new(task_id, cell);
        let listener = in_progress.event.listen_with_note(note);
        task.add_new(CachedDataItem::InProgressCell {
            cell,
            value: in_progress,
        });
        (listener, true)
    }

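    /// Resolves the [`CachedTaskType`] of a task id, first from the in-memory task
    /// cache and, for persistent tasks while the local cache may be partial, from
    /// the backing storage.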
    fn lookup_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
        if let Some(task_type) = self.task_cache.lookup_reverse(&task_id) {
            return Some(task_type);
        }
        if self.should_restore()
            && self.local_is_partial.load(Ordering::Acquire)
            && !task_id.is_transient()
            && let Some(task_type) = unsafe {
                self.backing_storage
                    .reverse_lookup_task_cache(None, task_id)
                    .expect("Failed to lookup task type")
            }
        {
            let _ = self.task_cache.try_insert(task_type.clone(), task_id);
            return Some(task_type);
        }
        None
    }

    fn get_task_desc_fn(&self, task_id: TaskId) -> impl Fn() -> String + Send + Sync + 'static {
        let task_type = self.lookup_task_type(task_id);
        move || {
            task_type.as_ref().map_or_else(
                || format!("{task_id:?} transient"),
                |task_type| format!("{task_id:?} {task_type}"),
            )
        }
    }

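    /// Takes a snapshot for persisting: waits for all in-progress operations to
    /// suspend, captures the modified task data and the task cache log, serializes
    /// it, and saves it to the backing storage. Returns the snapshot time and
    /// whether new items were persisted, or `None` if persisting failed.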
    fn snapshot(&self) -> Option<(Instant, bool)> {
        let start = Instant::now();
        debug_assert!(self.should_persist());
        let mut snapshot_request = self.snapshot_request.lock();
        snapshot_request.snapshot_requested = true;
        let active_operations = self
            .in_progress_operations
            .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        if active_operations != 0 {
            self.operations_suspended
                .wait_while(&mut snapshot_request, |_| {
                    self.in_progress_operations.load(Ordering::Relaxed) != SNAPSHOT_REQUESTED_BIT
                });
        }
        let suspended_operations = snapshot_request
            .suspended_operations
            .iter()
            .map(|op| op.arc().clone())
            .collect::<Vec<_>>();
        drop(snapshot_request);
        self.storage.start_snapshot();
        let mut persisted_task_cache_log = self
            .persisted_task_cache_log
            .as_ref()
            .map(|l| l.take(|i| i))
            .unwrap_or_default();
        let mut snapshot_request = self.snapshot_request.lock();
        snapshot_request.snapshot_requested = false;
        self.in_progress_operations
            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        self.snapshot_completed.notify_all();
        let snapshot_time = Instant::now();
        drop(snapshot_request);

        let preprocess = |task_id: TaskId, inner: &storage::InnerStorage| {
            if task_id.is_transient() {
                return (None, None);
            }
            let len = inner.len();

            let meta_restored = inner.state().meta_restored();
            let data_restored = inner.state().data_restored();

            let mut meta = meta_restored.then(|| Vec::with_capacity(len));
            let mut data = data_restored.then(|| Vec::with_capacity(len));
            for (key, value) in inner.iter_all() {
                if key.is_persistent() && value.is_persistent() {
                    match key.category() {
                        TaskDataCategory::Meta => {
                            if let Some(meta) = &mut meta {
                                meta.push(CachedDataItem::from_key_and_value_ref(key, value))
                            }
                        }
                        TaskDataCategory::Data => {
                            if let Some(data) = &mut data {
                                data.push(CachedDataItem::from_key_and_value_ref(key, value))
                            }
                        }
                        _ => {}
                    }
                }
            }

            (meta, data)
        };
        let process = |task_id: TaskId, (meta, data): (Option<Vec<_>>, Option<Vec<_>>)| {
            (
                task_id,
                meta.map(|d| self.backing_storage.serialize(task_id, &d)),
                data.map(|d| self.backing_storage.serialize(task_id, &d)),
            )
        };
        let process_snapshot = |task_id: TaskId, inner: Box<InnerStorageSnapshot>| {
            if task_id.is_transient() {
                return (task_id, None, None);
            }
            let len = inner.len();
            let mut meta = inner.meta_modified.then(|| Vec::with_capacity(len));
            let mut data = inner.data_modified.then(|| Vec::with_capacity(len));
            for (key, value) in inner.iter_all() {
                if key.is_persistent() && value.is_persistent() {
                    match key.category() {
                        TaskDataCategory::Meta => {
                            if let Some(meta) = &mut meta {
                                meta.push(CachedDataItem::from_key_and_value_ref(key, value));
                            }
                        }
                        TaskDataCategory::Data => {
                            if let Some(data) = &mut data {
                                data.push(CachedDataItem::from_key_and_value_ref(key, value));
                            }
                        }
                        _ => {}
                    }
                }
            }
            (
                task_id,
                meta.map(|meta| self.backing_storage.serialize(task_id, &meta)),
                data.map(|data| self.backing_storage.serialize(task_id, &data)),
            )
        };

        let snapshot = {
            let _span = tracing::trace_span!("take snapshot").entered();
            self.storage
                .take_snapshot(&preprocess, &process, &process_snapshot)
        };

        #[cfg(feature = "print_cache_item_size")]
        #[derive(Default)]
        struct TaskCacheStats {
            data: usize,
            data_count: usize,
            meta: usize,
            meta_count: usize,
        }
        #[cfg(feature = "print_cache_item_size")]
        impl TaskCacheStats {
            fn add_data(&mut self, len: usize) {
                self.data += len;
                self.data_count += 1;
            }

            fn add_meta(&mut self, len: usize) {
                self.meta += len;
                self.meta_count += 1;
            }
        }
        #[cfg(feature = "print_cache_item_size")]
        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
            Mutex::new(FxHashMap::default());

        let task_snapshots = snapshot
            .into_iter()
            .filter_map(|iter| {
                let mut iter = iter
                    .filter_map(
                        |(task_id, meta, data): (
                            _,
                            Option<Result<SmallVec<_>>>,
                            Option<Result<SmallVec<_>>>,
                        )| {
                            let meta = match meta {
                                Some(Ok(meta)) => {
                                    #[cfg(feature = "print_cache_item_size")]
                                    task_cache_stats
                                        .lock()
                                        .entry(self.get_task_description(task_id))
                                        .or_default()
                                        .add_meta(meta.len());
                                    Some(meta)
                                }
                                None => None,
                                Some(Err(err)) => {
                                    println!(
                                        "Serializing task {} failed (meta): {:?}",
                                        self.get_task_description(task_id),
                                        err
                                    );
                                    None
                                }
                            };
                            let data = match data {
                                Some(Ok(data)) => {
                                    #[cfg(feature = "print_cache_item_size")]
                                    task_cache_stats
                                        .lock()
                                        .entry(self.get_task_description(task_id))
                                        .or_default()
                                        .add_data(data.len());
                                    Some(data)
                                }
                                None => None,
                                Some(Err(err)) => {
                                    println!(
                                        "Serializing task {} failed (data): {:?}",
                                        self.get_task_description(task_id),
                                        err
                                    );
                                    None
                                }
                            };
                            (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
                        },
                    )
                    .peekable();
                iter.peek().is_some().then_some(iter)
            })
            .collect::<Vec<_>>();

        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());

        let mut new_items = false;

        if !persisted_task_cache_log.is_empty() || !task_snapshots.is_empty() {
            new_items = true;
            if let Err(err) = self.backing_storage.save_snapshot(
                self.session_id,
                suspended_operations,
                persisted_task_cache_log,
                task_snapshots,
            ) {
                println!("Persisting failed: {err:?}");
                return None;
            }
            #[cfg(feature = "print_cache_item_size")]
            {
                let mut task_cache_stats = task_cache_stats
                    .into_inner()
                    .into_iter()
                    .collect::<Vec<_>>();
                if !task_cache_stats.is_empty() {
                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
                        (stats_b.data + stats_b.meta, key_b)
                            .cmp(&(stats_a.data + stats_a.meta, key_a))
                    });
                    println!("Task cache stats:");
                    for (task_desc, stats) in task_cache_stats {
                        use std::ops::Div;

                        use turbo_tasks::util::FormatBytes;

                        println!(
                            " {} {task_desc} = {} meta ({} x {}), {} data ({} x {})",
                            FormatBytes(stats.data + stats.meta),
                            FormatBytes(stats.meta),
                            stats.meta_count,
                            FormatBytes(stats.meta.checked_div(stats.meta_count).unwrap_or(0)),
                            FormatBytes(stats.data),
                            stats.data_count,
                            FormatBytes(stats.data.checked_div(stats.data_count).unwrap_or(0)),
                        );
                    }
                }
            }
        }

        if new_items {
            let elapsed = start.elapsed();
            if elapsed > Duration::from_secs(10) {
                turbo_tasks().send_compilation_event(Arc::new(TimingEvent::new(
                    "Finished writing to filesystem cache".to_string(),
                    elapsed,
                )));
            }
        }

        Some((snapshot_time, new_items))
    }

    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        if self.should_restore() {
            let uncompleted_operations = self
                .backing_storage
                .uncompleted_operations()
                .expect("Failed to get uncompleted operations");
            if !uncompleted_operations.is_empty() {
                let mut ctx = self.execute_context(turbo_tasks);
                for op in uncompleted_operations {
                    op.execute(&mut ctx);
                }
            }
        }

        if self.should_persist() {
            let _span = Span::none().entered();
            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
        }
    }

    fn stopping(&self) {
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }

    #[allow(unused_variables)]
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        #[cfg(feature = "verify_aggregation_graph")]
        {
            self.is_idle.store(false, Ordering::Release);
            self.verify_aggregation_graph(turbo_tasks, false);
        }
        if self.should_persist() {
            let _span = tracing::info_span!("persist on stop").entered();
            self.snapshot();
        }
        self.task_cache.drop_contents();
        drop_contents(&self.transient_tasks);
        self.storage.drop_contents();
        if let Err(err) = self.backing_storage.shutdown() {
            println!("Shutting down failed: {err}");
        }
    }

    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                select! {
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                    }
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }

    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }

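    /// Looks up or creates the persistent task for a [`CachedTaskType`], checking
    /// the in-memory task cache first and then the backing storage. Ids that lose
    /// the insertion race are returned to the id factory for reuse. The task is
    /// connected to `parent_task` in all cases.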
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
            self.track_cache_hit(&task_type);
            self.connect_child(parent_task, task_id, turbo_tasks);
            return task_id;
        }

        let check_backing_storage =
            self.should_restore() && self.local_is_partial.load(Ordering::Acquire);
        let tx = check_backing_storage
            .then(|| self.backing_storage.start_read_transaction())
            .flatten();
        let task_id = {
            if let Some(task_id) = unsafe {
                check_backing_storage
                    .then(|| {
                        self.backing_storage
                            .forward_lookup_task_cache(tx.as_ref(), &task_type)
                            .expect("Failed to lookup task id")
                    })
                    .flatten()
            } {
                self.track_cache_hit(&task_type);
                let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
                task_id
            } else {
                let task_type = Arc::new(task_type);
                let task_id = self.persisted_task_id_factory.get();
                let task_id = if let Err(existing_task_id) =
                    self.task_cache.try_insert(task_type.clone(), task_id)
                {
                    self.track_cache_hit(&task_type);
                    unsafe {
                        self.persisted_task_id_factory.reuse(task_id);
                    }
                    existing_task_id
                } else {
                    self.track_cache_miss(&task_type);
                    task_id
                };
                if let Some(log) = &self.persisted_task_cache_log {
                    log.lock(task_id).push((task_type, task_id));
                }
                task_id
            }
        };

        unsafe { self.connect_child_with_tx(tx.as_ref(), parent_task, task_id, turbo_tasks) };

        task_id
    }

    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        if let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            self.panic_persistent_calling_transient(
                self.lookup_task_type(parent_task).as_deref(),
                Some(&task_type),
                None,
            );
        }
        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
            self.track_cache_hit(&task_type);
            self.connect_child(parent_task, task_id, turbo_tasks);
            return task_id;
        }

        let task_type = Arc::new(task_type);
        let task_id = self.transient_task_id_factory.get();
        if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) {
            self.track_cache_hit(&task_type);
            unsafe {
                self.transient_task_id_factory.reuse(task_id);
            }
            self.connect_child(parent_task, existing_task_id, turbo_tasks);
            return existing_task_id;
        }

        self.track_cache_miss(&task_type);
        self.connect_child(parent_task, task_id, turbo_tasks);

        task_id
    }

    fn debug_trace_transient_task(
        &self,
        task_type: &CachedTaskType,
        cell_id: Option<CellId>,
    ) -> DebugTraceTransientTask {
        fn inner_id(
            backend: &TurboTasksBackendInner<impl BackingStorage>,
            task_id: TaskId,
            cell_type_id: Option<ValueTypeId>,
            visited_set: &mut FxHashSet<TaskId>,
        ) -> DebugTraceTransientTask {
            if let Some(task_type) = backend.lookup_task_type(task_id) {
                if visited_set.contains(&task_id) {
                    let task_name = task_type.get_name();
                    DebugTraceTransientTask::Collapsed {
                        task_name,
                        cell_type_id,
                    }
                } else {
                    inner_cached(backend, &task_type, cell_type_id, visited_set)
                }
            } else {
                DebugTraceTransientTask::Uncached { cell_type_id }
            }
        }
        fn inner_cached(
            backend: &TurboTasksBackendInner<impl BackingStorage>,
            task_type: &CachedTaskType,
            cell_type_id: Option<ValueTypeId>,
            visited_set: &mut FxHashSet<TaskId>,
        ) -> DebugTraceTransientTask {
            let task_name = task_type.get_name();

            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
                    return None;
                };
                if task_id.is_transient() {
                    Some(Box::new(inner_id(
                        backend,
                        task_id,
                        cause_self_raw_vc.try_get_type_id(),
                        visited_set,
                    )))
                } else {
                    None
                }
            });
            let cause_args = task_type
                .arg
                .get_raw_vcs()
                .into_iter()
                .filter_map(|raw_vc| {
                    let Some(task_id) = raw_vc.try_get_task_id() else {
                        return None;
                    };
                    if !task_id.is_transient() {
                        return None;
                    }
                    Some((task_id, raw_vc.try_get_type_id()))
                })
                .collect::<IndexSet<_>>()
                .into_iter()
                .map(|(task_id, cell_type_id)| {
                    inner_id(backend, task_id, cell_type_id, visited_set)
                })
                .collect();

            DebugTraceTransientTask::Cached {
                task_name,
                cell_type_id,
                cause_self,
                cause_args,
            }
        }
        inner_cached(
            self,
            task_type,
            cell_id.map(|c| c.type_id),
            &mut FxHashSet::default(),
        )
    }

    fn invalidate_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            panic!("Dependency tracking is disabled so invalidation is not allowed");
        }
        operation::InvalidateOperation::run(
            smallvec![task_id],
            #[cfg(feature = "trace_task_dirty")]
            TaskDirtyCause::Invalidator,
            self.execute_context(turbo_tasks),
        );
    }

    fn invalidate_tasks(
        &self,
        tasks: &[TaskId],
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            panic!("Dependency tracking is disabled so invalidation is not allowed");
        }
        operation::InvalidateOperation::run(
            tasks.iter().copied().collect(),
            #[cfg(feature = "trace_task_dirty")]
            TaskDirtyCause::Unknown,
            self.execute_context(turbo_tasks),
        );
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            panic!("Dependency tracking is disabled so invalidation is not allowed");
        }
        operation::InvalidateOperation::run(
            tasks.iter().copied().collect(),
            #[cfg(feature = "trace_task_dirty")]
            TaskDirtyCause::Unknown,
            self.execute_context(turbo_tasks),
        );
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if task_id.is_transient() {
            return;
        }
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        task.invalidate_serialization();
    }

    fn get_task_description(&self, task_id: TaskId) -> String {
        self.lookup_task_type(task_id).map_or_else(
            || format!("{task_id:?} transient"),
            |task_type| task_type.to_string(),
        )
    }

    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        if let Some(in_progress) = remove!(task, InProgress) {
            match in_progress {
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        task.add_new(CachedDataItem::InProgress {
            value: InProgressState::Canceled,
        });
    }

    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        enum TaskType {
            Cached(Arc<CachedTaskType>),
            Transient(Arc<TransientTask>),
        }
        let (task_type, once_task) = if let Some(task_type) = self.lookup_task_type(task_id) {
            (TaskType::Cached(task_type), false)
        } else if let Some(task_type) = self.transient_tasks.get(&task_id) {
            (
                TaskType::Transient(task_type.clone()),
                matches!(**task_type, TransientTask::Once(_)),
            )
        } else {
            return None;
        };
        let execution_reason;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            let in_progress = remove!(task, InProgress)?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                task.add_new(CachedDataItem::InProgress { value: in_progress });
                return None;
            };
            execution_reason = reason;
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::InProgress(Box::new(InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    new_children: Default::default(),
                })),
            });

            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            let collectibles = iter_many!(task, Collectible { collectible } value => Collectible::Current(collectible, *value))
                .chain(iter_many!(task, OutdatedCollectible { collectible } => Collectible::Outdated(collectible)))
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ =
                            task.insert(CachedDataItem::OutdatedCollectible { collectible, value });
                    }
                    Collectible::Outdated(collectible) => {
                        if !task.has_key(&CachedDataItemKey::Collectible { collectible }) {
                            task.remove(&CachedDataItemKey::OutdatedCollectible { collectible });
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                enum Dep {
                    CurrentCell(CellRef),
                    CurrentOutput(TaskId),
                    OutdatedCell(CellRef),
                    OutdatedOutput(TaskId),
                }
                let dependencies = iter_many!(task, CellDependency { target } => Dep::CurrentCell(target))
                    .chain(iter_many!(task, OutputDependency { target } => Dep::CurrentOutput(target)))
                    .chain(iter_many!(task, OutdatedCellDependency { target } => Dep::OutdatedCell(target)))
                    .chain(iter_many!(task, OutdatedOutputDependency { target } => Dep::OutdatedOutput(target)))
                    .collect::<Vec<_>>();
                for dep in dependencies {
                    match dep {
                        Dep::CurrentCell(cell) => {
                            let _ = task.add(CachedDataItem::OutdatedCellDependency {
                                target: cell,
                                value: (),
                            });
                        }
                        Dep::CurrentOutput(output) => {
                            let _ = task.add(CachedDataItem::OutdatedOutputDependency {
                                target: output,
                                value: (),
                            });
                        }
                        Dep::OutdatedCell(cell) => {
                            if !task.has_key(&CachedDataItemKey::CellDependency { target: cell }) {
                                task.remove(&CachedDataItemKey::OutdatedCellDependency {
                                    target: cell,
                                });
                            }
                        }
                        Dep::OutdatedOutput(output) => {
                            if !task
                                .has_key(&CachedDataItemKey::OutputDependency { target: output })
                            {
                                task.remove(&CachedDataItemKey::OutdatedOutputDependency {
                                    target: output,
                                });
                            }
                        }
                    }
                }
            }
        }

        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }

    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        let span = tracing::trace_span!("task execution completed", immutable = Empty).entered();
        let mut ctx = self.execute_context(turbo_tasks);

        let Some(TaskExecutionCompletePrepareResult {
            new_children,
            mut removed_data,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        }) = self.task_execution_completed_prepare(
            &mut ctx,
            &span,
            task_id,
            result,
            cell_counters,
            stateful,
            has_invalidator,
        )
        else {
            return true;
        };

        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();

        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        if has_new_children
            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            return true;
        }

        if self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            &mut removed_data,
            is_now_immutable,
        ) {
            return true;
        }

        drop(removed_data);

        self.task_execution_completed_cleanup(&mut ctx, task_id);

        false
    }

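    /// First step of task completion: reschedules stale executions, updates cell
    /// counters, collects outdated edges, determines whether the task has become
    /// immutable, and computes the new output value together with the tasks that
    /// depend on it.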
1836 fn task_execution_completed_prepare(
1837 &self,
1838 ctx: &mut impl ExecuteContext<'_>,
1839 span: &Span,
1840 task_id: TaskId,
1841 result: Result<RawVc, TurboTasksExecutionError>,
1842 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1843 stateful: bool,
1844 has_invalidator: bool,
1845 ) -> Option<TaskExecutionCompletePrepareResult> {
1846 let mut task = ctx.task(task_id, TaskDataCategory::All);
1847 let Some(in_progress) = get_mut!(task, InProgress) else {
1848 panic!("Task execution completed, but task is not in progress: {task:#?}");
1849 };
1850 if matches!(in_progress, InProgressState::Canceled) {
1851 return Some(TaskExecutionCompletePrepareResult {
1852 new_children: Default::default(),
1853 removed_data: Default::default(),
1854 is_now_immutable: false,
1855 #[cfg(feature = "verify_determinism")]
1856 no_output_set: false,
1857 new_output: None,
1858 output_dependent_tasks: Default::default(),
1859 });
1860 }
1861 let &mut InProgressState::InProgress(box InProgressStateInner {
1862 stale,
1863 ref mut new_children,
1864 session_dependent,
1865 ..
1866 }) = in_progress
1867 else {
1868 panic!("Task execution completed, but task is not in progress: {task:#?}");
1869 };
1870
1871 #[cfg(not(feature = "no_fast_stale"))]
1873 if stale {
1874 let Some(InProgressState::InProgress(box InProgressStateInner {
1875 done_event,
1876 mut new_children,
1877 ..
1878 })) = remove!(task, InProgress)
1879 else {
1880 unreachable!();
1881 };
1882 task.add_new(CachedDataItem::InProgress {
1883 value: InProgressState::Scheduled {
1884 done_event,
1885 reason: TaskExecutionReason::Stale,
1886 },
1887 });
1888 for task in iter_many!(task, Child { task } => task) {
1891 new_children.remove(&task);
1892 }
1893 drop(task);
1894
1895 AggregationUpdateQueue::run(
1898 AggregationUpdateJob::DecreaseActiveCounts {
1899 task_ids: new_children.into_iter().collect(),
1900 },
1901 ctx,
1902 );
1903 return None;
1904 }
1905
1906 let mut new_children = take(new_children);
1908
1909 if stateful {
1911 let _ = task.add(CachedDataItem::Stateful { value: () });
1912 }
1913
1914 if has_invalidator {
1916 let _ = task.add(CachedDataItem::HasInvalidator { value: () });
1917 }
1918
1919 let old_counters: FxHashMap<_, _> =
1921 get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, *max_index));
1922 let mut counters_to_remove = old_counters.clone();
1923 for (&cell_type, &max_index) in cell_counters.iter() {
1924 if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
1925 if old_max_index != max_index {
1926 task.insert(CachedDataItem::CellTypeMaxIndex {
1927 cell_type,
1928 value: max_index,
1929 });
1930 }
1931 } else {
1932 task.add_new(CachedDataItem::CellTypeMaxIndex {
1933 cell_type,
1934 value: max_index,
1935 });
1936 }
1937 }
1938 for (cell_type, _) in counters_to_remove {
1939 task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type });
1940 }
1941
1942 let mut queue = AggregationUpdateQueue::new();
1943
1944 let mut removed_data = Vec::new();
1945 let mut old_edges = Vec::new();
1946
1947 let has_children = !new_children.is_empty();
1948 let is_immutable = task.is_immutable();
1949 let task_dependencies_for_immutable =
1950 if !is_immutable
1952 && !session_dependent
1954 && !task.has_key(&CachedDataItemKey::HasInvalidator {})
1956 && count!(task, CollectiblesDependency) == 0
1958 {
1959 Some(
1960 iter_many!(task, OutputDependency { target } => target)
1962 .chain(iter_many!(task, CellDependency { target } => target.task))
1963 .collect::<FxHashSet<_>>(),
1964 )
1965 } else {
1966 None
1967 };
1968
1969 if has_children {
1970 prepare_new_children(task_id, &mut task, &new_children, &mut queue);
1972
1973 old_edges.extend(
1975 iter_many!(task, Child { task } => task)
1976 .filter(|task| !new_children.remove(task))
1977 .map(OutdatedEdge::Child),
1978 );
1979 } else {
1980 old_edges.extend(iter_many!(task, Child { task } => task).map(OutdatedEdge::Child));
1981 }
1982
1983 if task_id.is_transient() || iter_many!(task, CellData { cell }
1988 if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index) => cell
1989 ).count() > 0 {
1990 removed_data.extend(task.extract_if(CachedDataItemType::CellData, |key, _| {
1991 matches!(key, CachedDataItemKey::CellData { cell } if cell_counters
1992 .get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
1993 }));
1994 }
1995
1996 old_edges.extend(
1997 task.iter(CachedDataItemType::OutdatedCollectible)
1998 .filter_map(|(key, value)| match (key, value) {
1999 (
2000 CachedDataItemKey::OutdatedCollectible { collectible },
2001 CachedDataItemValueRef::OutdatedCollectible { value },
2002 ) => Some(OutdatedEdge::Collectible(collectible, *value)),
2003 _ => None,
2004 }),
2005 );
2006
2007 if self.should_track_dependencies() {
2008 old_edges.extend(iter_many!(task, OutdatedCellDependency { target } => OutdatedEdge::CellDependency(target)));
2009 old_edges.extend(iter_many!(task, OutdatedOutputDependency { target } => OutdatedEdge::OutputDependency(target)));
2010 old_edges.extend(
2011 iter_many!(task, CellDependent { cell, task } => (cell, task)).filter_map(
2012 |(cell, task)| {
2013 if cell_counters
2014 .get(&cell.type_id)
2015 .is_none_or(|start_index| cell.index >= *start_index)
2016 && let Some(old_counter) = old_counters.get(&cell.type_id)
2017 && cell.index < *old_counter
2018 {
2019 return Some(OutdatedEdge::RemovedCellDependent {
2020 task_id: task,
2021 #[cfg(feature = "trace_task_dirty")]
2022 value_type_id: cell.type_id,
2023 });
2024 }
2025 None
2026 },
2027 ),
2028 );
2029 }
2030
        let current_output = get!(task, Output);
        #[cfg(feature = "verify_determinism")]
        let no_output_set = current_output.is_none();
        let new_output = match result {
            Ok(RawVc::TaskOutput(output_task_id)) => {
                if let Some(OutputValue::Output(current_task_id)) = current_output
                    && *current_task_id == output_task_id
                {
                    None
                } else {
                    Some(OutputValue::Output(output_task_id))
                }
            }
            Ok(RawVc::TaskCell(output_task_id, cell)) => {
                if let Some(OutputValue::Cell(CellRef {
                    task: current_task_id,
                    cell: current_cell,
                })) = current_output
                    && *current_task_id == output_task_id
                    && *current_cell == cell
                {
                    None
                } else {
                    Some(OutputValue::Cell(CellRef {
                        task: output_task_id,
                        cell,
                    }))
                }
            }
            Ok(RawVc::LocalOutput(..)) => {
                panic!("Non-local tasks must not return a local Vc");
            }
            Err(err) => {
                if let Some(OutputValue::Error(old_error)) = current_output
                    && old_error == &err
                {
                    None
                } else {
                    Some(OutputValue::Error(err))
                }
            }
        };
        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
        if new_output.is_some() && ctx.should_track_dependencies() {
            output_dependent_tasks = get_many!(task, OutputDependent { task } => task);
        }

        drop(task);

        let mut is_now_immutable = false;
        if let Some(dependencies) = task_dependencies_for_immutable
            && dependencies
                .iter()
                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).is_immutable())
        {
            is_now_immutable = true;
        }
        span.record("immutable", is_immutable || is_now_immutable);

        if !queue.is_empty() || !old_edges.is_empty() {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
        }

        Some(TaskExecutionCompletePrepareResult {
            new_children,
            removed_data,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        })
    }

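    /// Makes all tasks that depend on this task's output dirty again. Dependents that
    /// have already marked the dependency as outdated (or that no longer have it) are
    /// skipped, as are once tasks.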
    fn task_execution_completed_invalidate_output_dependent(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        output_dependent_tasks: SmallVec<[TaskId; 4]>,
    ) {
        debug_assert!(!output_dependent_tasks.is_empty());

        let mut queue = AggregationUpdateQueue::new();
        for dependent_task_id in output_dependent_tasks {
            if ctx.is_once_task(dependent_task_id) {
                continue;
            }
            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
            if dependent.has_key(&CachedDataItemKey::OutdatedOutputDependency { target: task_id }) {
                continue;
            }
            if !dependent.has_key(&CachedDataItemKey::OutputDependency { target: task_id }) {
                continue;
            }
            make_task_dirty_internal(
                dependent,
                dependent_task_id,
                true,
                #[cfg(feature = "trace_task_dirty")]
                TaskDirtyCause::OutputChange { task_id },
                &mut queue,
                ctx,
            );
        }

        queue.execute(ctx);
    }

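    /// Marks all new children that have not produced an output yet as dirty, so they
    /// are scheduled for execution.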
    fn task_execution_completed_unfinished_children_dirty(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        new_children: &FxHashSet<TaskId>,
    ) {
        debug_assert!(!new_children.is_empty());

        let mut queue = AggregationUpdateQueue::new();
        for &child_id in new_children {
            let child_task = ctx.task(child_id, TaskDataCategory::Meta);
            if !child_task.has_key(&CachedDataItemKey::Output {}) {
                make_task_dirty_internal(
                    child_task,
                    child_id,
                    false,
                    #[cfg(feature = "trace_task_dirty")]
                    TaskDirtyCause::InitialDirty,
                    &mut queue,
                    ctx,
                );
            }
        }

        queue.execute(ctx);
    }

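    /// Connects the new children of the task in the aggregation graph. Returns `true`
    /// when the task became stale during execution and was rescheduled instead.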
    fn task_execution_completed_connect(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        new_children: FxHashSet<TaskId>,
    ) -> bool {
        debug_assert!(!new_children.is_empty());

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = get!(task, InProgress) else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            #[cfg(not(feature = "no_fast_stale"))]
            stale,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        #[cfg(not(feature = "no_fast_stale"))]
        if *stale {
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                remove!(task, InProgress)
            else {
                unreachable!();
            };
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled {
                    done_event,
                    reason: TaskExecutionReason::Stale,
                },
            });
            drop(task);

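            // The children were already made active when they were scheduled during
            // this (now stale) execution; undo that, since the rerun will connect its
            // own set of children.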
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return true;
        }

        let has_active_count = ctx.should_track_activeness()
            && get!(task, Activeness).is_some_and(|activeness| activeness.active_counter > 0);
        connect_children(
            ctx,
            task_id,
            task,
            new_children,
            has_active_count,
            ctx.should_track_activeness(),
        );

        false
    }

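    /// Final phase of task completion: stores the new output, updates the dirty state,
    /// notifies waiters, and propagates the changes through the aggregation graph.
    /// Returns `true` when the task has to be rescheduled.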
    fn task_execution_completed_finish(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "verify_determinism")] no_output_set: bool,
        new_output: Option<OutputValue>,
        removed_data: &mut Vec<CachedDataItem>,
        is_now_immutable: bool,
    ) -> bool {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = remove!(task, InProgress) else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: _,
            stale,
            session_dependent,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        debug_assert!(new_children.is_empty());

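        // The task was invalidated while it was executing; the result is already
        // outdated, so reschedule it instead of finishing.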
        if stale {
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled {
                    done_event,
                    reason: TaskExecutionReason::Stale,
                },
            });
            return true;
        }

        let mut old_content = None;
        if let Some(value) = new_output {
            old_content = task.insert(CachedDataItem::Output { value });
        }

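        // All dependencies turned out to be immutable as well, so this task can never
        // be invalidated again and is marked as immutable.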
        if is_now_immutable {
            let _ = task.add(CachedDataItem::Immutable { value: () });
        }

        removed_data.extend(task.extract_if(
            CachedDataItemType::InProgressCell,
            |key, value| match (key, value) {
                (
                    CachedDataItemKey::InProgressCell { .. },
                    CachedDataItemValueRef::InProgressCell { value },
                ) => {
                    value.event.notify(usize::MAX);
                    true
                }
                _ => false,
            },
        ));

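        // Update the dirty state: a session-dependent task is only clean within the
        // current session, while every other task becomes fully clean.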
        let old_dirty_state = get!(task, Dirty).copied();

        let new_dirty_state = if session_dependent {
            Some(DirtyState {
                clean_in_session: Some(self.session_id),
            })
        } else {
            None
        };

        let dirty_changed = old_dirty_state != new_dirty_state;
        let data_update = if dirty_changed {
            if let Some(new_dirty_state) = new_dirty_state {
                task.insert(CachedDataItem::Dirty {
                    value: new_dirty_state,
                });
            } else {
                task.remove(&CachedDataItemKey::Dirty {});
            }

            if old_dirty_state.is_some() || new_dirty_state.is_some() {
                let mut dirty_containers = get!(task, AggregatedDirtyContainerCount)
                    .cloned()
                    .unwrap_or_default();
                if let Some(old_dirty_state) = old_dirty_state {
                    dirty_containers.update_with_dirty_state(&old_dirty_state);
                }
                let aggregated_update = match (old_dirty_state, new_dirty_state) {
                    (None, None) => unreachable!(),
                    (Some(old), None) => dirty_containers.undo_update_with_dirty_state(&old),
                    (None, Some(new)) => dirty_containers.update_with_dirty_state(&new),
                    (Some(old), Some(new)) => dirty_containers.replace_dirty_state(&old, &new),
                };
                if !aggregated_update.is_zero() {
                    if aggregated_update.get(self.session_id) < 0
                        && let Some(activeness_state) = get_mut!(task, Activeness)
                    {
                        activeness_state.all_clean_event.notify(usize::MAX);
                        activeness_state.unset_active_until_clean();
                        if activeness_state.is_empty() {
                            task.remove(&CachedDataItemKey::Activeness {});
                        }
                    }
                    AggregationUpdateJob::data_update(
                        &mut task,
                        AggregatedDataUpdate::new()
                            .dirty_container_update(task_id, aggregated_update),
                    )
                } else {
                    None
                }
            } else {
                None
            }
        } else {
            None
        };

        #[cfg(feature = "verify_determinism")]
        let reschedule = (dirty_changed || no_output_set) && !task_id.is_transient();
        #[cfg(not(feature = "verify_determinism"))]
        let reschedule = false;
        if reschedule {
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::Scheduled {
                    done_event,
                    reason: TaskExecutionReason::Stale,
                },
            });
            drop(task);
        } else {
            drop(task);

            done_event.notify(usize::MAX);
        }

        drop(old_content);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, ctx);
        }

        reschedule
    }

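    /// Shrinks over-allocated per-task collections after execution to reduce memory
    /// usage.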
    fn task_execution_completed_cleanup(&self, ctx: &mut impl ExecuteContext<'_>, task_id: TaskId) {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        task.shrink_to_fit(CachedDataItemType::CellData);
        task.shrink_to_fit(CachedDataItemType::CellTypeMaxIndex);
        task.shrink_to_fit(CachedDataItemType::CellDependency);
        task.shrink_to_fit(CachedDataItemType::OutputDependency);
        task.shrink_to_fit(CachedDataItemType::CollectiblesDependency);
        drop(task);
    }

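    /// Runs a background job. Snapshot jobs loop until the backend is stopping: they
    /// wait for the snapshot interval (or an idle timeout), persist a snapshot, and
    /// reschedule themselves as follow-up jobs. Prefetch jobs warm the task storage
    /// for a set of tasks, splitting large sets into chunked sub-jobs.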
    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
                    loop {
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        const IDLE_TIMEOUT: Duration = Duration::from_secs(2);

                        let (time, mut reason) =
                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                            } else {
                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
                            };

                        let until = last_snapshot + time;
                        if until > Instant::now() {
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            let mut idle_start_listener = self.idle_start_event.listen();
                            let mut idle_end_listener = self.idle_end_event.listen();
                            let mut idle_time = if turbo_tasks.is_idle() {
                                Instant::now() + IDLE_TIMEOUT
                            } else {
                                far_future()
                            };
                            loop {
                                tokio::select! {
                                    _ = &mut stop_listener => {
                                        return;
                                    },
                                    _ = &mut idle_start_listener => {
                                        idle_time = Instant::now() + IDLE_TIMEOUT;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    _ = &mut idle_end_listener => {
                                        idle_time = until + IDLE_TIMEOUT;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        if turbo_tasks.is_idle() {
                                            reason = "idle timeout";
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let _span = info_span!("persist", reason = reason).entered();
                        let this = self.clone();
                        let snapshot = this.snapshot();
                        if let Some((snapshot_start, new_data)) = snapshot {
                            last_snapshot = snapshot_start;
                            if new_data {
                                continue;
                            }
                            let last_snapshot = last_snapshot.duration_since(self.start_time);
                            self.last_snapshot.store(
                                last_snapshot.as_millis().try_into().unwrap(),
                                Ordering::Relaxed,
                            );

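                            // Detach from the current "persist" span so the follow-up
                            // job doesn't accumulate nested spans.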
                            let _span = Span::none().entered();
                            turbo_tasks.schedule_backend_background_job(
                                TurboTasksBackendJob::FollowUpSnapshot,
                            );
                            return;
                        }
                    }
                }
                TurboTasksBackendJob::Prefetch { data, range } => {
                    let range = if let Some(range) = range {
                        range
                    } else {
                        if data.len() > 128 {
                            let chunk_size = good_chunk_size(data.len());
                            let chunks = data.len().div_ceil(chunk_size);
                            for i in 0..chunks {
                                turbo_tasks.schedule_backend_background_job(
                                    TurboTasksBackendJob::Prefetch {
                                        data: data.clone(),
                                        range: Some(
                                            (i * chunk_size)..min(data.len(), (i + 1) * chunk_size),
                                        ),
                                    },
                                );
                            }
                            return;
                        }
                        0..data.len()
                    };

                    let _span = trace_span!("prefetching").entered();
                    let mut ctx = self.execute_context(turbo_tasks);
                    for i in range {
                        let (&task, &with_data) = data.get_index(i).unwrap();
                        let category = if with_data {
                            TaskDataCategory::All
                        } else {
                            TaskDataCategory::Meta
                        };
                        drop(ctx.task(task, category));
                    }
                }
            }
        })
    }

    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        _options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<TypedCellContent> {
        let mut ctx = self.execute_context(turbo_tasks);
        let task = ctx.task(task_id, TaskDataCategory::Data);
        if let Some(content) = get!(task, CellData { cell }) {
            Ok(CellContent(Some(content.reference.clone())).into_typed(cell.type_id))
        } else {
            Ok(CellContent(None).into_typed(cell.type_id))
        }
    }

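    /// Reads all collectibles of the given trait type that are reachable from
    /// `task_id`. The task is first promoted to a root aggregation node so the
    /// aggregated collectibles of its whole subgraph are available; local collectibles
    /// are added on top of that. When a reader is given, dependencies are recorded in
    /// both directions so the reader is invalidated when the collectibles change.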
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            for collectible in iter_many!(
                task,
                AggregatedCollectible {
                    collectible
                } count if collectible.collectible_type == collectible_type && *count > 0 => {
                    collectible.cell
                }
            ) {
                *collectibles
                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
                    .or_insert(0) += 1;
            }
            for (collectible, count) in iter_many!(
                task,
                Collectible {
                    collectible
                } count if collectible.collectible_type == collectible_type => {
                    (collectible.cell, *count)
                }
            ) {
                *collectibles
                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
                    .or_insert(0) += count;
            }
            if let Some(reader_id) = reader_id {
                let _ = task.add(CachedDataItem::CollectiblesDependent {
                    collectible_type,
                    task: reader_id,
                    value: (),
                });
            }
        }
        if let Some(reader_id) = reader_id {
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            if reader
                .remove(&CachedDataItemKey::OutdatedCollectiblesDependency { target })
                .is_none()
            {
                let _ = reader.add(CachedDataItem::CollectiblesDependency { target, value: () });
            }
        }
        collectibles
    }

    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_valid_collectible(task_id, collectible);

        let RawVc::TaskCell(collectible_task, cell) = collectible else {
            panic!("Collectibles need to be resolved");
        };
        let cell = CellRef {
            task: collectible_task,
            cell,
        };
        operation::UpdateCollectibleOperation::run(
            task_id,
            CollectibleRef {
                collectible_type,
                cell,
            },
            1,
            self.execute_context(turbo_tasks),
        );
    }

    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_valid_collectible(task_id, collectible);

        let RawVc::TaskCell(collectible_task, cell) = collectible else {
            panic!("Collectibles need to be resolved");
        };
        let cell = CellRef {
            task: collectible_task,
            cell,
        };
        operation::UpdateCollectibleOperation::run(
            task_id,
            CollectibleRef {
                collectible_type,
                cell,
            },
            -(i32::try_from(count).unwrap()),
            self.execute_context(turbo_tasks),
        );
    }

    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        content: CellContent,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateCellOperation::run(
            task_id,
            cell,
            content,
            verification_mode,
            self.execute_context(turbo_tasks),
        );
    }

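    /// Marks the currently executing task as session dependent: its result is only
    /// valid within the current session. The task's aggregation number is raised to at
    /// least `u32::MAX >> 2` first.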
    fn mark_own_task_as_session_dependent(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            return;
        }
        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
        let aggregation_number = get_aggregation_number(&task);
        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
            drop(task);
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                    distance: None,
                },
                &mut ctx,
            );
            task = ctx.task(task_id, TaskDataCategory::Meta);
        }
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            session_dependent,
            ..
        })) = get_mut!(task, InProgress)
        {
            *session_dependent = true;
        }
    }

    fn mark_own_task_as_finished(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task, TaskDataCategory::Data);
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            marked_as_completed,
            ..
        })) = get_mut!(task, InProgress)
        {
            *marked_as_completed = true;
        }
    }

    fn set_own_task_aggregation_number(
        &self,
        task: TaskId,
        aggregation_number: u32,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        AggregationUpdateQueue::run(
            AggregationUpdateJob::UpdateAggregationNumber {
                task_id: task,
                base_aggregation_number: aggregation_number,
                distance: None,
            },
            &mut ctx,
        );
    }

    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_not_persistent_calling_transient(parent_task, task, None);
        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
    }

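    /// Creates a new transient root or once task. The task receives the maximum
    /// aggregation number (making it an aggregation root) and is scheduled
    /// immediately.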
    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
        let task_id = self.transient_task_id_factory.get();
        let root_type = match task_type {
            TransientTaskType::Root(_) => RootType::RootTask,
            TransientTaskType::Once(_) => RootType::OnceTask,
        };
        self.transient_tasks.insert(
            task_id,
            Arc::new(match task_type {
                TransientTaskType::Root(f) => TransientTask::Root(f),
                TransientTaskType::Once(f) => TransientTask::Once(Mutex::new(Some(f))),
            }),
        );
        {
            let mut task = self.storage.access_mut(task_id);
            task.add(CachedDataItem::AggregationNumber {
                value: AggregationNumber {
                    base: u32::MAX,
                    distance: 0,
                    effective: u32::MAX,
                },
            });
            if self.should_track_activeness() {
                task.add(CachedDataItem::Activeness {
                    value: ActivenessState::new_root(root_type, task_id),
                });
            }
            task.add(CachedDataItem::new_scheduled(
                TaskExecutionReason::Initial,
                move || match root_type {
                    RootType::RootTask => "Root Task".to_string(),
                    RootType::OnceTask => "Once Task".to_string(),
                },
            ));
        }
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().insert(task_id);
        task_id
    }

    fn dispose_root_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().remove(&task_id);

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_dirty = get!(task, Dirty).is_some_and(|dirty| dirty.get(self.session_id));
        let has_dirty_containers = get!(task, AggregatedDirtyContainerCount)
            .is_some_and(|dirty_containers| dirty_containers.get(self.session_id) > 0);
        if is_dirty || has_dirty_containers {
            if let Some(activeness_state) = get_mut!(task, Activeness) {
                activeness_state.unset_root_type();
                activeness_state.set_active_until_clean();
            }
        } else if let Some(activeness_state) = remove!(task, Activeness) {
            activeness_state.all_clean_event.notify(usize::MAX);
        }
    }

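    /// Debug helper (`verify_aggregation_graph` feature): walks the task graph from
    /// every root task and checks aggregation invariants, e.g. that every reachable
    /// task reports to an aggregated upper task and that dirty states and collectibles
    /// are propagated upwards correctly. Can be disabled by setting
    /// `TURBO_ENGINE_VERIFY_GRAPH=0`.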
    #[cfg(feature = "verify_aggregation_graph")]
    fn verify_aggregation_graph(
        &self,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
        idle: bool,
    ) {
        use std::{collections::VecDeque, env, io::stdout};

        use crate::backend::operation::{get_uppers, is_aggregating_node};

        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
            return;
        }

        let mut ctx = self.execute_context(turbo_tasks);
        let root_tasks = self.root_tasks.lock().clone();

        for task_id in root_tasks.into_iter() {
            let mut queue = VecDeque::new();
            let mut visited = FxHashSet::default();
            let mut aggregated_nodes = FxHashSet::default();
            let mut collectibles = FxHashMap::default();
            let root_task_id = task_id;
            visited.insert(task_id);
            aggregated_nodes.insert(task_id);
            queue.push_back(task_id);
            let mut counter = 0;
            while let Some(task_id) = queue.pop_front() {
                counter += 1;
                if counter % 100000 == 0 {
                    println!(
                        "queue={}, visited={}, aggregated_nodes={}",
                        queue.len(),
                        visited.len(),
                        aggregated_nodes.len()
                    );
                }
                let task = ctx.task(task_id, TaskDataCategory::All);
                if idle && !self.is_idle.load(Ordering::Relaxed) {
                    return;
                }

                let uppers = get_uppers(&task);
                if task_id != root_task_id
                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
                {
                    panic!(
                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
                         {:?})",
                        task_id,
                        ctx.get_task_description(task_id),
                        uppers
                    );
                }

                let aggregated_collectibles: Vec<_> = get_many!(task, AggregatedCollectible { collectible } value if *value > 0 => {collectible});
                for collectible in aggregated_collectibles {
                    collectibles
                        .entry(collectible)
                        .or_insert_with(|| (false, Vec::new()))
                        .1
                        .push(task_id);
                }

                let own_collectibles: Vec<_> = get_many!(task, Collectible { collectible } value if *value > 0 => {collectible});
                for collectible in own_collectibles {
                    if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                        *flag = true
                    } else {
                        panic!(
                            "Task {} has a collectible {:?} that is not in any upper task",
                            task_id, collectible
                        );
                    }
                }

                let is_dirty = get!(task, Dirty).is_some_and(|dirty| dirty.get(self.session_id));
                let has_dirty_container = get!(task, AggregatedDirtyContainerCount)
                    .is_some_and(|count| count.get(self.session_id) > 0);
                let should_be_in_upper = is_dirty || has_dirty_container;

                let aggregation_number = get_aggregation_number(&task);
                if is_aggregating_node(aggregation_number) {
                    aggregated_nodes.insert(task_id);
                }
                for child_id in iter_many!(task, Child { task } => task) {
                    if visited.insert(child_id) {
                        queue.push_back(child_id);
                    }
                }
                drop(task);

                if should_be_in_upper {
                    for upper_id in uppers {
                        let task = ctx.task(upper_id, TaskDataCategory::All);
                        let in_upper = get!(task, AggregatedDirtyContainer { task: task_id })
                            .is_some_and(|dirty| dirty.get(self.session_id) > 0);
                        if !in_upper {
                            panic!(
                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
                                 ({})",
                                task_id,
                                ctx.get_task_description(task_id),
                                upper_id,
                                ctx.get_task_description(upper_id)
                            );
                        }
                    }
                }
            }

            for (collectible, (flag, task_ids)) in collectibles {
                if !flag {
                    use std::io::Write;
                    let mut stdout = stdout().lock();
                    writeln!(
                        stdout,
                        "Collectible {:?} is not emitted in any child task but is in these \
                         aggregated tasks: {:#?}",
                        collectible,
                        task_ids
                            .iter()
                            .map(|t| format!("{t} {}", ctx.get_task_description(*t)))
                            .collect::<Vec<_>>()
                    )
                    .unwrap();

                    let task_id = collectible.cell.task;
                    let mut queue = {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        get_uppers(&task)
                    };
                    let mut visited = FxHashSet::default();
                    for &upper_id in queue.iter() {
                        visited.insert(upper_id);
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                    }
                    while let Some(task_id) = queue.pop() {
                        let desc = ctx.get_task_description(task_id);
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        let aggregated_collectible =
                            get!(task, AggregatedCollectible { collectible })
                                .copied()
                                .unwrap_or_default();
                        let uppers = get_uppers(&task);
                        drop(task);
                        writeln!(
                            stdout,
                            "upper {task_id} {desc} collectible={aggregated_collectible}"
                        )
                        .unwrap();
                        if task_ids.contains(&task_id) {
                            writeln!(
                                stdout,
                                "Task has an upper connection to an aggregated task that doesn't \
                                 reference it. Upper connection is invalid!"
                            )
                            .unwrap();
                        }
                        for upper_id in uppers {
                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                            if !visited.contains(&upper_id) {
                                queue.push(upper_id);
                            }
                        }
                    }
                    panic!("See stdout for more details");
                }
            }
        }
    }

    fn assert_not_persistent_calling_transient(
        &self,
        parent_id: Option<TaskId>,
        child_id: TaskId,
        cell_id: Option<CellId>,
    ) {
        if !parent_id.is_none_or(|id| id.is_transient()) && child_id.is_transient() {
            self.panic_persistent_calling_transient(
                parent_id
                    .and_then(|id| self.lookup_task_type(id))
                    .as_deref(),
                self.lookup_task_type(child_id).as_deref(),
                cell_id,
            );
        }
    }

    fn panic_persistent_calling_transient(
        &self,
        parent: Option<&CachedTaskType>,
        child: Option<&CachedTaskType>,
        cell_id: Option<CellId>,
    ) {
        let transient_reason = if let Some(child) = child {
            Cow::Owned(format!(
                " The callee is transient because it depends on:\n{}",
                self.debug_trace_transient_task(child, cell_id),
            ))
        } else {
            Cow::Borrowed("")
        };
        panic!(
            "Persistent task {} is not allowed to call, read, or connect to the transient task \
             {}.{}",
            parent.map_or("unknown", |t| t.get_name()),
            child.map_or("unknown", |t| t.get_name()),
            transient_reason,
        );
    }

    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
            let task_info = if let Some(col_task_ty) = collectible
                .try_get_task_id()
                .and_then(|t| self.lookup_task_type(t))
            {
                Cow::Owned(format!(" (return type of {col_task_ty})"))
            } else {
                Cow::Borrowed("")
            };
            panic!("Collectible{task_info} must be a ResolvedVc")
        };
        if col_task_id.is_transient() && !task_id.is_transient() {
            let transient_reason = if let Some(col_task_ty) = self.lookup_task_type(col_task_id) {
                Cow::Owned(format!(
                    ". The collectible is transient because it depends on:\n{}",
                    self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
                ))
            } else {
                Cow::Borrowed("")
            };
            panic!(
                "The collectible is transient; transient collectibles cannot be emitted from \
                 persistent tasks{transient_reason}",
            )
        }
    }
}

impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }

    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }

    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }

    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }

    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }

    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
    }

    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
    }

    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }

    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }

    fn get_task_description(&self, task: TaskId) -> String {
        self.0.get_task_description(task)
    }

    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }

    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0.try_start_task_execution(task_id, turbo_tasks)
    }

    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            result,
            cell_counters,
            stateful,
            has_invalidator,
            turbo_tasks,
        )
    }

    type BackendJob = TurboTasksBackendJob;

    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(job, turbo_tasks)
    }

    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, reader, options, turbo_tasks)
    }

    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }

    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0
            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
    }

    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }

    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }

    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }

    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        content: CellContent,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .update_task_cell(task_id, cell, content, verification_mode, turbo_tasks);
    }

    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }

    fn set_own_task_aggregation_number(
        &self,
        task: TaskId,
        aggregation_number: u32,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .set_own_task_aggregation_number(task, aggregation_number, turbo_tasks);
    }

    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }

    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }

    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }

    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }

    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }

    fn is_tracking_dependencies(&self) -> bool {
        self.0.options.dependency_tracking
    }
}

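/// A tree of reasons explaining why a task is transient, used to build the panic
/// message when a persistent task calls into a transient one. A `Collapsed` node is a
/// task whose causes are not expanded further; `Uncached` is used when no task type
/// information is available.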
enum DebugTraceTransientTask {
    Cached {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
        cause_self: Option<Box<DebugTraceTransientTask>>,
        cause_args: Vec<DebugTraceTransientTask>,
    },
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}

impl DebugTraceTransientTask {
    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
        let indent = " ".repeat(level);
        f.write_str(&indent)?;

        fn fmt_cell_type_id(
            f: &mut fmt::Formatter<'_>,
            cell_type_id: Option<ValueTypeId>,
        ) -> fmt::Result {
            if let Some(ty) = cell_type_id {
                write!(f, " (read cell of type {})", get_value_type(ty).global_name)
            } else {
                Ok(())
            }
        }

        match self {
            Self::Cached {
                task_name,
                cell_type_id,
                ..
            }
            | Self::Collapsed {
                task_name,
                cell_type_id,
                ..
            } => {
                f.write_str(task_name)?;
                fmt_cell_type_id(f, *cell_type_id)?;
                if matches!(self, Self::Collapsed { .. }) {
                    f.write_str(" (collapsed)")?;
                }
            }
            Self::Uncached { cell_type_id } => {
                f.write_str("unknown transient task")?;
                fmt_cell_type_id(f, *cell_type_id)?;
            }
        }
        f.write_char('\n')?;

        if let Self::Cached {
            cause_self,
            cause_args,
            ..
        } = self
        {
            if let Some(c) = cause_self {
                writeln!(f, "{indent} self:")?;
                c.fmt_indented(f, level + 1)?;
            }
            if !cause_args.is_empty() {
                writeln!(f, "{indent} args:")?;
                for c in cause_args {
                    c.fmt_indented(f, level + 1)?;
                }
            }
        }
        Ok(())
    }
}

impl fmt::Display for DebugTraceTransientTask {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.fmt_indented(f, 0)
    }
}

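/// An `Instant` roughly 30 years in the future, used as a stand-in for "never".
/// Very large durations can overflow the platform's time representation, so a bounded
/// value is used instead of something like `Instant::now() + Duration::MAX`.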
fn far_future() -> Instant {
    Instant::now() + Duration::from_secs(86400 * 365 * 30)
}