1mod cell_data;
2mod counter_map;
3mod operation;
4mod snapshot_coordinator;
5mod storage;
6pub mod storage_schema;
7
8use std::{
9 borrow::Cow,
10 fmt::{self, Write},
11 future::Future,
12 hash::BuildHasherDefault,
13 mem::take,
14 pin::Pin,
15 sync::{
16 Arc, LazyLock,
17 atomic::{AtomicBool, Ordering},
18 },
19 time::SystemTime,
20};
21
22use anyhow::{Context, Result, bail};
23use auto_hash_map::{AutoMap, AutoSet};
24use indexmap::IndexSet;
25use parking_lot::Mutex;
26use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
27use smallvec::{SmallVec, smallvec};
28use tokio::time::{Duration, Instant};
29use tracing::{Span, trace_span};
30use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
31use turbo_tasks::{
32 CellId, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency, ReadOutputOptions,
33 ReadTracking, SharedReference, StackDynTaskInputs, TRANSIENT_TASK_BIT, TaskExecutionReason,
34 TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic,
35 ValueTypeId,
36 backend::{
37 Backend, CachedTaskType, CellContent, CellHash, TaskExecutionSpec, TransientTaskType,
38 TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
39 TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
40 VerificationMode,
41 },
42 event::{Event, EventDescription, EventListener},
43 macro_helpers::NativeFunction,
44 message_queue::{TimingEvent, TraceEvent},
45 registry::get_value_type,
46 scope::scope_and_block,
47 task_statistics::TaskStatisticsApi,
48 trace::TraceRawVcs,
49 util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
50};
51
52pub use self::{
53 operation::AnyOperation,
54 storage::{EvictionCounts, SpecificTaskDataCategory, TaskDataCategory},
55};
56#[cfg(feature = "trace_task_dirty")]
57use crate::backend::operation::TaskDirtyCause;
58use crate::{
59 backend::{
60 operation::{
61 AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
62 CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
63 LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType, TaskTypeRef,
64 connect_children, get_aggregation_number, get_uppers, make_task_dirty_internal,
65 prepare_new_children,
66 },
67 snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
68 storage::Storage,
69 storage_schema::{TaskStorage, TaskStorageAccessors},
70 },
71 backing_storage::{BackingStorage, SnapshotItem, SnapshotMeta, compute_task_type_hash},
72 data::{
73 ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
74 InProgressState, InProgressStateInner, OutputValue, TransientTask,
75 },
76 error::TaskError,
77 utils::{
78 dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
79 shard_amount::compute_shard_amount,
80 },
81};
82
83const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10000;
87
88static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
91 std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
92 .ok()
93 .and_then(|v| v.parse::<u64>().ok())
94 .map(Duration::from_millis)
95 .unwrap_or(Duration::from_secs(2))
96});
97
98fn compute_stale_priority(task: &impl TaskGuard) -> TaskPriority {
105 TaskPriority::invalidation(
106 task.get_leaf_distance()
107 .copied()
108 .unwrap_or_default()
109 .distance,
110 )
111 .in_parent(task.is_dirty().unwrap_or(TaskPriority::leaf()))
112}
113
114pub enum StorageMode {
115 ReadOnly,
117 ReadWrite,
120 ReadWriteOnShutdown,
123}
124
125pub struct BackendOptions {
126 pub dependency_tracking: bool,
131
132 pub active_tracking: bool,
138
139 pub storage_mode: Option<StorageMode>,
141
142 pub num_workers: Option<usize>,
145
146 pub small_preallocation: bool,
148
149 pub evict_after_snapshot: bool,
153}
154
155impl Default for BackendOptions {
156 fn default() -> Self {
157 Self {
158 dependency_tracking: true,
159 active_tracking: true,
160 storage_mode: Some(StorageMode::ReadWrite),
161 num_workers: None,
162 small_preallocation: false,
163 evict_after_snapshot: false,
164 }
165 }
166}
167
168pub enum TurboTasksBackendJob {
169 Snapshot,
170}
171
172pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
173
174struct TurboTasksBackendInner<B: BackingStorage> {
175 options: BackendOptions,
176
177 start_time: Instant,
178
179 persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
180 transient_task_id_factory: IdFactoryWithReuse<TaskId>,
181
182 storage: Storage,
183
184 snapshot_coord: SnapshotCoordinator,
187 snapshot_in_progress: Mutex<()>,
192
193 stopping: AtomicBool,
194 stopping_event: Event,
195 idle_start_event: Event,
196 idle_end_event: Event,
197 #[cfg(feature = "verify_aggregation_graph")]
198 is_idle: AtomicBool,
199
200 task_statistics: TaskStatisticsApi,
201
202 backing_storage: B,
203
204 #[cfg(feature = "verify_aggregation_graph")]
205 root_tasks: Mutex<FxHashSet<TaskId>>,
206}
207
208impl<B: BackingStorage> TurboTasksBackend<B> {
209 pub fn new(options: BackendOptions, backing_storage: B) -> Self {
210 Self(Arc::new(TurboTasksBackendInner::new(
211 options,
212 backing_storage,
213 )))
214 }
215
216 pub fn backing_storage(&self) -> &B {
217 &self.0.backing_storage
218 }
219
220 pub fn snapshot_and_evict_for_testing(
227 &self,
228 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
229 ) -> (bool, EvictionCounts) {
230 self.0.snapshot_and_evict_for_testing(turbo_tasks)
231 }
232}
233
234impl<B: BackingStorage> TurboTasksBackendInner<B> {
235 pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
236 let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
237 if !options.dependency_tracking {
238 options.active_tracking = false;
239 }
240 let small_preallocation = options.small_preallocation;
241 let next_task_id = backing_storage
242 .next_free_task_id()
243 .expect("Failed to get task id");
244 Self {
245 options,
246 start_time: Instant::now(),
247 persisted_task_id_factory: IdFactoryWithReuse::new(
248 next_task_id,
249 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
250 ),
251 transient_task_id_factory: IdFactoryWithReuse::new(
252 TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
253 TaskId::MAX,
254 ),
255 storage: Storage::new(shard_amount, small_preallocation),
256 snapshot_coord: SnapshotCoordinator::new(),
257 snapshot_in_progress: Mutex::new(()),
258 stopping: AtomicBool::new(false),
259 stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
260 idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
261 idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
262 #[cfg(feature = "verify_aggregation_graph")]
263 is_idle: AtomicBool::new(false),
264 task_statistics: TaskStatisticsApi::default(),
265 backing_storage,
266 #[cfg(feature = "verify_aggregation_graph")]
267 root_tasks: Default::default(),
268 }
269 }
270
271 fn execute_context<'a>(
272 &'a self,
273 turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
274 ) -> impl ExecuteContext<'a> {
275 ExecuteContextImpl::new(self, turbo_tasks)
276 }
277
278 fn suspending_requested(&self) -> bool {
279 self.should_persist() && self.snapshot_coord.snapshot_pending()
280 }
281
282 fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
283 if self.should_persist() {
284 self.snapshot_coord.suspend_point(suspend);
285 }
286 }
287
288 pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
289 if !self.should_persist() {
290 return OperationGuard::noop();
291 }
292 self.snapshot_coord.begin_operation()
293 }
294
295 fn should_persist(&self) -> bool {
296 matches!(
297 self.options.storage_mode,
298 Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
299 )
300 }
301
302 fn should_evict(&self) -> bool {
303 self.options.evict_after_snapshot && self.should_persist()
304 }
305
306 #[doc(hidden)]
313 pub fn snapshot_and_evict_for_testing(
314 &self,
315 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
316 ) -> (bool, EvictionCounts) {
317 assert!(
318 self.should_persist(),
319 "snapshot_and_evict requires persistence"
320 );
321 let snapshot_result = self.snapshot_and_persist(None, "test", turbo_tasks);
322 let had_new_data = match snapshot_result {
323 Ok((_, new_data)) => new_data,
324 Err(_) => {
325 return (false, EvictionCounts::default());
329 }
330 };
331 let counts = self.storage.evict_after_snapshot(None);
332 (had_new_data, counts)
333 }
334
335 fn should_restore(&self) -> bool {
336 self.options.storage_mode.is_some()
337 }
338
339 fn should_track_dependencies(&self) -> bool {
340 self.options.dependency_tracking
341 }
342
343 fn should_track_activeness(&self) -> bool {
344 self.options.active_tracking
345 }
346
347 fn track_cache_hit_by_fn(&self, native_fn: &'static NativeFunction) {
348 self.task_statistics
349 .map(|stats| stats.increment_cache_hit(native_fn));
350 }
351
352 fn track_cache_miss_by_fn(&self, native_fn: &'static NativeFunction) {
353 self.task_statistics
354 .map(|stats| stats.increment_cache_miss(native_fn));
355 }
356
357 fn task_error_to_turbo_tasks_execution_error(
362 &self,
363 error: &TaskError,
364 ctx: &mut impl ExecuteContext<'_>,
365 ) -> TurboTasksExecutionError {
366 match error {
367 TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
368 TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
369 message: item.message.clone(),
370 source: item
371 .source
372 .as_ref()
373 .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
374 })),
375 TaskError::LocalTaskContext(local_task_context) => {
376 TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
377 name: local_task_context.name.clone(),
378 source: local_task_context
379 .source
380 .as_ref()
381 .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
382 }))
383 }
384 TaskError::TaskChain(chain) => {
385 let task_id = chain.last().unwrap();
386 let error = {
387 let task = ctx.task(*task_id, TaskDataCategory::Meta);
388 if let Some(OutputValue::Error(error)) = task.get_output() {
389 Some(error.clone())
390 } else {
391 None
392 }
393 };
394 let error = error.map_or_else(
395 || {
396 TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
398 message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
399 "Error no longer available",
400 )),
401 location: None,
402 }))
403 },
404 |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
405 );
406 let mut current_error = error;
407 for &task_id in chain.iter().rev() {
408 current_error =
409 TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
410 task_id,
411 source: Some(current_error),
412 turbo_tasks: ctx.turbo_tasks(),
413 }));
414 }
415 current_error
416 }
417 }
418 }
419}
420
421struct TaskExecutionCompletePrepareResult {
423 pub new_children: FxHashSet<TaskId>,
424 pub is_now_immutable: bool,
425 #[cfg(feature = "verify_determinism")]
426 pub no_output_set: bool,
427 pub new_output: Option<OutputValue>,
428 pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
429 pub is_recomputation: bool,
430 pub is_session_dependent: bool,
431}
432
433impl<B: BackingStorage> TurboTasksBackendInner<B> {
435 fn try_read_task_output(
436 self: &Arc<Self>,
437 task_id: TaskId,
438 reader: Option<TaskId>,
439 options: ReadOutputOptions,
440 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
441 ) -> Result<Result<RawVc, EventListener>> {
442 self.assert_not_persistent_calling_transient(reader, task_id, None);
443
444 let mut ctx = self.execute_context(turbo_tasks);
445 let need_reader_task = if self.should_track_dependencies()
446 && !matches!(options.tracking, ReadTracking::Untracked)
447 && reader.is_some_and(|reader_id| reader_id != task_id)
448 && let Some(reader_id) = reader
449 && reader_id != task_id
450 {
451 Some(reader_id)
452 } else {
453 None
454 };
455 let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
456 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
460 (task, Some(reader))
461 } else {
462 (ctx.task(task_id, TaskDataCategory::All), None)
463 };
464
465 fn listen_to_done_event(
466 reader_description: Option<EventDescription>,
467 tracking: ReadTracking,
468 done_event: &Event,
469 ) -> EventListener {
470 done_event.listen_with_note(move || {
471 move || {
472 if let Some(reader_description) = reader_description.as_ref() {
473 format!(
474 "try_read_task_output from {} ({})",
475 reader_description, tracking
476 )
477 } else {
478 format!("try_read_task_output ({})", tracking)
479 }
480 }
481 })
482 }
483
484 fn check_in_progress(
485 task: &impl TaskGuard,
486 reader_description: Option<EventDescription>,
487 tracking: ReadTracking,
488 ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
489 {
490 match task.get_in_progress() {
491 Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
492 listen_to_done_event(reader_description, tracking, done_event),
493 ))),
494 Some(InProgressState::InProgress(box InProgressStateInner {
495 done_event, ..
496 })) => Some(Ok(Err(listen_to_done_event(
497 reader_description,
498 tracking,
499 done_event,
500 )))),
501 Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
502 "{} was canceled",
503 task.get_task_description()
504 ))),
505 None => None,
506 }
507 }
508
509 if matches!(options.consistency, ReadConsistency::Strong) {
510 if task
511 .get_persistent_task_type()
512 .is_some_and(|t| !t.native_fn.is_root)
513 {
514 drop(task);
515 drop(reader_task);
516 panic!(
517 "Strongly consistent read of non-root task {} (reader: {}). The `root` \
518 attribute is missing on the task.",
519 self.debug_get_task_description(task_id),
520 reader.map_or_else(
521 || "unknown".to_string(),
522 |r| self.debug_get_task_description(r)
523 )
524 );
525 }
526
527 let is_dirty = task.is_dirty();
528
529 let has_dirty_containers = task.has_dirty_containers();
531 if has_dirty_containers || is_dirty.is_some() {
532 let activeness = task.get_activeness_mut();
533 let mut task_ids_to_schedule: Vec<_> = Vec::new();
534 let activeness = if let Some(activeness) = activeness {
536 activeness.set_active_until_clean();
540 activeness
541 } else {
542 if ctx.should_track_activeness() {
546 task_ids_to_schedule = task.dirty_containers().collect();
548 task_ids_to_schedule.push(task_id);
549 }
550 let activeness =
551 task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
552 activeness.set_active_until_clean();
553 activeness
554 };
555 let listener = activeness.all_clean_event.listen_with_note(move || {
556 let this = self.clone();
557 let tt = turbo_tasks.pin();
558 move || {
559 let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
560 let mut ctx = this.execute_context(tt);
561 let mut visited = FxHashSet::default();
562 fn indent(s: &str) -> String {
563 s.split_inclusive('\n')
564 .flat_map(|line: &str| [" ", line].into_iter())
565 .collect::<String>()
566 }
567 fn get_info(
568 ctx: &mut impl ExecuteContext<'_>,
569 task_id: TaskId,
570 parent_and_count: Option<(TaskId, i32)>,
571 visited: &mut FxHashSet<TaskId>,
572 ) -> String {
573 let task = ctx.task(task_id, TaskDataCategory::All);
574 let is_dirty = task.is_dirty();
575 let in_progress =
576 task.get_in_progress()
577 .map_or("not in progress", |p| match p {
578 InProgressState::InProgress(_) => "in progress",
579 InProgressState::Scheduled { .. } => "scheduled",
580 InProgressState::Canceled => "canceled",
581 });
582 let activeness = task.get_activeness().map_or_else(
583 || "not active".to_string(),
584 |activeness| format!("{activeness:?}"),
585 );
586 let aggregation_number = get_aggregation_number(&task);
587 let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
588 {
589 let uppers = get_uppers(&task);
590 !uppers.contains(&parent_task_id)
591 } else {
592 false
593 };
594
595 let has_dirty_containers = task.has_dirty_containers();
597
598 let task_description = task.get_task_description();
599 let is_dirty_label = if let Some(parent_priority) = is_dirty {
600 format!(", dirty({parent_priority})")
601 } else {
602 String::new()
603 };
604 let has_dirty_containers_label = if has_dirty_containers {
605 ", dirty containers"
606 } else {
607 ""
608 };
609 let count = if let Some((_, count)) = parent_and_count {
610 format!(" {count}")
611 } else {
612 String::new()
613 };
614 let mut info = format!(
615 "{task_id} {task_description}{count} (aggr={aggregation_number}, \
616 {in_progress}, \
617 {activeness}{is_dirty_label}{has_dirty_containers_label})",
618 );
619 let children: Vec<_> = task.dirty_containers_with_count().collect();
620 drop(task);
621
622 if missing_upper {
623 info.push_str("\n ERROR: missing upper connection");
624 }
625
626 if has_dirty_containers || !children.is_empty() {
627 writeln!(info, "\n dirty tasks:").unwrap();
628
629 for (child_task_id, count) in children {
630 let task_description = ctx
631 .task(child_task_id, TaskDataCategory::Data)
632 .get_task_description();
633 if visited.insert(child_task_id) {
634 let child_info = get_info(
635 ctx,
636 child_task_id,
637 Some((task_id, count)),
638 visited,
639 );
640 info.push_str(&indent(&child_info));
641 if !info.ends_with('\n') {
642 info.push('\n');
643 }
644 } else {
645 writeln!(
646 info,
647 " {child_task_id} {task_description} {count} \
648 (already visited)"
649 )
650 .unwrap();
651 }
652 }
653 }
654 info
655 }
656 let info = get_info(&mut ctx, task_id, None, &mut visited);
657 format!(
658 "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
659 )
660 }
661 });
662 drop(reader_task);
663 drop(task);
664 if !task_ids_to_schedule.is_empty() {
665 let mut queue = AggregationUpdateQueue::new();
666 queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
667 queue.execute(&mut ctx);
668 }
669
670 return Ok(Err(listener));
671 }
672 }
673
674 let reader_description = reader_task
675 .as_ref()
676 .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
677 if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
678 {
679 return value;
680 }
681
682 if let Some(output) = task.get_output() {
683 let result = match output {
684 OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
685 OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
686 OutputValue::Error(error) => Err(error.clone()),
687 };
688 if let Some(mut reader_task) = reader_task.take()
689 && options.tracking.should_track(result.is_err())
690 && (!task.immutable() || cfg!(feature = "verify_immutable"))
691 {
692 #[cfg(feature = "trace_task_output_dependencies")]
693 let _span = tracing::trace_span!(
694 "add output dependency",
695 task = %task_id,
696 dependent_task = ?reader
697 )
698 .entered();
699 let mut queue = LeafDistanceUpdateQueue::new();
700 let reader = reader.unwrap();
701 if task.add_output_dependent(reader) {
702 let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
704 let reader_leaf_distance =
705 reader_task.get_leaf_distance().copied().unwrap_or_default();
706 if reader_leaf_distance.distance <= leaf_distance.distance {
707 queue.push(
708 reader,
709 leaf_distance.distance,
710 leaf_distance.max_distance_in_buffer,
711 );
712 }
713 }
714
715 drop(task);
716
717 if !reader_task.remove_outdated_output_dependencies(&task_id) {
723 let _ = reader_task.add_output_dependencies(task_id);
724 }
725 drop(reader_task);
726
727 queue.execute(&mut ctx);
728 } else {
729 drop(task);
730 }
731
732 return result.map_err(|error| {
733 self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
734 .with_task_context(task_id, turbo_tasks.pin())
735 .into()
736 });
737 }
738 drop(reader_task);
739
740 let note = EventDescription::new(|| {
741 move || {
742 if let Some(reader) = reader_description.as_ref() {
743 format!("try_read_task_output (recompute) from {reader}",)
744 } else {
745 "try_read_task_output (recompute, untracked)".to_string()
746 }
747 }
748 });
749
750 let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
752 TaskExecutionReason::OutputNotAvailable,
753 EventDescription::new(|| task.get_task_desc_fn()),
754 note,
755 );
756
757 let old = task.set_in_progress(in_progress_state);
760 debug_assert!(old.is_none(), "InProgress already exists");
761 ctx.schedule_task(task, TaskPriority::Recomputation);
762
763 Ok(Err(listener))
764 }
765
766 fn try_read_task_cell(
767 &self,
768 task_id: TaskId,
769 reader: Option<TaskId>,
770 cell: CellId,
771 options: ReadCellOptions,
772 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
773 ) -> Result<Result<TypedCellContent, EventListener>> {
774 self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
775
776 fn add_cell_dependency(
777 task_id: TaskId,
778 mut task: impl TaskGuard,
779 reader: Option<TaskId>,
780 reader_task: Option<impl TaskGuard>,
781 cell: CellId,
782 key: Option<u64>,
783 ) {
784 if let Some(mut reader_task) = reader_task
785 && (!task.immutable() || cfg!(feature = "verify_immutable"))
786 {
787 let reader = reader.unwrap();
788 let _ = task.add_cell_dependents((cell, key, reader));
789 drop(task);
790
791 let target = CellRef {
797 task: task_id,
798 cell,
799 };
800 if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
801 let _ = reader_task.add_cell_dependencies((target, key));
802 }
803 drop(reader_task);
804 }
805 }
806
807 let ReadCellOptions {
808 tracking,
809 final_read_hint,
810 } = options;
811
812 let mut ctx = self.execute_context(turbo_tasks);
813 let (mut task, reader_task) = if self.should_track_dependencies()
814 && !matches!(tracking, ReadCellTracking::Untracked)
815 && let Some(reader_id) = reader
816 && reader_id != task_id
817 {
818 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
822 (task, Some(reader))
823 } else {
824 (ctx.task(task_id, TaskDataCategory::All), None)
825 };
826
827 let content = if final_read_hint {
828 task.remove_cell_data(&cell)
829 } else {
830 task.get_cell_data(&cell).cloned()
831 };
832 if let Some(content) = content {
833 if tracking.should_track(false) {
834 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
835 }
836 return Ok(Ok(TypedCellContent(
837 cell.type_id,
838 CellContent(Some(content)),
839 )));
840 }
841
842 let in_progress = task.get_in_progress();
843 if matches!(
844 in_progress,
845 Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
846 ) {
847 return Ok(Err(self
848 .listen_to_cell(&mut task, task_id, &reader_task, cell)
849 .0));
850 }
851 let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
852
853 let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
855 let Some(max_id) = max_id else {
856 let task_desc = task.get_task_description();
857 if tracking.should_track(true) {
858 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
859 }
860 bail!(
861 "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
862 );
863 };
864 if cell.index >= max_id {
865 let task_desc = task.get_task_description();
866 if tracking.should_track(true) {
867 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
868 }
869 bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
870 }
871
872 if is_cancelled {
878 bail!("{} was canceled", task.get_task_description());
879 }
880
881 let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
883 drop(reader_task);
884 if !new_listener {
885 return Ok(Err(listener));
886 }
887
888 let _span = tracing::trace_span!(
889 "recomputation",
890 cell_type = get_value_type(cell.type_id).ty.global_name,
891 cell_index = cell.index
892 )
893 .entered();
894
895 let _ = task.add_scheduled(
896 TaskExecutionReason::CellNotAvailable,
897 EventDescription::new(|| task.get_task_desc_fn()),
898 );
899 ctx.schedule_task(task, TaskPriority::Recomputation);
900
901 Ok(Err(listener))
902 }
903
904 fn listen_to_cell(
905 &self,
906 task: &mut impl TaskGuard,
907 task_id: TaskId,
908 reader_task: &Option<impl TaskGuard>,
909 cell: CellId,
910 ) -> (EventListener, bool) {
911 let note = || {
912 let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
913 move || {
914 if let Some(reader_desc) = reader_desc.as_ref() {
915 format!("try_read_task_cell (in progress) from {}", (reader_desc)())
916 } else {
917 "try_read_task_cell (in progress, untracked)".to_string()
918 }
919 }
920 };
921 if let Some(in_progress) = task.get_in_progress_cells(&cell) {
922 let listener = in_progress.event.listen_with_note(note);
924 return (listener, false);
925 }
926 let in_progress = InProgressCellState::new(task_id, cell);
927 let listener = in_progress.event.listen_with_note(note);
928 let old = task.insert_in_progress_cells(cell, in_progress);
929 debug_assert!(old.is_none(), "InProgressCell already exists");
930 (listener, true)
931 }
932
933 fn snapshot_and_persist(
934 &self,
935 parent_span: Option<tracing::Id>,
936 reason: &str,
937 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
938 ) -> Result<(Instant, bool), anyhow::Error> {
939 let snapshot_span =
940 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
941 .entered();
942 let _snapshot_in_progress = self.snapshot_in_progress.lock();
946 let start = Instant::now();
947 let wall_start = SystemTime::now();
951 debug_assert!(self.should_persist());
952
953 let mut snapshot_phase = {
954 let _span = tracing::info_span!("blocking").entered();
955 self.snapshot_coord.begin_snapshot()
956 };
957 let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
960
961 let suspended_operations = snapshot_phase.take_suspended_operations();
962
963 let snapshot_time = Instant::now();
964 drop(snapshot_phase);
965
966 if !has_modifications {
967 drop(snapshot_guard);
970 return Ok((start, false));
971 }
972
973 #[cfg(feature = "print_cache_item_size")]
974 #[derive(Default)]
975 struct TaskCacheStats {
976 data: usize,
977 #[cfg(feature = "print_cache_item_size_with_compressed")]
978 data_compressed: usize,
979 data_count: usize,
980 meta: usize,
981 #[cfg(feature = "print_cache_item_size_with_compressed")]
982 meta_compressed: usize,
983 meta_count: usize,
984 upper_count: usize,
985 collectibles_count: usize,
986 aggregated_collectibles_count: usize,
987 children_count: usize,
988 followers_count: usize,
989 collectibles_dependents_count: usize,
990 aggregated_dirty_containers_count: usize,
991 output_size: usize,
992 }
993 #[cfg(feature = "print_cache_item_size")]
996 struct FormatSizes {
997 size: usize,
998 #[cfg(feature = "print_cache_item_size_with_compressed")]
999 compressed_size: usize,
1000 }
1001 #[cfg(feature = "print_cache_item_size")]
1002 impl std::fmt::Display for FormatSizes {
1003 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1004 use turbo_tasks::util::FormatBytes;
1005 #[cfg(feature = "print_cache_item_size_with_compressed")]
1006 {
1007 write!(
1008 f,
1009 "{} ({} compressed)",
1010 FormatBytes(self.size),
1011 FormatBytes(self.compressed_size)
1012 )
1013 }
1014 #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1015 {
1016 write!(f, "{}", FormatBytes(self.size))
1017 }
1018 }
1019 }
1020 #[cfg(feature = "print_cache_item_size")]
1021 impl TaskCacheStats {
1022 #[cfg(feature = "print_cache_item_size_with_compressed")]
1023 fn compressed_size(data: &[u8]) -> Result<usize> {
1024 Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1025 data,
1026 &mut Vec::new(),
1027 lzzzz::lz4::ACC_LEVEL_DEFAULT,
1028 )?)
1029 }
1030
1031 fn add_data(&mut self, data: &[u8]) {
1032 self.data += data.len();
1033 #[cfg(feature = "print_cache_item_size_with_compressed")]
1034 {
1035 self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1036 }
1037 self.data_count += 1;
1038 }
1039
1040 fn add_meta(&mut self, data: &[u8]) {
1041 self.meta += data.len();
1042 #[cfg(feature = "print_cache_item_size_with_compressed")]
1043 {
1044 self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1045 }
1046 self.meta_count += 1;
1047 }
1048
1049 fn add_counts(&mut self, storage: &TaskStorage) {
1050 let counts = storage.meta_counts();
1051 self.upper_count += counts.upper;
1052 self.collectibles_count += counts.collectibles;
1053 self.aggregated_collectibles_count += counts.aggregated_collectibles;
1054 self.children_count += counts.children;
1055 self.followers_count += counts.followers;
1056 self.collectibles_dependents_count += counts.collectibles_dependents;
1057 self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
1058 if let Some(output) = storage.get_output() {
1059 use turbo_bincode::turbo_bincode_encode;
1060
1061 self.output_size += turbo_bincode_encode(&output)
1062 .map(|data| data.len())
1063 .unwrap_or(0);
1064 }
1065 }
1066
1067 fn task_name(storage: &TaskStorage) -> String {
1069 storage
1070 .get_persistent_task_type()
1071 .map(|t| t.to_string())
1072 .unwrap_or_else(|| "<unknown>".to_string())
1073 }
1074
1075 fn sort_key(&self) -> usize {
1078 #[cfg(feature = "print_cache_item_size_with_compressed")]
1079 {
1080 self.data_compressed + self.meta_compressed
1081 }
1082 #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1083 {
1084 self.data + self.meta
1085 }
1086 }
1087
1088 fn format_total(&self) -> FormatSizes {
1089 FormatSizes {
1090 size: self.data + self.meta,
1091 #[cfg(feature = "print_cache_item_size_with_compressed")]
1092 compressed_size: self.data_compressed + self.meta_compressed,
1093 }
1094 }
1095
1096 fn format_data(&self) -> FormatSizes {
1097 FormatSizes {
1098 size: self.data,
1099 #[cfg(feature = "print_cache_item_size_with_compressed")]
1100 compressed_size: self.data_compressed,
1101 }
1102 }
1103
1104 fn format_avg_data(&self) -> FormatSizes {
1105 FormatSizes {
1106 size: self.data.checked_div(self.data_count).unwrap_or(0),
1107 #[cfg(feature = "print_cache_item_size_with_compressed")]
1108 compressed_size: self
1109 .data_compressed
1110 .checked_div(self.data_count)
1111 .unwrap_or(0),
1112 }
1113 }
1114
1115 fn format_meta(&self) -> FormatSizes {
1116 FormatSizes {
1117 size: self.meta,
1118 #[cfg(feature = "print_cache_item_size_with_compressed")]
1119 compressed_size: self.meta_compressed,
1120 }
1121 }
1122
1123 fn format_avg_meta(&self) -> FormatSizes {
1124 FormatSizes {
1125 size: self.meta.checked_div(self.meta_count).unwrap_or(0),
1126 #[cfg(feature = "print_cache_item_size_with_compressed")]
1127 compressed_size: self
1128 .meta_compressed
1129 .checked_div(self.meta_count)
1130 .unwrap_or(0),
1131 }
1132 }
1133 }
1134 #[cfg(feature = "print_cache_item_size")]
1135 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1136 Mutex::new(FxHashMap::default());
1137
1138 let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
1145 let encode_category = |task_id: TaskId,
1146 data: &TaskStorage,
1147 category: SpecificTaskDataCategory,
1148 buffer: &mut TurboBincodeBuffer|
1149 -> Option<TurboBincodeBuffer> {
1150 match encode_task_data(task_id, data, category, buffer) {
1151 Ok(encoded) => {
1152 #[cfg(feature = "print_cache_item_size")]
1153 {
1154 let mut stats = task_cache_stats.lock();
1155 let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
1156 match category {
1157 SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1158 SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1159 }
1160 }
1161 Some(encoded)
1162 }
1163 Err(err) => {
1164 panic!(
1165 "Serializing task {} failed ({:?}): {:?}",
1166 self.debug_get_task_description(task_id),
1167 category,
1168 err
1169 );
1170 }
1171 }
1172 };
1173 if task_id.is_transient() {
1174 unreachable!("transient task_ids should never be enqueued to be persisted");
1175 }
1176
1177 let encode_meta = inner.flags.meta_modified();
1178 let encode_data = inner.flags.data_modified();
1179
1180 #[cfg(feature = "print_cache_item_size")]
1181 if encode_data || encode_meta {
1182 task_cache_stats
1183 .lock()
1184 .entry(TaskCacheStats::task_name(inner))
1185 .or_default()
1186 .add_counts(inner);
1187 }
1188
1189 let meta = if encode_meta {
1190 encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1191 } else {
1192 None
1193 };
1194
1195 let data = if encode_data {
1196 encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1197 } else {
1198 None
1199 };
1200 let task_type_hash = if inner.flags.new_task() {
1201 let task_type = inner.get_persistent_task_type().expect(
1202 "It is not possible for a new_task to not have a persistent_task_type. Task \
1203 creation for persistent tasks uses a single ExecutionContextImpl for \
1204 creating the task (which sets new_task) and connect_child (which sets \
1205 persistent_task_type) and take_snapshot waits for all operations to complete \
1206 or suspend before we start snapshotting. So task creation will always set \
1207 the task_type.",
1208 );
1209 Some(compute_task_type_hash(task_type))
1210 } else {
1211 None
1212 };
1213
1214 SnapshotItem {
1215 task_id,
1216 meta,
1217 data,
1218 task_type_hash,
1219 }
1220 };
1221
1222 let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);
1223
1224 drop(snapshot_span);
1225 let snapshot_duration = start.elapsed();
1226 let task_count = task_snapshots.len();
1227
1228 if task_snapshots.is_empty() {
1229 std::hint::cold_path();
1232 return Ok((snapshot_time, false));
1233 }
1234
1235 let persist_start = Instant::now();
1236 let span = tracing::info_span!(
1237 parent: parent_span,
1238 "persist",
1239 reason = reason,
1240 data_items= tracing::field::Empty,
1241 meta_items= tracing::field::Empty,
1242 task_cache_items= tracing::field::Empty,
1243 next_task_id= tracing::field::Empty,)
1244 .entered();
1245 {
1246 let SnapshotMeta {
1250 task_cache_items,
1251 data_items,
1252 meta_items,
1253 max_next_task_id,
1254 } = self
1255 .backing_storage
1256 .save_snapshot(suspended_operations, task_snapshots)?;
1257 span.record("data_items", data_items);
1258 span.record("meta_items", meta_items);
1259 span.record("task_cache_items", task_cache_items);
1260 span.record("next_task_id", max_next_task_id);
1261
1262 #[cfg(feature = "print_cache_item_size")]
1263 {
1264 let mut task_cache_stats = task_cache_stats
1265 .into_inner()
1266 .into_iter()
1267 .collect::<Vec<_>>();
1268 if !task_cache_stats.is_empty() {
1269 use turbo_tasks::util::FormatBytes;
1270
1271 use crate::utils::markdown_table::print_markdown_table;
1272
1273 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1274 (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
1275 });
1276
1277 println!(
1278 "Task cache stats: {}",
1279 FormatSizes {
1280 size: task_cache_stats
1281 .iter()
1282 .map(|(_, s)| s.data + s.meta)
1283 .sum::<usize>(),
1284 #[cfg(feature = "print_cache_item_size_with_compressed")]
1285 compressed_size: task_cache_stats
1286 .iter()
1287 .map(|(_, s)| s.data_compressed + s.meta_compressed)
1288 .sum::<usize>()
1289 },
1290 );
1291
1292 print_markdown_table(
1293 [
1294 "Task",
1295 " Total Size",
1296 " Data Size",
1297 " Data Count x Avg",
1298 " Data Count x Avg",
1299 " Meta Size",
1300 " Meta Count x Avg",
1301 " Meta Count x Avg",
1302 " Uppers",
1303 " Coll",
1304 " Agg Coll",
1305 " Children",
1306 " Followers",
1307 " Coll Deps",
1308 " Agg Dirty",
1309 " Output Size",
1310 ],
1311 task_cache_stats.iter(),
1312 |(task_desc, stats)| {
1313 [
1314 task_desc.to_string(),
1315 format!(" {}", stats.format_total()),
1316 format!(" {}", stats.format_data()),
1317 format!(" {} x", stats.data_count),
1318 format!("{}", stats.format_avg_data()),
1319 format!(" {}", stats.format_meta()),
1320 format!(" {} x", stats.meta_count),
1321 format!("{}", stats.format_avg_meta()),
1322 format!(" {}", stats.upper_count),
1323 format!(" {}", stats.collectibles_count),
1324 format!(" {}", stats.aggregated_collectibles_count),
1325 format!(" {}", stats.children_count),
1326 format!(" {}", stats.followers_count),
1327 format!(" {}", stats.collectibles_dependents_count),
1328 format!(" {}", stats.aggregated_dirty_containers_count),
1329 format!(" {}", FormatBytes(stats.output_size)),
1330 ]
1331 },
1332 );
1333 }
1334 }
1335 }
1336
1337 let elapsed = start.elapsed();
1338 let persist_duration = persist_start.elapsed();
1339 if elapsed > Duration::from_secs(10) {
1341 turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1342 "Finished writing to filesystem cache".to_string(),
1343 elapsed,
1344 )));
1345 }
1346
1347 let wall_start_ms = wall_start
1348 .duration_since(SystemTime::UNIX_EPOCH)
1349 .unwrap_or_default()
1350 .as_secs_f64()
1352 * 1000.0;
1353 let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1354 turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1355 "turbopack-persistence",
1356 wall_start_ms,
1357 wall_end_ms,
1358 vec![
1359 ("reason", serde_json::Value::from(reason)),
1360 (
1361 "snapshot_duration_ms",
1362 serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1363 ),
1364 (
1365 "persist_duration_ms",
1366 serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1367 ),
1368 ("task_count", serde_json::Value::from(task_count)),
1369 ],
1370 )));
1371
1372 Ok((snapshot_time, true))
1373 }
1374
1375 fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1376 if self.should_restore() {
1377 let uncompleted_operations = self
1381 .backing_storage
1382 .uncompleted_operations()
1383 .expect("Failed to get uncompleted operations");
1384 if !uncompleted_operations.is_empty() {
1385 let mut ctx = self.execute_context(turbo_tasks);
1386 for op in uncompleted_operations {
1387 op.execute(&mut ctx);
1388 }
1389 }
1390 }
1391
1392 if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1395 let _span = trace_span!("persisting background job").entered();
1397 let _span = tracing::info_span!("thread").entered();
1398 turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::Snapshot);
1399 }
1400 }
1401
1402 fn stopping(&self) {
1403 self.stopping.store(true, Ordering::Release);
1404 self.stopping_event.notify(usize::MAX);
1405 }
1406
1407 #[allow(unused_variables)]
1408 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1409 #[cfg(feature = "verify_aggregation_graph")]
1410 {
1411 self.is_idle.store(false, Ordering::Release);
1412 self.verify_aggregation_graph(turbo_tasks, false);
1413 }
1414 if self.should_persist()
1415 && let Err(err) = self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks)
1416 {
1417 eprintln!("Persisting failed during shutdown: {err:?}");
1418 }
1419 self.storage.drop_contents();
1420 if let Err(err) = self.backing_storage.shutdown() {
1421 println!("Shutting down failed: {err}");
1422 }
1423 }
1424
1425 #[allow(unused_variables)]
1426 fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1427 self.idle_start_event.notify(usize::MAX);
1428
1429 #[cfg(feature = "verify_aggregation_graph")]
1430 {
1431 use tokio::select;
1432
1433 self.is_idle.store(true, Ordering::Release);
1434 let this = self.clone();
1435 let turbo_tasks = turbo_tasks.pin();
1436 tokio::task::spawn(async move {
1437 select! {
1438 _ = tokio::time::sleep(Duration::from_secs(5)) => {
1439 }
1441 _ = this.idle_end_event.listen() => {
1442 return;
1443 }
1444 }
1445 if !this.is_idle.load(Ordering::Relaxed) {
1446 return;
1447 }
1448 this.verify_aggregation_graph(&*turbo_tasks, true);
1449 });
1450 }
1451 }
1452
1453 fn idle_end(&self) {
1454 #[cfg(feature = "verify_aggregation_graph")]
1455 self.is_idle.store(false, Ordering::Release);
1456 self.idle_end_event.notify(usize::MAX);
1457 }
1458
1459 fn get_or_create_task(
1460 &self,
1461 native_fn: &'static NativeFunction,
1462 this: Option<RawVc>,
1463 arg: &mut dyn StackDynTaskInputs,
1464 parent_task: Option<TaskId>,
1465 persistence: TaskPersistence,
1466 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1467 ) -> TaskId {
1468 let transient = matches!(persistence, TaskPersistence::Transient);
1469
1470 if transient
1471 && let Some(parent_task) = parent_task
1472 && !parent_task.is_transient()
1473 {
1474 let task_type = CachedTaskType {
1475 native_fn,
1476 this,
1477 arg: arg.take_box(),
1478 };
1479 self.panic_persistent_calling_transient(
1480 self.debug_get_task_description(parent_task),
1481 Some(&task_type),
1482 None,
1483 );
1484 }
1485
1486 let is_root = native_fn.is_root;
1487
1488 let arg_ref = arg.as_ref();
1490 let hash = CachedTaskType::hash_from_components(
1491 self.storage.task_cache.hasher(),
1492 native_fn,
1493 this,
1494 arg_ref,
1495 );
1496 let shard = get_shard(&self.storage.task_cache, hash);
1500
1501 let mut ctx = self.execute_context(turbo_tasks);
1502 if let Some(task_id) =
1506 raw_get_in_shard(shard, hash, |k| k.eq_components(native_fn, this, arg_ref))
1507 {
1508 self.track_cache_hit_by_fn(native_fn);
1509 operation::ConnectChildOperation::run(parent_task, task_id, ctx);
1510 return task_id;
1511 }
1512
1513 let task_id = if !transient
1518 && let Some((task_id, stored_type)) = ctx.task_by_type(native_fn, this, arg_ref)
1519 {
1520 self.track_cache_hit_by_fn(native_fn);
1521 match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
1524 k.eq_components(native_fn, this, arg_ref)
1525 }) {
1526 RawEntry::Occupied(_) => {}
1527 RawEntry::Vacant(e) => {
1528 e.insert(stored_type, task_id);
1529 }
1530 };
1531 task_id
1532 } else {
1533 match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
1534 k.eq_components(native_fn, this, arg_ref)
1535 }) {
1536 RawEntry::Occupied(e) => {
1537 let task_id = *e.get();
1540 drop(e);
1541 self.track_cache_hit_by_fn(native_fn);
1542 task_id
1543 }
1544 RawEntry::Vacant(e) => {
1545 let task_type = Arc::new(CachedTaskType {
1549 native_fn,
1550 this,
1551 arg: arg.take_box(),
1552 });
1553 let task_id = if transient {
1554 self.transient_task_id_factory.get()
1555 } else {
1556 self.persisted_task_id_factory.get()
1557 };
1558 self.storage
1562 .initialize_new_task(task_id, Some(task_type.clone()));
1563 e.insert(task_type, task_id);
1565 self.track_cache_miss_by_fn(native_fn);
1566 if is_root {
1570 AggregationUpdateQueue::run(
1571 AggregationUpdateJob::UpdateAggregationNumber {
1572 task_id,
1573 base_aggregation_number: u32::MAX,
1574 distance: None,
1575 },
1576 &mut ctx,
1577 );
1578 } else if native_fn.is_session_dependent && self.should_track_dependencies() {
1579 const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
1580 AggregationUpdateQueue::run(
1581 AggregationUpdateJob::UpdateAggregationNumber {
1582 task_id,
1583 base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
1584 distance: None,
1585 },
1586 &mut ctx,
1587 );
1588 };
1589
1590 task_id
1591 }
1592 }
1593 };
1594
1595 operation::ConnectChildOperation::run(parent_task, task_id, ctx);
1596
1597 task_id
1598 }
1599
1600 fn debug_trace_transient_task(
1603 &self,
1604 task_type: &CachedTaskType,
1605 cell_id: Option<CellId>,
1606 ) -> DebugTraceTransientTask {
1607 fn inner_id(
1610 backend: &TurboTasksBackendInner<impl BackingStorage>,
1611 task_id: TaskId,
1612 cell_type_id: Option<ValueTypeId>,
1613 visited_set: &mut FxHashSet<TaskId>,
1614 ) -> DebugTraceTransientTask {
1615 if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1616 if visited_set.contains(&task_id) {
1617 let task_name = task_type.get_name();
1618 DebugTraceTransientTask::Collapsed {
1619 task_name,
1620 cell_type_id,
1621 }
1622 } else {
1623 inner_cached(backend, &task_type, cell_type_id, visited_set)
1624 }
1625 } else {
1626 DebugTraceTransientTask::Uncached { cell_type_id }
1627 }
1628 }
1629 fn inner_cached(
1630 backend: &TurboTasksBackendInner<impl BackingStorage>,
1631 task_type: &CachedTaskType,
1632 cell_type_id: Option<ValueTypeId>,
1633 visited_set: &mut FxHashSet<TaskId>,
1634 ) -> DebugTraceTransientTask {
1635 let task_name = task_type.get_name();
1636
1637 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1638 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1639 return None;
1643 };
1644 if task_id.is_transient() {
1645 Some(Box::new(inner_id(
1646 backend,
1647 task_id,
1648 cause_self_raw_vc.try_get_type_id(),
1649 visited_set,
1650 )))
1651 } else {
1652 None
1653 }
1654 });
1655 let cause_args = task_type
1656 .arg
1657 .get_raw_vcs()
1658 .into_iter()
1659 .filter_map(|raw_vc| {
1660 let Some(task_id) = raw_vc.try_get_task_id() else {
1661 return None;
1663 };
1664 if !task_id.is_transient() {
1665 return None;
1666 }
1667 Some((task_id, raw_vc.try_get_type_id()))
1668 })
1669 .collect::<IndexSet<_>>() .into_iter()
1671 .map(|(task_id, cell_type_id)| {
1672 inner_id(backend, task_id, cell_type_id, visited_set)
1673 })
1674 .collect();
1675
1676 DebugTraceTransientTask::Cached {
1677 task_name,
1678 cell_type_id,
1679 cause_self,
1680 cause_args,
1681 }
1682 }
1683 inner_cached(
1684 self,
1685 task_type,
1686 cell_id.map(|c| c.type_id),
1687 &mut FxHashSet::default(),
1688 )
1689 }
1690
1691 fn invalidate_task(
1692 &self,
1693 task_id: TaskId,
1694 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1695 ) {
1696 if !self.should_track_dependencies() {
1697 panic!("Dependency tracking is disabled so invalidation is not allowed");
1698 }
1699 operation::InvalidateOperation::run(
1700 smallvec![task_id],
1701 #[cfg(feature = "trace_task_dirty")]
1702 TaskDirtyCause::Invalidator,
1703 self.execute_context(turbo_tasks),
1704 );
1705 }
1706
1707 fn invalidate_tasks(
1708 &self,
1709 tasks: &[TaskId],
1710 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1711 ) {
1712 if !self.should_track_dependencies() {
1713 panic!("Dependency tracking is disabled so invalidation is not allowed");
1714 }
1715 operation::InvalidateOperation::run(
1716 tasks.iter().copied().collect(),
1717 #[cfg(feature = "trace_task_dirty")]
1718 TaskDirtyCause::Unknown,
1719 self.execute_context(turbo_tasks),
1720 );
1721 }
1722
1723 fn invalidate_tasks_set(
1724 &self,
1725 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1726 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1727 ) {
1728 if !self.should_track_dependencies() {
1729 panic!("Dependency tracking is disabled so invalidation is not allowed");
1730 }
1731 operation::InvalidateOperation::run(
1732 tasks.iter().copied().collect(),
1733 #[cfg(feature = "trace_task_dirty")]
1734 TaskDirtyCause::Unknown,
1735 self.execute_context(turbo_tasks),
1736 );
1737 }
1738
1739 fn invalidate_serialization(
1740 &self,
1741 task_id: TaskId,
1742 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1743 ) {
1744 if task_id.is_transient() {
1745 return;
1746 }
1747 let mut ctx = self.execute_context(turbo_tasks);
1748 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1749 task.invalidate_serialization();
1750 }
1751
1752 fn debug_get_task_description(&self, task_id: TaskId) -> String {
1753 let task = self.storage.access_mut(task_id);
1754 if let Some(value) = task.get_persistent_task_type() {
1755 format!("{task_id:?} {}", value)
1756 } else if let Some(value) = task.get_transient_task_type() {
1757 format!("{task_id:?} {}", value)
1758 } else {
1759 format!("{task_id:?} unknown")
1760 }
1761 }
1762
1763 fn get_task_name(
1764 &self,
1765 task_id: TaskId,
1766 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1767 ) -> String {
1768 let mut ctx = self.execute_context(turbo_tasks);
1769 let task = ctx.task(task_id, TaskDataCategory::Data);
1770 if let Some(value) = task.get_persistent_task_type() {
1771 value.to_string()
1772 } else if let Some(value) = task.get_transient_task_type() {
1773 value.to_string()
1774 } else {
1775 "unknown".to_string()
1776 }
1777 }
1778
1779 fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1780 let task = self.storage.access_mut(task_id);
1781 task.get_persistent_task_type().cloned()
1782 }
1783
1784 fn task_execution_canceled(
1785 &self,
1786 task_id: TaskId,
1787 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1788 ) {
1789 let mut ctx = self.execute_context(turbo_tasks);
1790 let mut task = ctx.task(task_id, TaskDataCategory::All);
1791 if let Some(in_progress) = task.take_in_progress() {
1792 match in_progress {
1793 InProgressState::Scheduled {
1794 done_event,
1795 reason: _,
1796 } => done_event.notify(usize::MAX),
1797 InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1798 done_event.notify(usize::MAX)
1799 }
1800 InProgressState::Canceled => {}
1801 }
1802 }
1803 let in_progress_cells = task.take_in_progress_cells();
1806 if let Some(ref cells) = in_progress_cells {
1807 for state in cells.values() {
1808 state.event.notify(usize::MAX);
1809 }
1810 }
1811
1812 let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
1818 task.update_dirty_state(Some(Dirtyness::SessionDependent))
1819 } else {
1820 None
1821 };
1822
1823 let old = task.set_in_progress(InProgressState::Canceled);
1824 debug_assert!(old.is_none(), "InProgress already exists");
1825 drop(task);
1826
1827 if let Some(data_update) = data_update {
1828 AggregationUpdateQueue::run(data_update, &mut ctx);
1829 }
1830
1831 drop(in_progress_cells);
1832 }
1833
1834 fn try_start_task_execution(
1835 &self,
1836 task_id: TaskId,
1837 priority: TaskPriority,
1838 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1839 ) -> Option<TaskExecutionSpec<'_>> {
1840 let execution_reason;
1841 let task_type;
1842 {
1843 let mut ctx = self.execute_context(turbo_tasks);
1844 let mut task = ctx.task(task_id, TaskDataCategory::All);
1845 task_type = task.get_task_type().to_owned();
1846 let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
1847 if let Some(tasks) = task.prefetch() {
1848 drop(task);
1849 ctx.prepare_tasks(tasks, "prefetch");
1850 task = ctx.task(task_id, TaskDataCategory::All);
1851 }
1852 let in_progress = task.take_in_progress()?;
1853 let InProgressState::Scheduled { done_event, reason } = in_progress else {
1854 let old = task.set_in_progress(in_progress);
1855 debug_assert!(old.is_none(), "InProgress already exists");
1856 return None;
1857 };
1858 execution_reason = reason;
1859 let old = task.set_in_progress(InProgressState::InProgress(Box::new(
1860 InProgressStateInner {
1861 stale: false,
1862 once_task,
1863 done_event,
1864 marked_as_completed: false,
1865 new_children: Default::default(),
1866 },
1867 )));
1868 debug_assert!(old.is_none(), "InProgress already exists");
1869
1870 enum Collectible {
1872 Current(CollectibleRef, i32),
1873 Outdated(CollectibleRef),
1874 }
1875 let collectibles = task
1876 .iter_collectibles()
1877 .map(|(&collectible, &value)| Collectible::Current(collectible, value))
1878 .chain(
1879 task.iter_outdated_collectibles()
1880 .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
1881 )
1882 .collect::<Vec<_>>();
1883 for collectible in collectibles {
1884 match collectible {
1885 Collectible::Current(collectible, value) => {
1886 let _ = task.insert_outdated_collectible(collectible, value);
1887 }
1888 Collectible::Outdated(collectible) => {
1889 if task
1890 .collectibles()
1891 .is_none_or(|m| m.get(&collectible).is_none())
1892 {
1893 task.remove_outdated_collectibles(&collectible);
1894 }
1895 }
1896 }
1897 }
1898
1899 if self.should_track_dependencies() {
1900 let cell_dependencies = task.iter_cell_dependencies().collect();
1902 task.set_outdated_cell_dependencies(cell_dependencies);
1903
1904 let outdated_output_dependencies = task.iter_output_dependencies().collect();
1905 task.set_outdated_output_dependencies(outdated_output_dependencies);
1906 }
1907 }
1908
1909 let (span, future) = match task_type {
1910 TaskType::Cached(task_type) => {
1911 let CachedTaskType {
1912 native_fn,
1913 this,
1914 arg,
1915 } = &*task_type;
1916 (
1917 native_fn.span(task_id.persistence(), execution_reason, priority),
1918 native_fn.execute(*this, &**arg),
1919 )
1920 }
1921 TaskType::Transient(task_type) => {
1922 let span = tracing::trace_span!("turbo_tasks::root_task");
1923 let future = match &*task_type {
1924 TransientTask::Root(f) => f(),
1925 TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
1926 };
1927 (span, future)
1928 }
1929 };
1930 Some(TaskExecutionSpec { future, span })
1931 }
1932
1933 fn task_execution_completed(
1937 &self,
1938 task_id: TaskId,
1939 result: Result<RawVc, TurboTasksExecutionError>,
1940 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1941 #[cfg(feature = "verify_determinism")] stateful: bool,
1942 has_invalidator: bool,
1943 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1944 ) -> Option<TaskPriority> {
1945 #[cfg(not(feature = "trace_task_details"))]
1960 let span = tracing::trace_span!(
1961 "task execution completed",
1962 new_children = tracing::field::Empty
1963 )
1964 .entered();
1965 #[cfg(feature = "trace_task_details")]
1966 let span = tracing::trace_span!(
1967 "task execution completed",
1968 task_id = display(task_id),
1969 result = match result.as_ref() {
1970 Ok(value) => display(either::Either::Left(value)),
1971 Err(err) => display(either::Either::Right(err)),
1972 },
1973 new_children = tracing::field::Empty,
1974 immutable = tracing::field::Empty,
1975 new_output = tracing::field::Empty,
1976 output_dependents = tracing::field::Empty,
1977 aggregation_number = tracing::field::Empty,
1978 stale = tracing::field::Empty,
1979 )
1980 .entered();
1981
1982 let is_error = result.is_err();
1983
1984 let mut ctx = self.execute_context(turbo_tasks);
1985
1986 let TaskExecutionCompletePrepareResult {
1987 new_children,
1988 is_now_immutable,
1989 #[cfg(feature = "verify_determinism")]
1990 no_output_set,
1991 new_output,
1992 output_dependent_tasks,
1993 is_recomputation,
1994 is_session_dependent,
1995 } = match self.task_execution_completed_prepare(
1996 &mut ctx,
1997 #[cfg(feature = "trace_task_details")]
1998 &span,
1999 task_id,
2000 result,
2001 cell_counters,
2002 #[cfg(feature = "verify_determinism")]
2003 stateful,
2004 has_invalidator,
2005 ) {
2006 Ok(r) => r,
2007 Err(stale_priority) => {
2008 #[cfg(feature = "trace_task_details")]
2010 span.record("stale", "prepare");
2011 return Some(stale_priority);
2012 }
2013 };
2014
2015 #[cfg(feature = "trace_task_details")]
2016 span.record("new_output", new_output.is_some());
2017 #[cfg(feature = "trace_task_details")]
2018 span.record("output_dependents", output_dependent_tasks.len());
2019
2020 if !output_dependent_tasks.is_empty() {
2025 self.task_execution_completed_invalidate_output_dependent(
2026 &mut ctx,
2027 task_id,
2028 output_dependent_tasks,
2029 );
2030 }
2031
2032 let has_new_children = !new_children.is_empty();
2033 span.record("new_children", new_children.len());
2034
2035 if has_new_children {
2036 self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
2037 }
2038
2039 if has_new_children
2040 && let Some(stale_priority) =
2041 self.task_execution_completed_connect(&mut ctx, task_id, new_children)
2042 {
2043 #[cfg(feature = "trace_task_details")]
2045 span.record("stale", "connect");
2046 return Some(stale_priority);
2047 }
2048
2049 let (stale_priority, in_progress_cells) = self.task_execution_completed_finish(
2050 &mut ctx,
2051 task_id,
2052 #[cfg(feature = "verify_determinism")]
2053 no_output_set,
2054 new_output,
2055 is_now_immutable,
2056 is_session_dependent,
2057 );
2058 if let Some(stale_priority) = stale_priority {
2059 #[cfg(feature = "trace_task_details")]
2061 span.record("stale", "finish");
2062 return Some(stale_priority);
2063 }
2064
2065 let removed_data = self.task_execution_completed_cleanup(
2066 &mut ctx,
2067 task_id,
2068 cell_counters,
2069 is_error,
2070 is_recomputation,
2071 );
2072
2073 drop(removed_data);
2075 drop(in_progress_cells);
2076
2077 None
2078 }
2079
2080 fn task_execution_completed_prepare(
2081 &self,
2082 ctx: &mut impl ExecuteContext<'_>,
2083 #[cfg(feature = "trace_task_details")] span: &Span,
2084 task_id: TaskId,
2085 result: Result<RawVc, TurboTasksExecutionError>,
2086 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2087 #[cfg(feature = "verify_determinism")] stateful: bool,
2088 has_invalidator: bool,
2089 ) -> Result<TaskExecutionCompletePrepareResult, TaskPriority> {
2090 let mut task = ctx.task(task_id, TaskDataCategory::All);
2091 let is_recomputation = task.is_dirty().is_none();
2092 let is_session_dependent = self.should_track_dependencies()
2095 && matches!(task.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent);
2096 let Some(in_progress) = task.get_in_progress_mut() else {
2097 panic!("Task execution completed, but task is not in progress: {task:#?}");
2098 };
2099 if matches!(in_progress, InProgressState::Canceled) {
2100 return Ok(TaskExecutionCompletePrepareResult {
2101 new_children: Default::default(),
2102 is_now_immutable: false,
2103 #[cfg(feature = "verify_determinism")]
2104 no_output_set: false,
2105 new_output: None,
2106 output_dependent_tasks: Default::default(),
2107 is_recomputation,
2108 is_session_dependent,
2109 });
2110 }
2111 let &mut InProgressState::InProgress(box InProgressStateInner {
2112 stale,
2113 ref mut new_children,
2114 once_task: is_once_task,
2115 ..
2116 }) = in_progress
2117 else {
2118 panic!("Task execution completed, but task is not in progress: {task:#?}");
2119 };
2120
2121 #[cfg(not(feature = "no_fast_stale"))]
2123 if stale && !is_once_task {
2124 let stale_priority = compute_stale_priority(&task);
2125 let Some(InProgressState::InProgress(box InProgressStateInner {
2126 done_event,
2127 mut new_children,
2128 ..
2129 })) = task.take_in_progress()
2130 else {
2131 unreachable!();
2132 };
2133 let old = task.set_in_progress(InProgressState::Scheduled {
2134 done_event,
2135 reason: TaskExecutionReason::Stale,
2136 });
2137 debug_assert!(old.is_none(), "InProgress already exists");
2138 for task in task.iter_children() {
2141 new_children.remove(&task);
2142 }
2143 drop(task);
2144
2145 AggregationUpdateQueue::run(
2148 AggregationUpdateJob::DecreaseActiveCounts {
2149 task_ids: new_children.into_iter().collect(),
2150 },
2151 ctx,
2152 );
2153 return Err(stale_priority);
2154 }
2155
2156 let mut new_children = take(new_children);
2158
2159 #[cfg(feature = "verify_determinism")]
2161 if stateful {
2162 task.set_stateful(true);
2163 }
2164
2165 if has_invalidator {
2167 task.set_invalidator(true);
2168 }
2169
2170 if result.is_ok() || is_recomputation {
2180 let old_counters: FxHashMap<_, _> = task
2181 .iter_cell_type_max_index()
2182 .map(|(&k, &v)| (k, v))
2183 .collect();
2184 let mut counters_to_remove = old_counters.clone();
2185
2186 for (&cell_type, &max_index) in cell_counters.iter() {
2187 if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2188 if old_max_index != max_index {
2189 task.insert_cell_type_max_index(cell_type, max_index);
2190 }
2191 } else {
2192 task.insert_cell_type_max_index(cell_type, max_index);
2193 }
2194 }
2195 for (cell_type, _) in counters_to_remove {
2196 task.remove_cell_type_max_index(&cell_type);
2197 }
2198 }
2199
2200 let mut queue = AggregationUpdateQueue::new();
2201
2202 let mut old_edges = Vec::new();
2203
2204 let has_children = !new_children.is_empty();
2205 let is_immutable = task.immutable();
2206 let task_dependencies_for_immutable =
2207 if !is_immutable
2209 && !is_session_dependent
2211 && !task.invalidator()
2213 && task.is_collectibles_dependencies_empty()
2215 {
2216 Some(
2217 task.iter_output_dependencies()
2219 .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
2220 .collect::<FxHashSet<_>>(),
2221 )
2222 } else {
2223 None
2224 };
2225
2226 if has_children {
2227 let _aggregation_number =
2229 prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2230
2231 #[cfg(feature = "trace_task_details")]
2232 span.record("aggregation_number", _aggregation_number);
2233
2234 old_edges.extend(
2236 task.iter_children()
2237 .filter(|task| !new_children.remove(task))
2238 .map(OutdatedEdge::Child),
2239 );
2240 } else {
2241 old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2242 }
2243
2244 old_edges.extend(
2245 task.iter_outdated_collectibles()
2246 .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2247 );
2248
2249 if self.should_track_dependencies() {
2250 old_edges.extend(
2257 task.iter_outdated_cell_dependencies()
2258 .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
2259 );
2260 old_edges.extend(
2261 task.iter_outdated_output_dependencies()
2262 .map(OutdatedEdge::OutputDependency),
2263 );
2264 }
2265
2266 let current_output = task.get_output();
2268 #[cfg(feature = "verify_determinism")]
2269 let no_output_set = current_output.is_none();
2270 let new_output = match result {
2271 Ok(RawVc::TaskOutput(output_task_id)) => {
2272 if let Some(OutputValue::Output(current_task_id)) = current_output
2273 && *current_task_id == output_task_id
2274 {
2275 None
2276 } else {
2277 Some(OutputValue::Output(output_task_id))
2278 }
2279 }
2280 Ok(RawVc::TaskCell(output_task_id, cell)) => {
2281 if let Some(OutputValue::Cell(CellRef {
2282 task: current_task_id,
2283 cell: current_cell,
2284 })) = current_output
2285 && *current_task_id == output_task_id
2286 && *current_cell == cell
2287 {
2288 None
2289 } else {
2290 Some(OutputValue::Cell(CellRef {
2291 task: output_task_id,
2292 cell,
2293 }))
2294 }
2295 }
2296 Ok(RawVc::LocalOutput(..)) => {
2297 panic!("Non-local tasks must not return a local Vc");
2298 }
2299 Err(err) => {
2300 if let Some(OutputValue::Error(old_error)) = current_output
2301 && **old_error == err
2302 {
2303 None
2304 } else {
2305 Some(OutputValue::Error(Arc::new((&err).into())))
2306 }
2307 }
2308 };
2309 let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2310 if new_output.is_some() && ctx.should_track_dependencies() {
2312 output_dependent_tasks = task.iter_output_dependent().collect();
2313 }
2314
2315 drop(task);
2316
2317 let mut is_now_immutable = false;
2319 if let Some(dependencies) = task_dependencies_for_immutable
2320 && dependencies
2321 .iter()
2322 .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2323 {
2324 is_now_immutable = true;
2325 }
2326 #[cfg(feature = "trace_task_details")]
2327 span.record("immutable", is_immutable || is_now_immutable);
2328
2329 if !queue.is_empty() || !old_edges.is_empty() {
2330 #[cfg(any(
2331 feature = "trace_task_completion",
2332 feature = "trace_aggregation_update_stats"
2333 ))]
2334 let _span =
2335 tracing::trace_span!("remove old edges and prepare new children", stats = Empty)
2336 .entered();
2337 #[cfg(feature = "trace_aggregation_update_stats")]
2341 {
2342 let stats = CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2343 _span.record("stats", tracing::field::debug(stats));
2344 }
2345 #[cfg(not(feature = "trace_aggregation_update_stats"))]
2346 CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2347 }
2348
2349 Ok(TaskExecutionCompletePrepareResult {
2350 new_children,
2351 is_now_immutable,
2352 #[cfg(feature = "verify_determinism")]
2353 no_output_set,
2354 new_output,
2355 output_dependent_tasks,
2356 is_recomputation,
2357 is_session_dependent,
2358 })
2359 }
2360
2361 fn task_execution_completed_invalidate_output_dependent(
2362 &self,
2363 ctx: &mut impl ExecuteContext<'_>,
2364 task_id: TaskId,
2365 output_dependent_tasks: SmallVec<[TaskId; 4]>,
2366 ) {
2367 debug_assert!(!output_dependent_tasks.is_empty());
2368
2369 #[cfg(feature = "trace_task_dirty")]
2370 let task_description = self.debug_get_task_description(task_id);
2371
2372 if output_dependent_tasks.len() > 1 {
2373 ctx.prepare_tasks(
2374 output_dependent_tasks
2375 .iter()
2376 .map(|&id| (id, TaskDataCategory::All)),
2377 "invalidate output dependents",
2378 );
2379 }
2380
2381 fn process_output_dependents(
2382 ctx: &mut impl ExecuteContext<'_>,
2383 task_id: TaskId,
2384 #[cfg(feature = "trace_task_dirty")] task_description: &str,
2385 dependent_task_id: TaskId,
2386 queue: &mut AggregationUpdateQueue,
2387 ) {
2388 #[cfg(feature = "trace_task_output_dependencies")]
2389 let span = tracing::trace_span!(
2390 "invalidate output dependency",
2391 task = %task_id,
2392 dependent_task = %dependent_task_id,
2393 result = tracing::field::Empty,
2394 )
2395 .entered();
2396 let mut make_stale = true;
2397 let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2398 let transient_task_type = dependent.get_transient_task_type();
2399 if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
2400 #[cfg(feature = "trace_task_output_dependencies")]
2402 span.record("result", "once task");
2403 return;
2404 }
2405 if dependent.outdated_output_dependencies_contains(&task_id) {
2406 #[cfg(feature = "trace_task_output_dependencies")]
2407 span.record("result", "outdated dependency");
2408 make_stale = false;
2413 } else if !dependent.output_dependencies_contains(&task_id) {
2414 #[cfg(feature = "trace_task_output_dependencies")]
2417 span.record("result", "no backward dependency");
2418 return;
2419 }
2420 make_task_dirty_internal(
2421 dependent,
2422 dependent_task_id,
2423 make_stale,
2424 #[cfg(feature = "trace_task_dirty")]
2425 TaskDirtyCause::OutputChange {
2426 task_description: task_description.to_string(),
2427 },
2428 queue,
2429 ctx,
2430 );
2431 #[cfg(feature = "trace_task_output_dependencies")]
2432 span.record("result", "marked dirty");
2433 }
2434
2435 if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
2436 let chunk_size = good_chunk_size(output_dependent_tasks.len());
2437 let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2438 let _ = scope_and_block(chunks.len(), |scope| {
2439 for chunk in chunks {
2440 let child_ctx = ctx.child_context();
2441 #[cfg(feature = "trace_task_dirty")]
2442 let task_description = &task_description;
2443 scope.spawn(move || {
2444 let mut ctx = child_ctx.create();
2445 let mut queue = AggregationUpdateQueue::new();
2446 for dependent_task_id in chunk {
2447 process_output_dependents(
2448 &mut ctx,
2449 task_id,
2450 #[cfg(feature = "trace_task_dirty")]
2451 task_description,
2452 dependent_task_id,
2453 &mut queue,
2454 )
2455 }
2456 queue.execute(&mut ctx);
2457 });
2458 }
2459 });
2460 } else {
2461 let mut queue = AggregationUpdateQueue::new();
2462 for dependent_task_id in output_dependent_tasks {
2463 process_output_dependents(
2464 ctx,
2465 task_id,
2466 #[cfg(feature = "trace_task_dirty")]
2467 &task_description,
2468 dependent_task_id,
2469 &mut queue,
2470 );
2471 }
2472 queue.execute(ctx);
2473 }
2474 }
2475
2476 fn task_execution_completed_unfinished_children_dirty(
2477 &self,
2478 ctx: &mut impl ExecuteContext<'_>,
2479 new_children: &FxHashSet<TaskId>,
2480 ) {
2481 debug_assert!(!new_children.is_empty());
2482
2483 let mut queue = AggregationUpdateQueue::new();
2484 ctx.for_each_task_all(
2485 new_children.iter().copied(),
2486 "unfinished children dirty",
2487 |child_task, ctx| {
2488 if !child_task.has_output() {
2489 let child_id = child_task.id();
2490 make_task_dirty_internal(
2491 child_task,
2492 child_id,
2493 false,
2494 #[cfg(feature = "trace_task_dirty")]
2495 TaskDirtyCause::InitialDirty,
2496 &mut queue,
2497 ctx,
2498 );
2499 }
2500 },
2501 );
2502
2503 queue.execute(ctx);
2504 }
2505
2506 fn task_execution_completed_connect(
2507 &self,
2508 ctx: &mut impl ExecuteContext<'_>,
2509 task_id: TaskId,
2510 new_children: FxHashSet<TaskId>,
2511 ) -> Option<TaskPriority> {
2512 debug_assert!(!new_children.is_empty());
2513
2514 let mut task = ctx.task(task_id, TaskDataCategory::All);
2515 let Some(in_progress) = task.get_in_progress() else {
2516 panic!("Task execution completed, but task is not in progress: {task:#?}");
2517 };
2518 if matches!(in_progress, InProgressState::Canceled) {
2519 return None;
2521 }
2522 let InProgressState::InProgress(box InProgressStateInner {
2523 #[cfg(not(feature = "no_fast_stale"))]
2524 stale,
2525 once_task: is_once_task,
2526 ..
2527 }) = in_progress
2528 else {
2529 panic!("Task execution completed, but task is not in progress: {task:#?}");
2530 };
2531
2532 #[cfg(not(feature = "no_fast_stale"))]
2534 if *stale && !is_once_task {
2535 let stale_priority = compute_stale_priority(&task);
2536 let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2537 task.take_in_progress()
2538 else {
2539 unreachable!();
2540 };
2541 let old = task.set_in_progress(InProgressState::Scheduled {
2542 done_event,
2543 reason: TaskExecutionReason::Stale,
2544 });
2545 debug_assert!(old.is_none(), "InProgress already exists");
2546 drop(task);
2547
2548 AggregationUpdateQueue::run(
2551 AggregationUpdateJob::DecreaseActiveCounts {
2552 task_ids: new_children.into_iter().collect(),
2553 },
2554 ctx,
2555 );
2556 return Some(stale_priority);
2557 }
2558
2559 let has_active_count = ctx.should_track_activeness()
2560 && task
2561 .get_activeness()
2562 .is_some_and(|activeness| activeness.active_counter > 0);
2563 connect_children(
2564 ctx,
2565 task_id,
2566 task,
2567 new_children,
2568 has_active_count,
2569 ctx.should_track_activeness(),
2570 );
2571
2572 None
2573 }
2574
2575 #[allow(clippy::type_complexity)]
2576 fn task_execution_completed_finish(
2577 &self,
2578 ctx: &mut impl ExecuteContext<'_>,
2579 task_id: TaskId,
2580 #[cfg(feature = "verify_determinism")] no_output_set: bool,
2581 new_output: Option<OutputValue>,
2582 is_now_immutable: bool,
2583 is_session_dependent: bool,
2584 ) -> (
2585 Option<TaskPriority>,
2586 Option<
2587 auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2588 >,
2589 ) {
2590 let mut task = ctx.task(task_id, TaskDataCategory::All);
2591 let Some(in_progress) = task.take_in_progress() else {
2592 panic!("Task execution completed, but task is not in progress: {task:#?}");
2593 };
2594 if matches!(in_progress, InProgressState::Canceled) {
2595 return (None, None);
2597 }
2598 let InProgressState::InProgress(box InProgressStateInner {
2599 done_event,
2600 once_task: is_once_task,
2601 stale,
2602 marked_as_completed: _,
2603 new_children,
2604 }) = in_progress
2605 else {
2606 panic!("Task execution completed, but task is not in progress: {task:#?}");
2607 };
2608 debug_assert!(new_children.is_empty());
2609
2610 if stale && !is_once_task {
2612 let stale_priority = compute_stale_priority(&task);
2613 let old = task.set_in_progress(InProgressState::Scheduled {
2614 done_event,
2615 reason: TaskExecutionReason::Stale,
2616 });
2617 debug_assert!(old.is_none(), "InProgress already exists");
2618 return (Some(stale_priority), None);
2619 }
2620
2621 let mut old_content = None;
2623 if let Some(value) = new_output {
2624 old_content = task.set_output(value);
2625 }
2626
2627 if is_now_immutable {
2630 task.set_immutable(true);
2631 }
2632
2633 let in_progress_cells = task.take_in_progress_cells();
2635 if let Some(ref cells) = in_progress_cells {
2636 for state in cells.values() {
2637 state.event.notify(usize::MAX);
2638 }
2639 }
2640
2641 let new_dirtyness = if is_session_dependent {
2643 Some(Dirtyness::SessionDependent)
2644 } else {
2645 None
2646 };
2647 #[cfg(feature = "verify_determinism")]
2648 let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
2649 let data_update = task.update_dirty_state(new_dirtyness);
2650
2651 #[cfg(feature = "verify_determinism")]
2655 let stale_priority: Option<TaskPriority> =
2656 ((dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task)
2657 .then(TaskPriority::leaf);
2658 #[cfg(not(feature = "verify_determinism"))]
2659 let stale_priority: Option<TaskPriority> = None;
2660 if stale_priority.is_some() {
2661 let old = task.set_in_progress(InProgressState::Scheduled {
2662 done_event,
2663 reason: TaskExecutionReason::Stale,
2664 });
2665 debug_assert!(old.is_none(), "InProgress already exists");
2666 drop(task);
2667 } else {
2668 drop(task);
2669
2670 done_event.notify(usize::MAX);
2672 }
2673
2674 drop(old_content);
2675
2676 if let Some(data_update) = data_update {
2677 AggregationUpdateQueue::run(data_update, ctx);
2678 }
2679
2680 (stale_priority, in_progress_cells)
2682 }
2683
2684 fn task_execution_completed_cleanup(
2685 &self,
2686 ctx: &mut impl ExecuteContext<'_>,
2687 task_id: TaskId,
2688 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2689 is_error: bool,
2690 is_recomputation: bool,
2691 ) -> Vec<SharedReference> {
2692 let mut task = ctx.task(task_id, TaskDataCategory::All);
2693 let mut removed_cell_data = Vec::new();
2694 if !is_error || is_recomputation {
2700 let to_remove: Vec<_> = task
2706 .iter_cell_data()
2707 .filter_map(|(cell, _)| {
2708 cell_counters
2709 .get(&cell.type_id)
2710 .is_none_or(|start_index| cell.index >= *start_index)
2711 .then_some(*cell)
2712 })
2713 .collect();
2714 removed_cell_data.reserve_exact(to_remove.len());
2715 for cell in to_remove {
2716 if let Some(data) = task.remove_cell_data(&cell) {
2717 removed_cell_data.push(data);
2718 }
2719 }
2720 let to_remove_hash: Vec<_> = task
2722 .iter_cell_data_hash()
2723 .filter_map(|(cell, _)| {
2724 cell_counters
2725 .get(&cell.type_id)
2726 .is_none_or(|start_index| cell.index >= *start_index)
2727 .then_some(*cell)
2728 })
2729 .collect();
2730 for cell in to_remove_hash {
2731 task.remove_cell_data_hash(&cell);
2732 }
2733 }
2734
2735 task.cleanup_after_execution();
2739
2740 drop(task);
2741
2742 removed_cell_data
2744 }
2745
2746 fn log_unrecoverable_persist_error() {
2749 eprintln!(
2750 "Persisting is disabled for this session due to an unrecoverable error. Stopping the \
2751 background persisting process."
2752 );
2753 }
2754
2755 fn run_backend_job<'a>(
2756 self: &'a Arc<Self>,
2757 job: TurboTasksBackendJob,
2758 turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2759 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2760 Box::pin(async move {
2761 match job {
2762 TurboTasksBackendJob::Snapshot => {
2763 debug_assert!(self.should_persist());
2764
2765 let mut last_snapshot = self.start_time;
2766 let mut idle_start_listener = self.idle_start_event.listen();
2767 let mut idle_end_listener = self.idle_end_event.listen();
2768 let mut fresh_idle = true;
2771 let mut evicted = false;
2772 let mut is_first = true;
2773 'outer: loop {
2774 const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2775 const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2776 let idle_timeout = *IDLE_TIMEOUT;
2777 let (time, mut reason) = if is_first {
2778 (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2779 } else {
2780 (SNAPSHOT_INTERVAL, "regular snapshot interval")
2781 };
2782
2783 let until = last_snapshot + time;
2784 if until > Instant::now() {
2785 let mut stop_listener = self.stopping_event.listen();
2786 if self.stopping.load(Ordering::Acquire) {
2787 return;
2788 }
2789 let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2790 Instant::now() + idle_timeout
2791 } else {
2792 far_future()
2793 };
2794 loop {
2795 tokio::select! {
2796 _ = &mut stop_listener => {
2797 return;
2798 },
2799 _ = &mut idle_start_listener => {
2800 idle_time = Instant::now() + idle_timeout;
2801 idle_start_listener = self.idle_start_event.listen()
2802 },
2803 _ = &mut idle_end_listener => {
2804 idle_time = far_future();
2805 idle_end_listener = self.idle_end_event.listen()
2806 },
2807 _ = tokio::time::sleep_until(until) => {
2808 break;
2809 },
2810 _ = tokio::time::sleep_until(idle_time) => {
2811 if turbo_tasks.is_idle() {
2812 reason = "idle timeout";
2813 break;
2814 }
2815 },
2816 }
2817 }
2818 }
2819
2820 let this = self.clone();
2821 let background_span =
2825 tracing::info_span!(parent: None, "background snapshot");
2826 match this.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
2827 Err(err) => {
2828 eprintln!("Persisting failed: {err:?}");
2831 Self::log_unrecoverable_persist_error();
2832 return;
2833 }
2834 Ok((snapshot_start, new_data)) => {
2835 fresh_idle = new_data;
2837 is_first = false;
2838 last_snapshot = snapshot_start;
2839
2840 macro_rules! check_idle_ended {
2844 () => {{
2845 tokio::select! {
2846 biased;
2847 _ = &mut idle_end_listener => {
2848 idle_end_listener = self.idle_end_event.listen();
2849 true
2850 },
2851 _ = std::future::ready(()) => false,
2852 }
2853 }};
2854 }
2855 let mut ran_eviction = false;
2873 if this.should_evict() && (new_data || !evicted) {
2874 if check_idle_ended!() {
2875 continue 'outer;
2878 }
2879 evicted = true;
2880 ran_eviction = true;
2881 this.storage.evict_after_snapshot(background_span.id());
2882 }
2883
2884 let mut ran_compaction = false;
2891 const MAX_IDLE_COMPACTION_PASSES: usize = 10;
2892 for _ in 0..MAX_IDLE_COMPACTION_PASSES {
2893 if check_idle_ended!() {
2894 continue 'outer;
2895 }
2896 let _compact_span = tracing::info_span!(
2900 parent: background_span.id(),
2901 "compact database"
2902 )
2903 .entered();
2904 match self.backing_storage.compact() {
2905 Ok(true) => {
2906 ran_compaction = true;
2907 }
2908 Ok(false) => break,
2909 Err(err) => {
2910 eprintln!("Compaction failed: {err:?}");
2911 if self.backing_storage.has_unrecoverable_write_error()
2912 {
2913 Self::log_unrecoverable_persist_error();
2914 return;
2915 }
2916 break;
2917 }
2918 }
2919 }
2920 if check_idle_ended!() {
2921 continue 'outer;
2922 }
2923 if new_data || ran_compaction || ran_eviction {
2928 turbo_tasks_malloc::TurboMalloc::collect(true);
2929 }
2930 }
2931 }
2932 }
2933 }
2934 }
2935 })
2936 }
2937
2938 fn try_read_own_task_cell(
2939 &self,
2940 task_id: TaskId,
2941 cell: CellId,
2942 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2943 ) -> Result<TypedCellContent> {
2944 let mut ctx = self.execute_context(turbo_tasks);
2945 let task = ctx.task(task_id, TaskDataCategory::Data);
2946 if let Some(content) = task.get_cell_data(&cell).cloned() {
2947 Ok(CellContent(Some(content)).into_typed(cell.type_id))
2948 } else {
2949 Ok(CellContent(None).into_typed(cell.type_id))
2950 }
2951 }
2952
2953 fn read_task_collectibles(
2954 &self,
2955 task_id: TaskId,
2956 collectible_type: TraitTypeId,
2957 reader_id: Option<TaskId>,
2958 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2959 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2960 let mut ctx = self.execute_context(turbo_tasks);
2961 let mut collectibles = AutoMap::default();
2962 {
2963 let mut task = ctx.task(task_id, TaskDataCategory::All);
2964 if task
2965 .get_persistent_task_type()
2966 .is_some_and(|t| !t.native_fn.is_root)
2967 {
2968 drop(task);
2969 panic!(
2970 "Reading collectibles of non-root task {} (reader: {}). The `root` attribute \
2971 is missing on the task.",
2972 self.debug_get_task_description(task_id),
2973 reader_id.map_or_else(
2974 || "unknown".to_string(),
2975 |r| self.debug_get_task_description(r)
2976 )
2977 );
2978 }
2979 for (collectible, count) in task.iter_aggregated_collectibles() {
2980 if *count > 0 && collectible.collectible_type == collectible_type {
2981 *collectibles
2982 .entry(RawVc::TaskCell(
2983 collectible.cell.task,
2984 collectible.cell.cell,
2985 ))
2986 .or_insert(0) += 1;
2987 }
2988 }
2989 for (&collectible, &count) in task.iter_collectibles() {
2990 if collectible.collectible_type == collectible_type {
2991 *collectibles
2992 .entry(RawVc::TaskCell(
2993 collectible.cell.task,
2994 collectible.cell.cell,
2995 ))
2996 .or_insert(0) += count;
2997 }
2998 }
2999 if let Some(reader_id) = reader_id {
3000 let _ = task.add_collectibles_dependents((collectible_type, reader_id));
3001 }
3002 }
3003 if let Some(reader_id) = reader_id {
3004 let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
3005 let target = CollectiblesRef {
3006 task: task_id,
3007 collectible_type,
3008 };
3009 if !reader.remove_outdated_collectibles_dependencies(&target) {
3010 let _ = reader.add_collectibles_dependencies(target);
3011 }
3012 }
3013 collectibles
3014 }
3015
3016 fn emit_collectible(
3017 &self,
3018 collectible_type: TraitTypeId,
3019 collectible: RawVc,
3020 task_id: TaskId,
3021 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3022 ) {
3023 self.assert_valid_collectible(task_id, collectible);
3024
3025 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3026 panic!("Collectibles need to be resolved");
3027 };
3028 let cell = CellRef {
3029 task: collectible_task,
3030 cell,
3031 };
3032 operation::UpdateCollectibleOperation::run(
3033 task_id,
3034 CollectibleRef {
3035 collectible_type,
3036 cell,
3037 },
3038 1,
3039 self.execute_context(turbo_tasks),
3040 );
3041 }
3042
3043 fn unemit_collectible(
3044 &self,
3045 collectible_type: TraitTypeId,
3046 collectible: RawVc,
3047 count: u32,
3048 task_id: TaskId,
3049 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3050 ) {
3051 self.assert_valid_collectible(task_id, collectible);
3052
3053 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3054 panic!("Collectibles need to be resolved");
3055 };
3056 let cell = CellRef {
3057 task: collectible_task,
3058 cell,
3059 };
3060 operation::UpdateCollectibleOperation::run(
3061 task_id,
3062 CollectibleRef {
3063 collectible_type,
3064 cell,
3065 },
3066 -(i32::try_from(count).unwrap()),
3067 self.execute_context(turbo_tasks),
3068 );
3069 }
3070
3071 fn update_task_cell(
3072 &self,
3073 task_id: TaskId,
3074 cell: CellId,
3075 content: CellContent,
3076 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3077 content_hash: Option<CellHash>,
3078 verification_mode: VerificationMode,
3079 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3080 ) {
3081 operation::UpdateCellOperation::run(
3082 task_id,
3083 cell,
3084 content,
3085 updated_key_hashes,
3086 content_hash,
3087 verification_mode,
3088 self.execute_context(turbo_tasks),
3089 );
3090 }
3091
3092 fn mark_own_task_as_finished(
3093 &self,
3094 task: TaskId,
3095 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3096 ) {
3097 let mut ctx = self.execute_context(turbo_tasks);
3098 let mut task = ctx.task(task, TaskDataCategory::Data);
3099 if let Some(InProgressState::InProgress(box InProgressStateInner {
3100 marked_as_completed,
3101 ..
3102 })) = task.get_in_progress_mut()
3103 {
3104 *marked_as_completed = true;
3105 }
3110 }
3111
3112 fn connect_task(
3113 &self,
3114 task: TaskId,
3115 parent_task: Option<TaskId>,
3116 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3117 ) {
3118 self.assert_not_persistent_calling_transient(parent_task, task, None);
3119 ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
3120 }
3121
3122 fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3123 let task_id = self.transient_task_id_factory.get();
3124 {
3125 let mut task = self.storage.access_mut(task_id);
3126 task.init_transient_task(task_id, task_type, self.should_track_activeness());
3127 }
3128 #[cfg(feature = "verify_aggregation_graph")]
3129 self.root_tasks.lock().insert(task_id);
3130 task_id
3131 }
3132
3133 fn dispose_root_task(
3134 &self,
3135 task_id: TaskId,
3136 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3137 ) {
3138 #[cfg(feature = "verify_aggregation_graph")]
3139 self.root_tasks.lock().remove(&task_id);
3140
3141 let mut ctx = self.execute_context(turbo_tasks);
3142 let mut task = ctx.task(task_id, TaskDataCategory::All);
3143 let is_dirty = task.is_dirty();
3144 let has_dirty_containers = task.has_dirty_containers();
3145 if is_dirty.is_some() || has_dirty_containers {
3146 if let Some(activeness_state) = task.get_activeness_mut() {
3147 activeness_state.unset_root_type();
3149 activeness_state.set_active_until_clean();
3150 };
3151 } else if let Some(activeness_state) = task.take_activeness() {
3152 activeness_state.all_clean_event.notify(usize::MAX);
3155 }
3156 }
3157
3158 #[cfg(feature = "verify_aggregation_graph")]
3159 fn verify_aggregation_graph(
3160 &self,
3161 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3162 idle: bool,
3163 ) {
3164 if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3165 return;
3166 }
3167 use std::{collections::VecDeque, env, io::stdout};
3168
3169 use crate::backend::operation::{get_uppers, is_aggregating_node};
3170
3171 let mut ctx = self.execute_context(turbo_tasks);
3172 let root_tasks = self.root_tasks.lock().clone();
3173
3174 for task_id in root_tasks.into_iter() {
3175 let mut queue = VecDeque::new();
3176 let mut visited = FxHashSet::default();
3177 let mut aggregated_nodes = FxHashSet::default();
3178 let mut collectibles = FxHashMap::default();
3179 let root_task_id = task_id;
3180 visited.insert(task_id);
3181 aggregated_nodes.insert(task_id);
3182 queue.push_back(task_id);
3183 let mut counter = 0;
3184 while let Some(task_id) = queue.pop_front() {
3185 counter += 1;
3186 if counter % 100000 == 0 {
3187 println!(
3188 "queue={}, visited={}, aggregated_nodes={}",
3189 queue.len(),
3190 visited.len(),
3191 aggregated_nodes.len()
3192 );
3193 }
3194 let task = ctx.task(task_id, TaskDataCategory::All);
3195 if idle && !self.is_idle.load(Ordering::Relaxed) {
3196 return;
3197 }
3198
3199 let uppers = get_uppers(&task);
3200 if task_id != root_task_id
3201 && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3202 {
3203 panic!(
3204 "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3205 {:?})",
3206 task_id,
3207 task.get_task_description(),
3208 uppers
3209 );
3210 }
3211
3212 for (collectible, _) in task.iter_aggregated_collectibles() {
3213 collectibles
3214 .entry(*collectible)
3215 .or_insert_with(|| (false, Vec::new()))
3216 .1
3217 .push(task_id);
3218 }
3219
3220 for (&collectible, &value) in task.iter_collectibles() {
3221 if value > 0 {
3222 if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3223 *flag = true
3224 } else {
3225 panic!(
3226 "Task {} has a collectible {:?} that is not in any upper task",
3227 task_id, collectible
3228 );
3229 }
3230 }
3231 }
3232
3233 let is_dirty = task.has_dirty();
3234 let has_dirty_container = task.has_dirty_containers();
3235 let should_be_in_upper = is_dirty || has_dirty_container;
3236
3237 let aggregation_number = get_aggregation_number(&task);
3238 if is_aggregating_node(aggregation_number) {
3239 aggregated_nodes.insert(task_id);
3240 }
3241 for child_id in task.iter_children() {
3248 if visited.insert(child_id) {
3250 queue.push_back(child_id);
3251 }
3252 }
3253 drop(task);
3254
3255 if should_be_in_upper {
3256 for upper_id in uppers {
3257 let upper = ctx.task(upper_id, TaskDataCategory::All);
3258 let in_upper = upper
3259 .get_aggregated_dirty_containers(&task_id)
3260 .is_some_and(|&dirty| dirty > 0);
3261 if !in_upper {
3262 let containers: Vec<_> = upper
3263 .iter_aggregated_dirty_containers()
3264 .map(|(&k, &v)| (k, v))
3265 .collect();
3266 let upper_task_desc = upper.get_task_description();
3267 drop(upper);
3268 panic!(
3269 "Task {} ({}) is dirty, but is not listed in the upper task {} \
3270 ({})\nThese dirty containers are present:\n{:#?}",
3271 task_id,
3272 ctx.task(task_id, TaskDataCategory::Data)
3273 .get_task_description(),
3274 upper_id,
3275 upper_task_desc,
3276 containers,
3277 );
3278 }
3279 }
3280 }
3281 }
3282
3283 for (collectible, (flag, task_ids)) in collectibles {
3284 if !flag {
3285 use std::io::Write;
3286 let mut stdout = stdout().lock();
3287 writeln!(
3288 stdout,
3289 "{:?} that is not emitted in any child task but in these aggregated \
3290 tasks: {:#?}",
3291 collectible,
3292 task_ids
3293 .iter()
3294 .map(|t| format!(
3295 "{t} {}",
3296 ctx.task(*t, TaskDataCategory::Data).get_task_description()
3297 ))
3298 .collect::<Vec<_>>()
3299 )
3300 .unwrap();
3301
3302 let task_id = collectible.cell.task;
3303 let mut queue = {
3304 let task = ctx.task(task_id, TaskDataCategory::All);
3305 get_uppers(&task)
3306 };
3307 let mut visited = FxHashSet::default();
3308 for &upper_id in queue.iter() {
3309 visited.insert(upper_id);
3310 writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3311 }
3312 while let Some(task_id) = queue.pop() {
3313 let task = ctx.task(task_id, TaskDataCategory::All);
3314 let desc = task.get_task_description();
3315 let aggregated_collectible = task
3316 .get_aggregated_collectibles(&collectible)
3317 .copied()
3318 .unwrap_or_default();
3319 let uppers = get_uppers(&task);
3320 drop(task);
3321 writeln!(
3322 stdout,
3323 "upper {task_id} {desc} collectible={aggregated_collectible}"
3324 )
3325 .unwrap();
3326 if task_ids.contains(&task_id) {
3327 writeln!(
3328 stdout,
3329 "Task has an upper connection to an aggregated task that doesn't \
3330 reference it. Upper connection is invalid!"
3331 )
3332 .unwrap();
3333 }
3334 for upper_id in uppers {
3335 writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3336 if !visited.contains(&upper_id) {
3337 queue.push(upper_id);
3338 }
3339 }
3340 }
3341 panic!("See stdout for more details");
3342 }
3343 }
3344 }
3345 }
3346
3347 fn assert_not_persistent_calling_transient(
3348 &self,
3349 parent_id: Option<TaskId>,
3350 child_id: TaskId,
3351 cell_id: Option<CellId>,
3352 ) {
3353 if let Some(parent_id) = parent_id
3354 && !parent_id.is_transient()
3355 && child_id.is_transient()
3356 {
3357 self.panic_persistent_calling_transient(
3358 self.debug_get_task_description(parent_id),
3359 self.debug_get_cached_task_type(child_id).as_deref(),
3360 cell_id,
3361 );
3362 }
3363 }
3364
3365 fn panic_persistent_calling_transient(
3366 &self,
3367 parent: String,
3368 child: Option<&CachedTaskType>,
3369 cell_id: Option<CellId>,
3370 ) -> ! {
3371 let transient_reason = if let Some(child) = child {
3372 Cow::Owned(format!(
3373 " The callee is transient because it depends on:\n{}",
3374 self.debug_trace_transient_task(child, cell_id),
3375 ))
3376 } else {
3377 Cow::Borrowed("")
3378 };
3379 panic!(
3380 "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3381 parent,
3382 child.map_or("unknown", |t| t.get_name()),
3383 transient_reason,
3384 );
3385 }
3386
3387 fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3388 let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3390 let task_info = if let Some(col_task_ty) = collectible
3392 .try_get_task_id()
3393 .map(|t| self.debug_get_task_description(t))
3394 {
3395 Cow::Owned(format!(" (return type of {col_task_ty})"))
3396 } else {
3397 Cow::Borrowed("")
3398 };
3399 panic!("Collectible{task_info} must be a ResolvedVc")
3400 };
3401 if col_task_id.is_transient() && !task_id.is_transient() {
3402 let transient_reason =
3403 if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3404 Cow::Owned(format!(
3405 ". The collectible is transient because it depends on:\n{}",
3406 self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3407 ))
3408 } else {
3409 Cow::Borrowed("")
3410 };
3411 panic!(
3413 "Collectible is transient, transient collectibles cannot be emitted from \
3414 persistent tasks{transient_reason}",
3415 )
3416 }
3417 }
3418}
3419
3420impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
3421 fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3422 self.0.startup(turbo_tasks);
3423 }
3424
3425 fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3426 self.0.stopping();
3427 }
3428
3429 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3430 self.0.stop(turbo_tasks);
3431 }
3432
3433 fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3434 self.0.idle_start(turbo_tasks);
3435 }
3436
3437 fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3438 self.0.idle_end();
3439 }
3440
3441 fn get_or_create_task(
3442 &self,
3443 native_fn: &'static NativeFunction,
3444 this: Option<RawVc>,
3445 arg: &mut dyn StackDynTaskInputs,
3446 parent_task: Option<TaskId>,
3447 persistence: TaskPersistence,
3448 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3449 ) -> TaskId {
3450 self.0
3451 .get_or_create_task(native_fn, this, arg, parent_task, persistence, turbo_tasks)
3452 }
3453
3454 fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3455 self.0.invalidate_task(task_id, turbo_tasks);
3456 }
3457
3458 fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3459 self.0.invalidate_tasks(tasks, turbo_tasks);
3460 }
3461
3462 fn invalidate_tasks_set(
3463 &self,
3464 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3465 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3466 ) {
3467 self.0.invalidate_tasks_set(tasks, turbo_tasks);
3468 }
3469
3470 fn invalidate_serialization(
3471 &self,
3472 task_id: TaskId,
3473 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3474 ) {
3475 self.0.invalidate_serialization(task_id, turbo_tasks);
3476 }
3477
3478 fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3479 self.0.task_execution_canceled(task, turbo_tasks)
3480 }
3481
3482 fn try_start_task_execution(
3483 &self,
3484 task_id: TaskId,
3485 priority: TaskPriority,
3486 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3487 ) -> Option<TaskExecutionSpec<'_>> {
3488 self.0
3489 .try_start_task_execution(task_id, priority, turbo_tasks)
3490 }
3491
3492 fn task_execution_completed(
3493 &self,
3494 task_id: TaskId,
3495 result: Result<RawVc, TurboTasksExecutionError>,
3496 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3497 #[cfg(feature = "verify_determinism")] stateful: bool,
3498 has_invalidator: bool,
3499 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3500 ) -> Option<TaskPriority> {
3501 self.0.task_execution_completed(
3502 task_id,
3503 result,
3504 cell_counters,
3505 #[cfg(feature = "verify_determinism")]
3506 stateful,
3507 has_invalidator,
3508 turbo_tasks,
3509 )
3510 }
3511
3512 type BackendJob = TurboTasksBackendJob;
3513
3514 fn run_backend_job<'a>(
3515 &'a self,
3516 job: Self::BackendJob,
3517 turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3518 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3519 self.0.run_backend_job(job, turbo_tasks)
3520 }
3521
3522 fn try_read_task_output(
3523 &self,
3524 task_id: TaskId,
3525 reader: Option<TaskId>,
3526 options: ReadOutputOptions,
3527 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3528 ) -> Result<Result<RawVc, EventListener>> {
3529 self.0
3530 .try_read_task_output(task_id, reader, options, turbo_tasks)
3531 }
3532
3533 fn try_read_task_cell(
3534 &self,
3535 task_id: TaskId,
3536 cell: CellId,
3537 reader: Option<TaskId>,
3538 options: ReadCellOptions,
3539 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3540 ) -> Result<Result<TypedCellContent, EventListener>> {
3541 self.0
3542 .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3543 }
3544
3545 fn try_read_own_task_cell(
3546 &self,
3547 task_id: TaskId,
3548 cell: CellId,
3549 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3550 ) -> Result<TypedCellContent> {
3551 self.0.try_read_own_task_cell(task_id, cell, turbo_tasks)
3552 }
3553
3554 fn read_task_collectibles(
3555 &self,
3556 task_id: TaskId,
3557 collectible_type: TraitTypeId,
3558 reader: Option<TaskId>,
3559 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3560 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3561 self.0
3562 .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3563 }
3564
3565 fn emit_collectible(
3566 &self,
3567 collectible_type: TraitTypeId,
3568 collectible: RawVc,
3569 task_id: TaskId,
3570 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3571 ) {
3572 self.0
3573 .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3574 }
3575
3576 fn unemit_collectible(
3577 &self,
3578 collectible_type: TraitTypeId,
3579 collectible: RawVc,
3580 count: u32,
3581 task_id: TaskId,
3582 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3583 ) {
3584 self.0
3585 .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3586 }
3587
3588 fn update_task_cell(
3589 &self,
3590 task_id: TaskId,
3591 cell: CellId,
3592 content: CellContent,
3593 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3594 content_hash: Option<CellHash>,
3595 verification_mode: VerificationMode,
3596 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3597 ) {
3598 self.0.update_task_cell(
3599 task_id,
3600 cell,
3601 content,
3602 updated_key_hashes,
3603 content_hash,
3604 verification_mode,
3605 turbo_tasks,
3606 );
3607 }
3608
3609 fn mark_own_task_as_finished(
3610 &self,
3611 task_id: TaskId,
3612 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3613 ) {
3614 self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3615 }
3616
3617 fn connect_task(
3618 &self,
3619 task: TaskId,
3620 parent_task: Option<TaskId>,
3621 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3622 ) {
3623 self.0.connect_task(task, parent_task, turbo_tasks);
3624 }
3625
3626 fn create_transient_task(
3627 &self,
3628 task_type: TransientTaskType,
3629 _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3630 ) -> TaskId {
3631 self.0.create_transient_task(task_type)
3632 }
3633
3634 fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3635 self.0.dispose_root_task(task_id, turbo_tasks);
3636 }
3637
3638 fn task_statistics(&self) -> &TaskStatisticsApi {
3639 &self.0.task_statistics
3640 }
3641
3642 fn is_tracking_dependencies(&self) -> bool {
3643 self.0.options.dependency_tracking
3644 }
3645
3646 fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
3647 self.0.get_task_name(task, turbo_tasks)
3648 }
3649}
3650
3651enum DebugTraceTransientTask {
3652 Cached {
3653 task_name: &'static str,
3654 cell_type_id: Option<ValueTypeId>,
3655 cause_self: Option<Box<DebugTraceTransientTask>>,
3656 cause_args: Vec<DebugTraceTransientTask>,
3657 },
3658 Collapsed {
3660 task_name: &'static str,
3661 cell_type_id: Option<ValueTypeId>,
3662 },
3663 Uncached {
3664 cell_type_id: Option<ValueTypeId>,
3665 },
3666}
3667
3668impl DebugTraceTransientTask {
3669 fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3670 let indent = " ".repeat(level);
3671 f.write_str(&indent)?;
3672
3673 fn fmt_cell_type_id(
3674 f: &mut fmt::Formatter<'_>,
3675 cell_type_id: Option<ValueTypeId>,
3676 ) -> fmt::Result {
3677 if let Some(ty) = cell_type_id {
3678 write!(
3679 f,
3680 " (read cell of type {})",
3681 get_value_type(ty).ty.global_name
3682 )
3683 } else {
3684 Ok(())
3685 }
3686 }
3687
3688 match self {
3690 Self::Cached {
3691 task_name,
3692 cell_type_id,
3693 ..
3694 }
3695 | Self::Collapsed {
3696 task_name,
3697 cell_type_id,
3698 ..
3699 } => {
3700 f.write_str(task_name)?;
3701 fmt_cell_type_id(f, *cell_type_id)?;
3702 if matches!(self, Self::Collapsed { .. }) {
3703 f.write_str(" (collapsed)")?;
3704 }
3705 }
3706 Self::Uncached { cell_type_id } => {
3707 f.write_str("unknown transient task")?;
3708 fmt_cell_type_id(f, *cell_type_id)?;
3709 }
3710 }
3711 f.write_char('\n')?;
3712
3713 if let Self::Cached {
3715 cause_self,
3716 cause_args,
3717 ..
3718 } = self
3719 {
3720 if let Some(c) = cause_self {
3721 writeln!(f, "{indent} self:")?;
3722 c.fmt_indented(f, level + 1)?;
3723 }
3724 if !cause_args.is_empty() {
3725 writeln!(f, "{indent} args:")?;
3726 for c in cause_args {
3727 c.fmt_indented(f, level + 1)?;
3728 }
3729 }
3730 }
3731 Ok(())
3732 }
3733}
3734
3735impl fmt::Display for DebugTraceTransientTask {
3736 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3737 self.fmt_indented(f, 0)
3738 }
3739}
3740
3741fn far_future() -> Instant {
3743 Instant::now() + Duration::from_secs(86400 * 365 * 30)
3748}
3749
3750fn encode_task_data(
3762 task: TaskId,
3763 data: &TaskStorage,
3764 category: SpecificTaskDataCategory,
3765 scratch_buffer: &mut TurboBincodeBuffer,
3766) -> Result<TurboBincodeBuffer> {
3767 scratch_buffer.clear();
3768 let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3769 data.encode(category, &mut encoder)?;
3770
3771 if cfg!(feature = "verify_serialization") {
3772 TaskStorage::new()
3773 .decode(
3774 category,
3775 &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3776 )
3777 .with_context(|| {
3778 format!(
3779 "expected to be able to decode serialized data for '{category:?}' information \
3780 for {task}"
3781 )
3782 })?;
3783 }
3784 Ok(SmallVec::from_slice(scratch_buffer))
3785}