1mod cell_data;
2mod counter_map;
3mod operation;
4mod snapshot_coordinator;
5mod storage;
6pub mod storage_schema;
7
8use std::{
9 borrow::Cow,
10 fmt::{self, Write},
11 future::Future,
12 hash::BuildHasherDefault,
13 mem::take,
14 pin::Pin,
15 sync::{
16 Arc, LazyLock,
17 atomic::{AtomicBool, Ordering},
18 },
19 time::SystemTime,
20};
21
22use anyhow::{Context, Result, bail};
23use auto_hash_map::{AutoMap, AutoSet};
24use indexmap::IndexSet;
25use parking_lot::Mutex;
26use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
27use smallvec::{SmallVec, smallvec};
28use tokio::time::{Duration, Instant};
29use tracing::{Span, trace_span};
30use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
31use turbo_tasks::{
32 CellId, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency, ReadOutputOptions,
33 ReadTracking, SharedReference, StackDynTaskInputs, TRANSIENT_TASK_BIT, TaskExecutionReason,
34 TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic,
35 ValueTypeId,
36 backend::{
37 Backend, CachedTaskType, CachedTaskTypeArc, CellContent, CellHash, TaskExecutionSpec,
38 TransientTaskType, TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
39 TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
40 VerificationMode,
41 },
42 event::{Event, EventDescription, EventListener},
43 macro_helpers::NativeFunction,
44 message_queue::{TimingEvent, TraceEvent},
45 registry::get_value_type,
46 scope::scope_and_block,
47 task_statistics::TaskStatisticsApi,
48 trace::TraceRawVcs,
49 util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
50};
51
52pub use self::{
53 operation::AnyOperation,
54 storage::{EvictionCounts, SpecificTaskDataCategory, TaskDataCategory},
55};
56#[cfg(feature = "trace_task_dirty")]
57use crate::backend::operation::TaskDirtyCause;
58use crate::{
59 backend::{
60 operation::{
61 AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
62 CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
63 LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType, TaskTypeRef,
64 connect_children, get_aggregation_number, get_uppers, make_task_dirty_internal,
65 prepare_new_children,
66 },
67 snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
68 storage::Storage,
69 storage_schema::{TaskStorage, TaskStorageAccessors},
70 },
71 backing_storage::{BackingStorage, SnapshotItem, SnapshotMeta, compute_task_type_hash},
72 data::{
73 ActivenessState, CellDependency, CellRef, CollectibleRef, CollectiblesRef, Dirtyness,
74 InProgressCellState, InProgressState, InProgressStateInner, OutputValue, TransientTask,
75 },
76 error::TaskError,
77 utils::{
78 dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
79 shard_amount::compute_shard_amount,
80 },
81};
82
83const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10000;
87
88static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
91 std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
92 .ok()
93 .and_then(|v| v.parse::<u64>().ok())
94 .map(Duration::from_millis)
95 .unwrap_or(Duration::from_secs(2))
96});
97
98fn compute_stale_priority(task: &impl TaskGuard) -> TaskPriority {
105 TaskPriority::invalidation(
106 task.get_leaf_distance()
107 .copied()
108 .unwrap_or_default()
109 .distance,
110 )
111 .in_parent(task.is_dirty().unwrap_or(TaskPriority::leaf()))
112}
113
114pub enum StorageMode {
115 ReadOnly,
117 ReadWrite,
120 ReadWriteOnShutdown,
123}
124
125pub struct BackendOptions {
126 pub dependency_tracking: bool,
131
132 pub active_tracking: bool,
138
139 pub storage_mode: Option<StorageMode>,
141
142 pub num_workers: Option<usize>,
145
146 pub small_preallocation: bool,
148
149 pub evict_after_snapshot: bool,
153}
154
155impl Default for BackendOptions {
156 fn default() -> Self {
157 Self {
158 dependency_tracking: true,
159 active_tracking: true,
160 storage_mode: Some(StorageMode::ReadWrite),
161 num_workers: None,
162 small_preallocation: false,
163 evict_after_snapshot: false,
164 }
165 }
166}
167
168pub enum TurboTasksBackendJob {
169 Snapshot,
170}
171
172pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
173
174struct TurboTasksBackendInner<B: BackingStorage> {
175 options: BackendOptions,
176
177 start_time: Instant,
178
179 persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
180 transient_task_id_factory: IdFactoryWithReuse<TaskId>,
181
182 storage: Storage,
183
184 snapshot_coord: SnapshotCoordinator,
187 snapshot_in_progress: Mutex<()>,
192
193 stopping: AtomicBool,
194 stopping_event: Event,
195 idle_start_event: Event,
196 idle_end_event: Event,
197 #[cfg(feature = "verify_aggregation_graph")]
198 is_idle: AtomicBool,
199
200 task_statistics: TaskStatisticsApi,
201
202 backing_storage: B,
203
204 #[cfg(feature = "verify_aggregation_graph")]
205 root_tasks: Mutex<FxHashSet<TaskId>>,
206}
207
208impl<B: BackingStorage> TurboTasksBackend<B> {
209 pub fn new(options: BackendOptions, backing_storage: B) -> Self {
210 Self(Arc::new(TurboTasksBackendInner::new(
211 options,
212 backing_storage,
213 )))
214 }
215
216 pub fn backing_storage(&self) -> &B {
217 &self.0.backing_storage
218 }
219
220 pub fn snapshot_and_evict_for_testing(
227 &self,
228 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
229 ) -> (bool, EvictionCounts) {
230 self.0.snapshot_and_evict_for_testing(turbo_tasks)
231 }
232}
233
234impl<B: BackingStorage> TurboTasksBackendInner<B> {
235 pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
236 let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
237 if !options.dependency_tracking {
238 options.active_tracking = false;
239 }
240 let small_preallocation = options.small_preallocation;
241 let next_task_id = backing_storage
242 .next_free_task_id()
243 .expect("Failed to get task id");
244 Self {
245 options,
246 start_time: Instant::now(),
247 persisted_task_id_factory: IdFactoryWithReuse::new(
248 next_task_id,
249 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
250 ),
251 transient_task_id_factory: IdFactoryWithReuse::new(
252 TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
253 TaskId::MAX,
254 ),
255 storage: Storage::new(shard_amount, small_preallocation),
256 snapshot_coord: SnapshotCoordinator::new(),
257 snapshot_in_progress: Mutex::new(()),
258 stopping: AtomicBool::new(false),
259 stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
260 idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
261 idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
262 #[cfg(feature = "verify_aggregation_graph")]
263 is_idle: AtomicBool::new(false),
264 task_statistics: TaskStatisticsApi::default(),
265 backing_storage,
266 #[cfg(feature = "verify_aggregation_graph")]
267 root_tasks: Default::default(),
268 }
269 }
270
271 fn execute_context<'a>(
272 &'a self,
273 turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
274 ) -> impl ExecuteContext<'a> {
275 ExecuteContextImpl::new(self, turbo_tasks)
276 }
277
278 fn suspending_requested(&self) -> bool {
279 self.should_persist() && self.snapshot_coord.snapshot_pending()
280 }
281
282 fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
283 if self.should_persist() {
284 self.snapshot_coord.suspend_point(suspend);
285 }
286 }
287
288 pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
289 if !self.should_persist() {
290 return OperationGuard::noop();
291 }
292 self.snapshot_coord.begin_operation()
293 }
294
295 fn should_persist(&self) -> bool {
296 matches!(
297 self.options.storage_mode,
298 Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
299 )
300 }
301
302 fn should_evict(&self) -> bool {
303 self.options.evict_after_snapshot && self.should_persist()
304 }
305
306 #[doc(hidden)]
313 pub fn snapshot_and_evict_for_testing(
314 &self,
315 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
316 ) -> (bool, EvictionCounts) {
317 assert!(
318 self.should_persist(),
319 "snapshot_and_evict requires persistence"
320 );
321 let snapshot_result = self.snapshot_and_persist(None, "test", turbo_tasks);
322 let had_new_data = match snapshot_result {
323 Ok((_, new_data)) => new_data,
324 Err(_) => {
325 return (false, EvictionCounts::default());
329 }
330 };
331 let counts = self.storage.evict_after_snapshot(None);
332 (had_new_data, counts)
333 }
334
335 fn should_restore(&self) -> bool {
336 self.options.storage_mode.is_some()
337 }
338
339 fn should_track_dependencies(&self) -> bool {
340 self.options.dependency_tracking
341 }
342
343 fn should_track_activeness(&self) -> bool {
344 self.options.active_tracking
345 }
346
347 fn track_cache_hit_by_fn(&self, native_fn: &'static NativeFunction) {
348 self.task_statistics
349 .map(|stats| stats.increment_cache_hit(native_fn));
350 }
351
352 fn track_cache_miss_by_fn(&self, native_fn: &'static NativeFunction) {
353 self.task_statistics
354 .map(|stats| stats.increment_cache_miss(native_fn));
355 }
356
357 fn task_error_to_turbo_tasks_execution_error(
362 &self,
363 error: &TaskError,
364 ctx: &mut impl ExecuteContext<'_>,
365 ) -> TurboTasksExecutionError {
366 match error {
367 TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
368 TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
369 message: item.message.clone(),
370 source: item
371 .source
372 .as_ref()
373 .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
374 })),
375 TaskError::LocalTaskContext(local_task_context) => {
376 TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
377 name: local_task_context.name.clone(),
378 source: local_task_context
379 .source
380 .as_ref()
381 .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
382 }))
383 }
384 TaskError::TaskChain(chain) => {
385 let task_id = chain.last().unwrap();
386 let error = {
387 let task = ctx.task(*task_id, TaskDataCategory::Meta);
388 if let Some(OutputValue::Error(error)) = task.get_output() {
389 Some(error.clone())
390 } else {
391 None
392 }
393 };
394 let error = error.map_or_else(
395 || {
396 TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
398 message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
399 "Error no longer available",
400 )),
401 location: None,
402 }))
403 },
404 |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
405 );
406 let mut current_error = error;
407 for &task_id in chain.iter().rev() {
408 current_error =
409 TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
410 task_id,
411 source: Some(current_error),
412 turbo_tasks: ctx.turbo_tasks(),
413 }));
414 }
415 current_error
416 }
417 }
418 }
419}
420
421struct TaskExecutionCompletePrepareResult {
423 pub new_children: FxHashSet<TaskId>,
424 pub is_now_immutable: bool,
425 #[cfg(feature = "verify_determinism")]
426 pub no_output_set: bool,
427 pub new_output: Option<OutputValue>,
428 pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
429 pub is_recomputation: bool,
430 pub is_session_dependent: bool,
431}
432
433impl<B: BackingStorage> TurboTasksBackendInner<B> {
435 fn try_read_task_output(
436 self: &Arc<Self>,
437 task_id: TaskId,
438 reader: Option<TaskId>,
439 options: ReadOutputOptions,
440 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
441 ) -> Result<Result<RawVc, EventListener>> {
442 self.assert_not_persistent_calling_transient(reader, task_id, None);
443
444 let mut ctx = self.execute_context(turbo_tasks);
445 let need_reader_task = if self.should_track_dependencies()
446 && !matches!(options.tracking, ReadTracking::Untracked)
447 && reader.is_some_and(|reader_id| reader_id != task_id)
448 && let Some(reader_id) = reader
449 && reader_id != task_id
450 {
451 Some(reader_id)
452 } else {
453 None
454 };
455 let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
456 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
460 (task, Some(reader))
461 } else {
462 (ctx.task(task_id, TaskDataCategory::All), None)
463 };
464
465 fn listen_to_done_event(
466 reader_description: Option<EventDescription>,
467 tracking: ReadTracking,
468 done_event: &Event,
469 ) -> EventListener {
470 done_event.listen_with_note(move || {
471 move || {
472 if let Some(reader_description) = reader_description.as_ref() {
473 format!(
474 "try_read_task_output from {} ({})",
475 reader_description, tracking
476 )
477 } else {
478 format!("try_read_task_output ({})", tracking)
479 }
480 }
481 })
482 }
483
484 fn check_in_progress(
485 task: &impl TaskGuard,
486 reader_description: Option<EventDescription>,
487 tracking: ReadTracking,
488 ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
489 {
490 match task.get_in_progress() {
491 Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
492 listen_to_done_event(reader_description, tracking, done_event),
493 ))),
494 Some(InProgressState::InProgress(box InProgressStateInner {
495 done_event, ..
496 })) => Some(Ok(Err(listen_to_done_event(
497 reader_description,
498 tracking,
499 done_event,
500 )))),
501 Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
502 "{} was canceled",
503 task.get_task_description()
504 ))),
505 None => None,
506 }
507 }
508
509 if matches!(options.consistency, ReadConsistency::Strong) {
510 if task
511 .get_persistent_task_type()
512 .is_some_and(|t| !t.native_fn.is_root)
513 {
514 drop(task);
515 drop(reader_task);
516 panic!(
517 "Strongly consistent read of non-root task {} (reader: {}). The `root` \
518 attribute is missing on the task.",
519 self.debug_get_task_description(task_id),
520 reader.map_or_else(
521 || "unknown".to_string(),
522 |r| self.debug_get_task_description(r)
523 )
524 );
525 }
526
527 let is_dirty = task.is_dirty();
528
529 let has_dirty_containers = task.has_dirty_containers();
531 if has_dirty_containers || is_dirty.is_some() {
532 let activeness = task.get_activeness_mut();
533 let mut task_ids_to_schedule: Vec<_> = Vec::new();
534 let activeness = if let Some(activeness) = activeness {
536 activeness.set_active_until_clean();
540 activeness
541 } else {
542 if ctx.should_track_activeness() {
546 task_ids_to_schedule = task.dirty_containers().collect();
548 task_ids_to_schedule.push(task_id);
549 }
550 let activeness =
551 task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
552 activeness.set_active_until_clean();
553 activeness
554 };
555 let listener = activeness.all_clean_event.listen_with_note(move || {
556 let this = self.clone();
557 let tt = turbo_tasks.pin();
558 move || {
559 let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
560 let mut ctx = this.execute_context(tt);
561 let mut visited = FxHashSet::default();
562 fn indent(s: &str) -> String {
563 s.split_inclusive('\n')
564 .flat_map(|line: &str| [" ", line].into_iter())
565 .collect::<String>()
566 }
567 fn get_info(
568 ctx: &mut impl ExecuteContext<'_>,
569 task_id: TaskId,
570 parent_and_count: Option<(TaskId, i32)>,
571 visited: &mut FxHashSet<TaskId>,
572 ) -> String {
573 let task = ctx.task(task_id, TaskDataCategory::All);
574 let is_dirty = task.is_dirty();
575 let in_progress =
576 task.get_in_progress()
577 .map_or("not in progress", |p| match p {
578 InProgressState::InProgress(_) => "in progress",
579 InProgressState::Scheduled { .. } => "scheduled",
580 InProgressState::Canceled => "canceled",
581 });
582 let activeness = task.get_activeness().map_or_else(
583 || "not active".to_string(),
584 |activeness| format!("{activeness:?}"),
585 );
586 let aggregation_number = get_aggregation_number(&task);
587 let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
588 {
589 let uppers = get_uppers(&task);
590 !uppers.contains(&parent_task_id)
591 } else {
592 false
593 };
594
595 let has_dirty_containers = task.has_dirty_containers();
597
598 let task_description = task.get_task_description();
599 let is_dirty_label = if let Some(parent_priority) = is_dirty {
600 format!(", dirty({parent_priority})")
601 } else {
602 String::new()
603 };
604 let has_dirty_containers_label = if has_dirty_containers {
605 ", dirty containers"
606 } else {
607 ""
608 };
609 let count = if let Some((_, count)) = parent_and_count {
610 format!(" {count}")
611 } else {
612 String::new()
613 };
614 let mut info = format!(
615 "{task_id} {task_description}{count} (aggr={aggregation_number}, \
616 {in_progress}, \
617 {activeness}{is_dirty_label}{has_dirty_containers_label})",
618 );
619 let children: Vec<_> = task.dirty_containers_with_count().collect();
620 drop(task);
621
622 if missing_upper {
623 info.push_str("\n ERROR: missing upper connection");
624 }
625
626 if has_dirty_containers || !children.is_empty() {
627 writeln!(info, "\n dirty tasks:").unwrap();
628
629 for (child_task_id, count) in children {
630 let task_description = ctx
631 .task(child_task_id, TaskDataCategory::Data)
632 .get_task_description();
633 if visited.insert(child_task_id) {
634 let child_info = get_info(
635 ctx,
636 child_task_id,
637 Some((task_id, count)),
638 visited,
639 );
640 info.push_str(&indent(&child_info));
641 if !info.ends_with('\n') {
642 info.push('\n');
643 }
644 } else {
645 writeln!(
646 info,
647 " {child_task_id} {task_description} {count} \
648 (already visited)"
649 )
650 .unwrap();
651 }
652 }
653 }
654 info
655 }
656 let info = get_info(&mut ctx, task_id, None, &mut visited);
657 format!(
658 "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
659 )
660 }
661 });
662 drop(reader_task);
663 drop(task);
664 if !task_ids_to_schedule.is_empty() {
665 let mut queue = AggregationUpdateQueue::new();
666 queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
667 queue.execute(&mut ctx);
668 }
669
670 return Ok(Err(listener));
671 }
672 }
673
674 let reader_description = reader_task
675 .as_ref()
676 .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
677 if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
678 {
679 return value;
680 }
681
682 if let Some(output) = task.get_output() {
683 let result = match output {
684 OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
685 OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
686 OutputValue::Error(error) => Err(error.clone()),
687 };
688 if let Some(mut reader_task) = reader_task.take()
689 && options.tracking.should_track(result.is_err())
690 && (!task.immutable() || cfg!(feature = "verify_immutable"))
691 {
692 #[cfg(feature = "trace_task_output_dependencies")]
693 let _span = tracing::trace_span!(
694 "add output dependency",
695 task = %task_id,
696 dependent_task = ?reader
697 )
698 .entered();
699 let mut queue = LeafDistanceUpdateQueue::new();
700 let reader = reader.unwrap();
701 if task.add_output_dependent(reader) {
702 let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
704 let reader_leaf_distance =
705 reader_task.get_leaf_distance().copied().unwrap_or_default();
706 if reader_leaf_distance.distance <= leaf_distance.distance {
707 queue.push(
708 reader,
709 leaf_distance.distance,
710 leaf_distance.max_distance_in_buffer,
711 );
712 }
713 }
714
715 drop(task);
716
717 if !reader_task.remove_outdated_output_dependencies(&task_id) {
723 let _ = reader_task.add_output_dependencies(task_id);
724 }
725 drop(reader_task);
726
727 queue.execute(&mut ctx);
728 } else {
729 drop(task);
730 }
731
732 return result.map_err(|error| {
733 self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
734 .with_task_context(task_id, turbo_tasks.pin())
735 .into()
736 });
737 }
738 drop(reader_task);
739
740 let note = EventDescription::new(|| {
741 move || {
742 if let Some(reader) = reader_description.as_ref() {
743 format!("try_read_task_output (recompute) from {reader}",)
744 } else {
745 "try_read_task_output (recompute, untracked)".to_string()
746 }
747 }
748 });
749
750 let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
752 TaskExecutionReason::OutputNotAvailable,
753 EventDescription::new(|| task.get_task_desc_fn()),
754 note,
755 );
756
757 let old = task.set_in_progress(in_progress_state);
760 debug_assert!(old.is_none(), "InProgress already exists");
761 ctx.schedule_task(task, TaskPriority::Recomputation);
762
763 Ok(Err(listener))
764 }
765
766 fn try_read_task_cell(
767 &self,
768 task_id: TaskId,
769 reader: Option<TaskId>,
770 cell: CellId,
771 options: ReadCellOptions,
772 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
773 ) -> Result<Result<TypedCellContent, EventListener>> {
774 self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
775
776 fn add_cell_dependency(
777 task_id: TaskId,
778 mut task: impl TaskGuard,
779 reader: Option<TaskId>,
780 reader_task: Option<impl TaskGuard>,
781 cell: CellId,
782 key: Option<u64>,
783 ) {
784 if let Some(mut reader_task) = reader_task
785 && (!task.immutable() || cfg!(feature = "verify_immutable"))
786 {
787 let reader = reader.unwrap();
788 let _ = task
789 .add_cell_dependents(CellDependency::new(CellRef { task: reader, cell }, key));
790 drop(task);
791
792 let target = CellRef {
798 task: task_id,
799 cell,
800 };
801 let dep = CellDependency::new(target, key);
802 if !reader_task.remove_outdated_cell_dependencies(&dep) {
803 let _ = reader_task.add_cell_dependencies(dep);
804 }
805 drop(reader_task);
806 }
807 }
808
809 let ReadCellOptions {
810 tracking,
811 final_read_hint,
812 } = options;
813
814 let mut ctx = self.execute_context(turbo_tasks);
815 let (mut task, reader_task) = if self.should_track_dependencies()
816 && !matches!(tracking, ReadCellTracking::Untracked)
817 && let Some(reader_id) = reader
818 && reader_id != task_id
819 {
820 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
824 (task, Some(reader))
825 } else {
826 (ctx.task(task_id, TaskDataCategory::All), None)
827 };
828
829 let content = if final_read_hint {
830 task.remove_cell_data(&cell)
831 } else {
832 task.get_cell_data(&cell).cloned()
833 };
834 if let Some(content) = content {
835 if tracking.should_track(false) {
836 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
837 }
838 return Ok(Ok(TypedCellContent(
839 cell.type_id,
840 CellContent(Some(content)),
841 )));
842 }
843
844 let in_progress = task.get_in_progress();
845 if matches!(
846 in_progress,
847 Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
848 ) {
849 return Ok(Err(self
850 .listen_to_cell(&mut task, task_id, &reader_task, cell)
851 .0));
852 }
853 let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
854
855 let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
857 let Some(max_id) = max_id else {
858 let task_desc = task.get_task_description();
859 if tracking.should_track(true) {
860 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
861 }
862 bail!(
863 "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
864 );
865 };
866 if cell.index >= max_id {
867 let task_desc = task.get_task_description();
868 if tracking.should_track(true) {
869 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
870 }
871 bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
872 }
873
874 if is_cancelled {
880 bail!("{} was canceled", task.get_task_description());
881 }
882
883 let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
885 drop(reader_task);
886 if !new_listener {
887 return Ok(Err(listener));
888 }
889
890 let _span = tracing::trace_span!(
891 "recomputation",
892 cell_type = get_value_type(cell.type_id).ty.global_name,
893 cell_index = cell.index
894 )
895 .entered();
896
897 let _ = task.add_scheduled(
898 TaskExecutionReason::CellNotAvailable,
899 EventDescription::new(|| task.get_task_desc_fn()),
900 );
901 ctx.schedule_task(task, TaskPriority::Recomputation);
902
903 Ok(Err(listener))
904 }
905
906 fn listen_to_cell(
907 &self,
908 task: &mut impl TaskGuard,
909 task_id: TaskId,
910 reader_task: &Option<impl TaskGuard>,
911 cell: CellId,
912 ) -> (EventListener, bool) {
913 let note = || {
914 let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
915 move || {
916 if let Some(reader_desc) = reader_desc.as_ref() {
917 format!("try_read_task_cell (in progress) from {}", (reader_desc)())
918 } else {
919 "try_read_task_cell (in progress, untracked)".to_string()
920 }
921 }
922 };
923 if let Some(in_progress) = task.get_in_progress_cells(&cell) {
924 let listener = in_progress.event.listen_with_note(note);
926 return (listener, false);
927 }
928 let in_progress = InProgressCellState::new(task_id, cell);
929 let listener = in_progress.event.listen_with_note(note);
930 let old = task.insert_in_progress_cells(cell, in_progress);
931 debug_assert!(old.is_none(), "InProgressCell already exists");
932 (listener, true)
933 }
934
935 fn snapshot_and_persist(
936 &self,
937 parent_span: Option<tracing::Id>,
938 reason: &str,
939 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
940 ) -> Result<(Instant, bool), anyhow::Error> {
941 let snapshot_span =
942 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
943 .entered();
944 let _snapshot_in_progress = self.snapshot_in_progress.lock();
948 let start = Instant::now();
949 let wall_start = SystemTime::now();
953 debug_assert!(self.should_persist());
954
955 let mut snapshot_phase = {
956 let _span = tracing::info_span!("blocking").entered();
957 self.snapshot_coord.begin_snapshot()
958 };
959 let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
962
963 let suspended_operations = snapshot_phase.take_suspended_operations();
964
965 let snapshot_time = Instant::now();
966 drop(snapshot_phase);
967
968 if !has_modifications {
969 drop(snapshot_guard);
972 return Ok((start, false));
973 }
974
975 #[cfg(feature = "print_cache_item_size")]
976 #[derive(Default)]
977 struct TaskCacheStats {
978 data: usize,
979 #[cfg(feature = "print_cache_item_size_with_compressed")]
980 data_compressed: usize,
981 data_count: usize,
982 meta: usize,
983 #[cfg(feature = "print_cache_item_size_with_compressed")]
984 meta_compressed: usize,
985 meta_count: usize,
986 upper_count: usize,
987 collectibles_count: usize,
988 aggregated_collectibles_count: usize,
989 children_count: usize,
990 followers_count: usize,
991 collectibles_dependents_count: usize,
992 aggregated_dirty_containers_count: usize,
993 output_size: usize,
994 }
995 #[cfg(feature = "print_cache_item_size")]
998 struct FormatSizes {
999 size: usize,
1000 #[cfg(feature = "print_cache_item_size_with_compressed")]
1001 compressed_size: usize,
1002 }
1003 #[cfg(feature = "print_cache_item_size")]
1004 impl std::fmt::Display for FormatSizes {
1005 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1006 use turbo_tasks::util::FormatBytes;
1007 #[cfg(feature = "print_cache_item_size_with_compressed")]
1008 {
1009 write!(
1010 f,
1011 "{} ({} compressed)",
1012 FormatBytes(self.size),
1013 FormatBytes(self.compressed_size)
1014 )
1015 }
1016 #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1017 {
1018 write!(f, "{}", FormatBytes(self.size))
1019 }
1020 }
1021 }
1022 #[cfg(feature = "print_cache_item_size")]
1023 impl TaskCacheStats {
1024 #[cfg(feature = "print_cache_item_size_with_compressed")]
1025 fn compressed_size(data: &[u8]) -> Result<usize> {
1026 Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1027 data,
1028 &mut Vec::new(),
1029 lzzzz::lz4::ACC_LEVEL_DEFAULT,
1030 )?)
1031 }
1032
1033 fn add_data(&mut self, data: &[u8]) {
1034 self.data += data.len();
1035 #[cfg(feature = "print_cache_item_size_with_compressed")]
1036 {
1037 self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1038 }
1039 self.data_count += 1;
1040 }
1041
1042 fn add_meta(&mut self, data: &[u8]) {
1043 self.meta += data.len();
1044 #[cfg(feature = "print_cache_item_size_with_compressed")]
1045 {
1046 self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1047 }
1048 self.meta_count += 1;
1049 }
1050
1051 fn add_counts(&mut self, storage: &TaskStorage) {
1052 let counts = storage.meta_counts();
1053 self.upper_count += counts.upper;
1054 self.collectibles_count += counts.collectibles;
1055 self.aggregated_collectibles_count += counts.aggregated_collectibles;
1056 self.children_count += counts.children;
1057 self.followers_count += counts.followers;
1058 self.collectibles_dependents_count += counts.collectibles_dependents;
1059 self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
1060 if let Some(output) = storage.get_output() {
1061 use turbo_bincode::turbo_bincode_encode;
1062
1063 self.output_size += turbo_bincode_encode(&output)
1064 .map(|data| data.len())
1065 .unwrap_or(0);
1066 }
1067 }
1068
1069 fn task_name(storage: &TaskStorage) -> String {
1071 storage
1072 .get_persistent_task_type()
1073 .map(|t| t.to_string())
1074 .unwrap_or_else(|| "<unknown>".to_string())
1075 }
1076
1077 fn sort_key(&self) -> usize {
1080 #[cfg(feature = "print_cache_item_size_with_compressed")]
1081 {
1082 self.data_compressed + self.meta_compressed
1083 }
1084 #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1085 {
1086 self.data + self.meta
1087 }
1088 }
1089
1090 fn format_total(&self) -> FormatSizes {
1091 FormatSizes {
1092 size: self.data + self.meta,
1093 #[cfg(feature = "print_cache_item_size_with_compressed")]
1094 compressed_size: self.data_compressed + self.meta_compressed,
1095 }
1096 }
1097
1098 fn format_data(&self) -> FormatSizes {
1099 FormatSizes {
1100 size: self.data,
1101 #[cfg(feature = "print_cache_item_size_with_compressed")]
1102 compressed_size: self.data_compressed,
1103 }
1104 }
1105
1106 fn format_avg_data(&self) -> FormatSizes {
1107 FormatSizes {
1108 size: self.data.checked_div(self.data_count).unwrap_or(0),
1109 #[cfg(feature = "print_cache_item_size_with_compressed")]
1110 compressed_size: self
1111 .data_compressed
1112 .checked_div(self.data_count)
1113 .unwrap_or(0),
1114 }
1115 }
1116
1117 fn format_meta(&self) -> FormatSizes {
1118 FormatSizes {
1119 size: self.meta,
1120 #[cfg(feature = "print_cache_item_size_with_compressed")]
1121 compressed_size: self.meta_compressed,
1122 }
1123 }
1124
1125 fn format_avg_meta(&self) -> FormatSizes {
1126 FormatSizes {
1127 size: self.meta.checked_div(self.meta_count).unwrap_or(0),
1128 #[cfg(feature = "print_cache_item_size_with_compressed")]
1129 compressed_size: self
1130 .meta_compressed
1131 .checked_div(self.meta_count)
1132 .unwrap_or(0),
1133 }
1134 }
1135 }
1136 #[cfg(feature = "print_cache_item_size")]
1137 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1138 Mutex::new(FxHashMap::default());
1139
1140 let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
1147 let encode_category = |task_id: TaskId,
1148 data: &TaskStorage,
1149 category: SpecificTaskDataCategory,
1150 buffer: &mut TurboBincodeBuffer|
1151 -> Option<TurboBincodeBuffer> {
1152 match encode_task_data(task_id, data, category, buffer) {
1153 Ok(encoded) => {
1154 #[cfg(feature = "print_cache_item_size")]
1155 {
1156 let mut stats = task_cache_stats.lock();
1157 let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
1158 match category {
1159 SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1160 SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1161 }
1162 }
1163 Some(encoded)
1164 }
1165 Err(err) => {
1166 panic!(
1167 "Serializing task {} failed ({:?}): {:?}",
1168 self.debug_get_task_description(task_id),
1169 category,
1170 err
1171 );
1172 }
1173 }
1174 };
1175 if task_id.is_transient() {
1176 unreachable!("transient task_ids should never be enqueued to be persisted");
1177 }
1178
1179 let encode_meta = inner.flags.meta_modified();
1180 let encode_data = inner.flags.data_modified();
1181
1182 #[cfg(feature = "print_cache_item_size")]
1183 if encode_data || encode_meta {
1184 task_cache_stats
1185 .lock()
1186 .entry(TaskCacheStats::task_name(inner))
1187 .or_default()
1188 .add_counts(inner);
1189 }
1190
1191 let meta = if encode_meta {
1192 encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1193 } else {
1194 None
1195 };
1196
1197 let data = if encode_data {
1198 encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1199 } else {
1200 None
1201 };
1202 let task_type_hash = if inner.flags.new_task() {
1203 let task_type = inner.get_persistent_task_type().expect(
1204 "It is not possible for a new_task to not have a persistent_task_type. Task \
1205 creation for persistent tasks uses a single ExecutionContextImpl for \
1206 creating the task (which sets new_task) and connect_child (which sets \
1207 persistent_task_type) and take_snapshot waits for all operations to complete \
1208 or suspend before we start snapshotting. So task creation will always set \
1209 the task_type.",
1210 );
1211 Some(compute_task_type_hash(task_type))
1212 } else {
1213 None
1214 };
1215
1216 SnapshotItem {
1217 task_id,
1218 meta,
1219 data,
1220 task_type_hash,
1221 }
1222 };
1223
1224 let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);
1225
1226 drop(snapshot_span);
1227 let snapshot_duration = start.elapsed();
1228 let task_count = task_snapshots.len();
1229
1230 if task_snapshots.is_empty() {
1231 std::hint::cold_path();
1234 return Ok((snapshot_time, false));
1235 }
1236
1237 let persist_start = Instant::now();
1238 let span = tracing::info_span!(
1239 parent: parent_span,
1240 "persist",
1241 reason = reason,
1242 data_items= tracing::field::Empty,
1243 meta_items= tracing::field::Empty,
1244 task_cache_items= tracing::field::Empty,
1245 next_task_id= tracing::field::Empty,)
1246 .entered();
1247 {
1248 let SnapshotMeta {
1252 task_cache_items,
1253 data_items,
1254 meta_items,
1255 max_next_task_id,
1256 } = self
1257 .backing_storage
1258 .save_snapshot(suspended_operations, task_snapshots)?;
1259 span.record("data_items", data_items);
1260 span.record("meta_items", meta_items);
1261 span.record("task_cache_items", task_cache_items);
1262 span.record("next_task_id", max_next_task_id);
1263
1264 #[cfg(feature = "print_cache_item_size")]
1265 {
1266 let mut task_cache_stats = task_cache_stats
1267 .into_inner()
1268 .into_iter()
1269 .collect::<Vec<_>>();
1270 if !task_cache_stats.is_empty() {
1271 use turbo_tasks::util::FormatBytes;
1272
1273 use crate::utils::markdown_table::print_markdown_table;
1274
1275 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1276 (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
1277 });
1278
1279 println!(
1280 "Task cache stats: {}",
1281 FormatSizes {
1282 size: task_cache_stats
1283 .iter()
1284 .map(|(_, s)| s.data + s.meta)
1285 .sum::<usize>(),
1286 #[cfg(feature = "print_cache_item_size_with_compressed")]
1287 compressed_size: task_cache_stats
1288 .iter()
1289 .map(|(_, s)| s.data_compressed + s.meta_compressed)
1290 .sum::<usize>()
1291 },
1292 );
1293
1294 print_markdown_table(
1295 [
1296 "Task",
1297 " Total Size",
1298 " Data Size",
1299 " Data Count x Avg",
1300 " Data Count x Avg",
1301 " Meta Size",
1302 " Meta Count x Avg",
1303 " Meta Count x Avg",
1304 " Uppers",
1305 " Coll",
1306 " Agg Coll",
1307 " Children",
1308 " Followers",
1309 " Coll Deps",
1310 " Agg Dirty",
1311 " Output Size",
1312 ],
1313 task_cache_stats.iter(),
1314 |(task_desc, stats)| {
1315 [
1316 task_desc.to_string(),
1317 format!(" {}", stats.format_total()),
1318 format!(" {}", stats.format_data()),
1319 format!(" {} x", stats.data_count),
1320 format!("{}", stats.format_avg_data()),
1321 format!(" {}", stats.format_meta()),
1322 format!(" {} x", stats.meta_count),
1323 format!("{}", stats.format_avg_meta()),
1324 format!(" {}", stats.upper_count),
1325 format!(" {}", stats.collectibles_count),
1326 format!(" {}", stats.aggregated_collectibles_count),
1327 format!(" {}", stats.children_count),
1328 format!(" {}", stats.followers_count),
1329 format!(" {}", stats.collectibles_dependents_count),
1330 format!(" {}", stats.aggregated_dirty_containers_count),
1331 format!(" {}", FormatBytes(stats.output_size)),
1332 ]
1333 },
1334 );
1335 }
1336 }
1337 }
1338
1339 let elapsed = start.elapsed();
1340 let persist_duration = persist_start.elapsed();
1341 if elapsed > Duration::from_secs(10) {
1343 turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1344 "Finished writing to filesystem cache".to_string(),
1345 elapsed,
1346 )));
1347 }
1348
1349 let wall_start_ms = wall_start
1350 .duration_since(SystemTime::UNIX_EPOCH)
1351 .unwrap_or_default()
1352 .as_secs_f64()
1354 * 1000.0;
1355 let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1356 turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1357 "turbopack-persistence",
1358 wall_start_ms,
1359 wall_end_ms,
1360 vec![
1361 ("reason", serde_json::Value::from(reason)),
1362 (
1363 "snapshot_duration_ms",
1364 serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1365 ),
1366 (
1367 "persist_duration_ms",
1368 serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1369 ),
1370 ("task_count", serde_json::Value::from(task_count)),
1371 ],
1372 )));
1373
1374 Ok((snapshot_time, true))
1375 }
1376
1377 fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1378 if self.should_restore() {
1379 let uncompleted_operations = self
1383 .backing_storage
1384 .uncompleted_operations()
1385 .expect("Failed to get uncompleted operations");
1386 if !uncompleted_operations.is_empty() {
1387 let mut ctx = self.execute_context(turbo_tasks);
1388 for op in uncompleted_operations {
1389 op.execute(&mut ctx);
1390 }
1391 }
1392 }
1393
1394 if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1397 let _span = trace_span!("persisting background job").entered();
1399 let _span = tracing::info_span!("thread").entered();
1400 turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::Snapshot);
1401 }
1402 }
1403
1404 fn stopping(&self) {
1405 self.stopping.store(true, Ordering::Release);
1406 self.stopping_event.notify(usize::MAX);
1407 }
1408
1409 #[allow(unused_variables)]
1410 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1411 #[cfg(feature = "verify_aggregation_graph")]
1412 {
1413 self.is_idle.store(false, Ordering::Release);
1414 self.verify_aggregation_graph(turbo_tasks, false);
1415 }
1416 if self.should_persist()
1417 && let Err(err) = self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks)
1418 {
1419 eprintln!("Persisting failed during shutdown: {err:?}");
1420 }
1421 self.storage.drop_contents();
1422 if let Err(err) = self.backing_storage.shutdown() {
1423 println!("Shutting down failed: {err}");
1424 }
1425 }
1426
1427 #[allow(unused_variables)]
1428 fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1429 self.idle_start_event.notify(usize::MAX);
1430
1431 #[cfg(feature = "verify_aggregation_graph")]
1432 {
1433 use tokio::select;
1434
1435 self.is_idle.store(true, Ordering::Release);
1436 let this = self.clone();
1437 let turbo_tasks = turbo_tasks.pin();
1438 tokio::task::spawn(async move {
1439 select! {
1440 _ = tokio::time::sleep(Duration::from_secs(5)) => {
1441 }
1443 _ = this.idle_end_event.listen() => {
1444 return;
1445 }
1446 }
1447 if !this.is_idle.load(Ordering::Relaxed) {
1448 return;
1449 }
1450 this.verify_aggregation_graph(&*turbo_tasks, true);
1451 });
1452 }
1453 }
1454
1455 fn idle_end(&self) {
1456 #[cfg(feature = "verify_aggregation_graph")]
1457 self.is_idle.store(false, Ordering::Release);
1458 self.idle_end_event.notify(usize::MAX);
1459 }
1460
1461 fn get_or_create_task(
1462 &self,
1463 native_fn: &'static NativeFunction,
1464 this: Option<RawVc>,
1465 arg: &mut dyn StackDynTaskInputs,
1466 parent_task: Option<TaskId>,
1467 persistence: TaskPersistence,
1468 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1469 ) -> TaskId {
1470 let transient = matches!(persistence, TaskPersistence::Transient);
1471
1472 if transient
1473 && let Some(parent_task) = parent_task
1474 && !parent_task.is_transient()
1475 {
1476 let task_type = CachedTaskType {
1477 native_fn,
1478 this,
1479 arg: arg.take_box(),
1480 };
1481 self.panic_persistent_calling_transient(
1482 self.debug_get_task_description(parent_task),
1483 Some(&task_type),
1484 None,
1485 );
1486 }
1487
1488 let is_root = native_fn.is_root;
1489
1490 let arg_ref = arg.as_ref();
1492 let hash = CachedTaskType::hash_from_components(
1493 self.storage.task_cache.hasher(),
1494 native_fn,
1495 this,
1496 arg_ref,
1497 );
1498 let shard = get_shard(&self.storage.task_cache, hash);
1502
1503 let mut ctx = self.execute_context(turbo_tasks);
1504 if let Some(task_id) =
1508 raw_get_in_shard(shard, hash, |k| k.eq_components(native_fn, this, arg_ref))
1509 {
1510 self.track_cache_hit_by_fn(native_fn);
1511 operation::ConnectChildOperation::run(parent_task, task_id, ctx);
1512 return task_id;
1513 }
1514
1515 let task_id = if !transient
1520 && let Some((task_id, stored_type)) = ctx.task_by_type(native_fn, this, arg_ref)
1521 {
1522 self.track_cache_hit_by_fn(native_fn);
1523 match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
1526 k.eq_components(native_fn, this, arg_ref)
1527 }) {
1528 RawEntry::Occupied(_) => {}
1529 RawEntry::Vacant(e) => {
1530 e.insert(stored_type, task_id);
1531 }
1532 };
1533 task_id
1534 } else {
1535 match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
1536 k.eq_components(native_fn, this, arg_ref)
1537 }) {
1538 RawEntry::Occupied(e) => {
1539 let task_id = *e.get();
1542 drop(e);
1543 self.track_cache_hit_by_fn(native_fn);
1544 task_id
1545 }
1546 RawEntry::Vacant(e) => {
1547 let task_type = CachedTaskTypeArc::new(CachedTaskType {
1551 native_fn,
1552 this,
1553 arg: arg.take_box(),
1554 });
1555 let task_id = if transient {
1556 self.transient_task_id_factory.get()
1557 } else {
1558 self.persisted_task_id_factory.get()
1559 };
1560 self.storage
1564 .initialize_new_task(task_id, Some(task_type.clone()));
1565 e.insert(task_type, task_id);
1567 self.track_cache_miss_by_fn(native_fn);
1568 if is_root {
1572 AggregationUpdateQueue::run(
1573 AggregationUpdateJob::UpdateAggregationNumber {
1574 task_id,
1575 base_aggregation_number: u32::MAX,
1576 distance: None,
1577 },
1578 &mut ctx,
1579 );
1580 } else if native_fn.is_session_dependent && self.should_track_dependencies() {
1581 const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
1582 AggregationUpdateQueue::run(
1583 AggregationUpdateJob::UpdateAggregationNumber {
1584 task_id,
1585 base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
1586 distance: None,
1587 },
1588 &mut ctx,
1589 );
1590 };
1591
1592 task_id
1593 }
1594 }
1595 };
1596
1597 operation::ConnectChildOperation::run(parent_task, task_id, ctx);
1598
1599 task_id
1600 }
1601
1602 fn debug_trace_transient_task(
1605 &self,
1606 task_type: &CachedTaskType,
1607 cell_id: Option<CellId>,
1608 ) -> DebugTraceTransientTask {
1609 fn inner_id(
1612 backend: &TurboTasksBackendInner<impl BackingStorage>,
1613 task_id: TaskId,
1614 cell_type_id: Option<ValueTypeId>,
1615 visited_set: &mut FxHashSet<TaskId>,
1616 ) -> DebugTraceTransientTask {
1617 if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1618 if visited_set.contains(&task_id) {
1619 let task_name = task_type.get_name();
1620 DebugTraceTransientTask::Collapsed {
1621 task_name,
1622 cell_type_id,
1623 }
1624 } else {
1625 inner_cached(backend, &task_type, cell_type_id, visited_set)
1626 }
1627 } else {
1628 DebugTraceTransientTask::Uncached { cell_type_id }
1629 }
1630 }
1631 fn inner_cached(
1632 backend: &TurboTasksBackendInner<impl BackingStorage>,
1633 task_type: &CachedTaskType,
1634 cell_type_id: Option<ValueTypeId>,
1635 visited_set: &mut FxHashSet<TaskId>,
1636 ) -> DebugTraceTransientTask {
1637 let task_name = task_type.get_name();
1638
1639 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1640 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1641 return None;
1645 };
1646 if task_id.is_transient() {
1647 Some(Box::new(inner_id(
1648 backend,
1649 task_id,
1650 cause_self_raw_vc.try_get_type_id(),
1651 visited_set,
1652 )))
1653 } else {
1654 None
1655 }
1656 });
1657 let cause_args = task_type
1658 .arg
1659 .get_raw_vcs()
1660 .into_iter()
1661 .filter_map(|raw_vc| {
1662 let Some(task_id) = raw_vc.try_get_task_id() else {
1663 return None;
1665 };
1666 if !task_id.is_transient() {
1667 return None;
1668 }
1669 Some((task_id, raw_vc.try_get_type_id()))
1670 })
1671 .collect::<IndexSet<_>>() .into_iter()
1673 .map(|(task_id, cell_type_id)| {
1674 inner_id(backend, task_id, cell_type_id, visited_set)
1675 })
1676 .collect();
1677
1678 DebugTraceTransientTask::Cached {
1679 task_name,
1680 cell_type_id,
1681 cause_self,
1682 cause_args,
1683 }
1684 }
1685 inner_cached(
1686 self,
1687 task_type,
1688 cell_id.map(|c| c.type_id),
1689 &mut FxHashSet::default(),
1690 )
1691 }
1692
1693 fn invalidate_task(
1694 &self,
1695 task_id: TaskId,
1696 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1697 ) {
1698 if !self.should_track_dependencies() {
1699 panic!("Dependency tracking is disabled so invalidation is not allowed");
1700 }
1701 operation::InvalidateOperation::run(
1702 smallvec![task_id],
1703 #[cfg(feature = "trace_task_dirty")]
1704 TaskDirtyCause::Invalidator,
1705 self.execute_context(turbo_tasks),
1706 );
1707 }
1708
1709 fn invalidate_tasks(
1710 &self,
1711 tasks: &[TaskId],
1712 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1713 ) {
1714 if !self.should_track_dependencies() {
1715 panic!("Dependency tracking is disabled so invalidation is not allowed");
1716 }
1717 operation::InvalidateOperation::run(
1718 tasks.iter().copied().collect(),
1719 #[cfg(feature = "trace_task_dirty")]
1720 TaskDirtyCause::Unknown,
1721 self.execute_context(turbo_tasks),
1722 );
1723 }
1724
1725 fn invalidate_tasks_set(
1726 &self,
1727 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1728 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1729 ) {
1730 if !self.should_track_dependencies() {
1731 panic!("Dependency tracking is disabled so invalidation is not allowed");
1732 }
1733 operation::InvalidateOperation::run(
1734 tasks.iter().copied().collect(),
1735 #[cfg(feature = "trace_task_dirty")]
1736 TaskDirtyCause::Unknown,
1737 self.execute_context(turbo_tasks),
1738 );
1739 }
1740
1741 fn invalidate_serialization(
1742 &self,
1743 task_id: TaskId,
1744 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1745 ) {
1746 if task_id.is_transient() {
1747 return;
1748 }
1749 let mut ctx = self.execute_context(turbo_tasks);
1750 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1751 task.invalidate_serialization();
1752 }
1753
1754 fn debug_get_task_description(&self, task_id: TaskId) -> String {
1755 let task = self.storage.access_mut(task_id);
1756 if let Some(value) = task.get_persistent_task_type() {
1757 format!("{task_id:?} {}", value)
1758 } else if let Some(value) = task.get_transient_task_type() {
1759 format!("{task_id:?} {}", value)
1760 } else {
1761 format!("{task_id:?} unknown")
1762 }
1763 }
1764
1765 fn get_task_name(
1766 &self,
1767 task_id: TaskId,
1768 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1769 ) -> String {
1770 let mut ctx = self.execute_context(turbo_tasks);
1771 let task = ctx.task(task_id, TaskDataCategory::Data);
1772 if let Some(value) = task.get_persistent_task_type() {
1773 value.to_string()
1774 } else if let Some(value) = task.get_transient_task_type() {
1775 value.to_string()
1776 } else {
1777 "unknown".to_string()
1778 }
1779 }
1780
1781 fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<CachedTaskTypeArc> {
1782 let task = self.storage.access_mut(task_id);
1783 task.get_persistent_task_type().cloned()
1784 }
1785
1786 fn task_execution_canceled(
1787 &self,
1788 task_id: TaskId,
1789 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1790 ) {
1791 let mut ctx = self.execute_context(turbo_tasks);
1792 let mut task = ctx.task(task_id, TaskDataCategory::All);
1793 if let Some(in_progress) = task.take_in_progress() {
1794 match in_progress {
1795 InProgressState::Scheduled {
1796 done_event,
1797 reason: _,
1798 } => done_event.notify(usize::MAX),
1799 InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1800 done_event.notify(usize::MAX)
1801 }
1802 InProgressState::Canceled => {}
1803 }
1804 }
1805 let in_progress_cells = task.take_in_progress_cells();
1808 if let Some(ref cells) = in_progress_cells {
1809 for state in cells.values() {
1810 state.event.notify(usize::MAX);
1811 }
1812 }
1813
1814 let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
1820 task.update_dirty_state(Some(Dirtyness::SessionDependent))
1821 } else {
1822 None
1823 };
1824
1825 let old = task.set_in_progress(InProgressState::Canceled);
1826 debug_assert!(old.is_none(), "InProgress already exists");
1827 drop(task);
1828
1829 if let Some(data_update) = data_update {
1830 AggregationUpdateQueue::run(data_update, &mut ctx);
1831 }
1832
1833 drop(in_progress_cells);
1834 }
1835
1836 fn try_start_task_execution(
1837 &self,
1838 task_id: TaskId,
1839 priority: TaskPriority,
1840 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1841 ) -> Option<TaskExecutionSpec<'_>> {
1842 let execution_reason;
1843 let task_type;
1844 {
1845 let mut ctx = self.execute_context(turbo_tasks);
1846 let mut task = ctx.task(task_id, TaskDataCategory::All);
1847 task_type = task.get_task_type().to_owned();
1848 let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
1849 if let Some(tasks) = task.prefetch() {
1850 drop(task);
1851 ctx.prepare_tasks(tasks, "prefetch");
1852 task = ctx.task(task_id, TaskDataCategory::All);
1853 }
1854 let in_progress = task.take_in_progress()?;
1855 let InProgressState::Scheduled { done_event, reason } = in_progress else {
1856 let old = task.set_in_progress(in_progress);
1857 debug_assert!(old.is_none(), "InProgress already exists");
1858 return None;
1859 };
1860 execution_reason = reason;
1861 let old = task.set_in_progress(InProgressState::InProgress(Box::new(
1862 InProgressStateInner {
1863 stale: false,
1864 once_task,
1865 done_event,
1866 marked_as_completed: false,
1867 new_children: Default::default(),
1868 },
1869 )));
1870 debug_assert!(old.is_none(), "InProgress already exists");
1871
1872 enum Collectible {
1874 Current(CollectibleRef, i32),
1875 Outdated(CollectibleRef),
1876 }
1877 let collectibles = task
1878 .iter_collectibles()
1879 .map(|(&collectible, &value)| Collectible::Current(collectible, value))
1880 .chain(
1881 task.iter_outdated_collectibles()
1882 .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
1883 )
1884 .collect::<Vec<_>>();
1885 for collectible in collectibles {
1886 match collectible {
1887 Collectible::Current(collectible, value) => {
1888 let _ = task.insert_outdated_collectible(collectible, value);
1889 }
1890 Collectible::Outdated(collectible) => {
1891 if task
1892 .collectibles()
1893 .is_none_or(|m| m.get(&collectible).is_none())
1894 {
1895 task.remove_outdated_collectibles(&collectible);
1896 }
1897 }
1898 }
1899 }
1900
1901 if self.should_track_dependencies() {
1902 let cell_dependencies = task.iter_cell_dependencies().collect();
1904 task.set_outdated_cell_dependencies(cell_dependencies);
1905
1906 let outdated_output_dependencies = task.iter_output_dependencies().collect();
1907 task.set_outdated_output_dependencies(outdated_output_dependencies);
1908 }
1909 }
1910
1911 let (span, future) = match task_type {
1912 TaskType::Cached(task_type) => {
1913 let CachedTaskType {
1914 native_fn,
1915 this,
1916 arg,
1917 } = &*task_type;
1918 (
1919 native_fn.span(task_id.persistence(), execution_reason, priority),
1920 native_fn.execute(*this, &**arg),
1921 )
1922 }
1923 TaskType::Transient(task_type) => {
1924 let span = tracing::trace_span!("turbo_tasks::root_task");
1925 let future = match &*task_type {
1926 TransientTask::Root(f) => f(),
1927 TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
1928 };
1929 (span, future)
1930 }
1931 };
1932 Some(TaskExecutionSpec { future, span })
1933 }
1934
1935 fn task_execution_completed(
1939 &self,
1940 task_id: TaskId,
1941 result: Result<RawVc, TurboTasksExecutionError>,
1942 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1943 #[cfg(feature = "verify_determinism")] stateful: bool,
1944 has_invalidator: bool,
1945 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1946 ) -> Option<TaskPriority> {
1947 #[cfg(not(feature = "trace_task_details"))]
1962 let span = tracing::trace_span!(
1963 "task execution completed",
1964 new_children = tracing::field::Empty
1965 )
1966 .entered();
1967 #[cfg(feature = "trace_task_details")]
1968 let span = tracing::trace_span!(
1969 "task execution completed",
1970 task_id = display(task_id),
1971 result = match result.as_ref() {
1972 Ok(value) => display(either::Either::Left(value)),
1973 Err(err) => display(either::Either::Right(err)),
1974 },
1975 new_children = tracing::field::Empty,
1976 immutable = tracing::field::Empty,
1977 new_output = tracing::field::Empty,
1978 output_dependents = tracing::field::Empty,
1979 aggregation_number = tracing::field::Empty,
1980 stale = tracing::field::Empty,
1981 )
1982 .entered();
1983
1984 let is_error = result.is_err();
1985
1986 let mut ctx = self.execute_context(turbo_tasks);
1987
1988 let TaskExecutionCompletePrepareResult {
1989 new_children,
1990 is_now_immutable,
1991 #[cfg(feature = "verify_determinism")]
1992 no_output_set,
1993 new_output,
1994 output_dependent_tasks,
1995 is_recomputation,
1996 is_session_dependent,
1997 } = match self.task_execution_completed_prepare(
1998 &mut ctx,
1999 #[cfg(feature = "trace_task_details")]
2000 &span,
2001 task_id,
2002 result,
2003 cell_counters,
2004 #[cfg(feature = "verify_determinism")]
2005 stateful,
2006 has_invalidator,
2007 ) {
2008 Ok(r) => r,
2009 Err(stale_priority) => {
2010 #[cfg(feature = "trace_task_details")]
2012 span.record("stale", "prepare");
2013 return Some(stale_priority);
2014 }
2015 };
2016
2017 #[cfg(feature = "trace_task_details")]
2018 span.record("new_output", new_output.is_some());
2019 #[cfg(feature = "trace_task_details")]
2020 span.record("output_dependents", output_dependent_tasks.len());
2021
2022 if !output_dependent_tasks.is_empty() {
2027 self.task_execution_completed_invalidate_output_dependent(
2028 &mut ctx,
2029 task_id,
2030 output_dependent_tasks,
2031 );
2032 }
2033
2034 let has_new_children = !new_children.is_empty();
2035 span.record("new_children", new_children.len());
2036
2037 if has_new_children {
2038 self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
2039 }
2040
2041 if has_new_children
2042 && let Some(stale_priority) =
2043 self.task_execution_completed_connect(&mut ctx, task_id, new_children)
2044 {
2045 #[cfg(feature = "trace_task_details")]
2047 span.record("stale", "connect");
2048 return Some(stale_priority);
2049 }
2050
2051 let (stale_priority, in_progress_cells) = self.task_execution_completed_finish(
2052 &mut ctx,
2053 task_id,
2054 #[cfg(feature = "verify_determinism")]
2055 no_output_set,
2056 new_output,
2057 is_now_immutable,
2058 is_session_dependent,
2059 );
2060 if let Some(stale_priority) = stale_priority {
2061 #[cfg(feature = "trace_task_details")]
2063 span.record("stale", "finish");
2064 return Some(stale_priority);
2065 }
2066
2067 let removed_data = self.task_execution_completed_cleanup(
2068 &mut ctx,
2069 task_id,
2070 cell_counters,
2071 is_error,
2072 is_recomputation,
2073 );
2074
2075 drop(removed_data);
2077 drop(in_progress_cells);
2078
2079 None
2080 }
2081
2082 fn task_execution_completed_prepare(
2083 &self,
2084 ctx: &mut impl ExecuteContext<'_>,
2085 #[cfg(feature = "trace_task_details")] span: &Span,
2086 task_id: TaskId,
2087 result: Result<RawVc, TurboTasksExecutionError>,
2088 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2089 #[cfg(feature = "verify_determinism")] stateful: bool,
2090 has_invalidator: bool,
2091 ) -> Result<TaskExecutionCompletePrepareResult, TaskPriority> {
2092 let mut task = ctx.task(task_id, TaskDataCategory::All);
2093 let is_recomputation = task.is_dirty().is_none();
2094 let is_session_dependent = self.should_track_dependencies()
2097 && matches!(task.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent);
2098 let Some(in_progress) = task.get_in_progress_mut() else {
2099 panic!("Task execution completed, but task is not in progress: {task:#?}");
2100 };
2101 if matches!(in_progress, InProgressState::Canceled) {
2102 return Ok(TaskExecutionCompletePrepareResult {
2103 new_children: Default::default(),
2104 is_now_immutable: false,
2105 #[cfg(feature = "verify_determinism")]
2106 no_output_set: false,
2107 new_output: None,
2108 output_dependent_tasks: Default::default(),
2109 is_recomputation,
2110 is_session_dependent,
2111 });
2112 }
2113 let &mut InProgressState::InProgress(box InProgressStateInner {
2114 stale,
2115 ref mut new_children,
2116 once_task: is_once_task,
2117 ..
2118 }) = in_progress
2119 else {
2120 panic!("Task execution completed, but task is not in progress: {task:#?}");
2121 };
2122
2123 #[cfg(not(feature = "no_fast_stale"))]
2125 if stale && !is_once_task {
2126 let stale_priority = compute_stale_priority(&task);
2127 let Some(InProgressState::InProgress(box InProgressStateInner {
2128 done_event,
2129 mut new_children,
2130 ..
2131 })) = task.take_in_progress()
2132 else {
2133 unreachable!();
2134 };
2135 let old = task.set_in_progress(InProgressState::Scheduled {
2136 done_event,
2137 reason: TaskExecutionReason::Stale,
2138 });
2139 debug_assert!(old.is_none(), "InProgress already exists");
2140 for task in task.iter_children() {
2143 new_children.remove(&task);
2144 }
2145 drop(task);
2146
2147 AggregationUpdateQueue::run(
2150 AggregationUpdateJob::DecreaseActiveCounts {
2151 task_ids: new_children.into_iter().collect(),
2152 },
2153 ctx,
2154 );
2155 return Err(stale_priority);
2156 }
2157
2158 let mut new_children = take(new_children);
2160
2161 #[cfg(feature = "verify_determinism")]
2163 if stateful {
2164 task.set_stateful(true);
2165 }
2166
2167 if has_invalidator {
2169 task.set_invalidator(true);
2170 }
2171
2172 if result.is_ok() || is_recomputation {
2182 let old_counters: FxHashMap<_, _> = task
2183 .iter_cell_type_max_index()
2184 .map(|(&k, &v)| (k, v))
2185 .collect();
2186 let mut counters_to_remove = old_counters.clone();
2187
2188 for (&cell_type, &max_index) in cell_counters.iter() {
2189 if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2190 if old_max_index != max_index {
2191 task.insert_cell_type_max_index(cell_type, max_index);
2192 }
2193 } else {
2194 task.insert_cell_type_max_index(cell_type, max_index);
2195 }
2196 }
2197 for (cell_type, _) in counters_to_remove {
2198 task.remove_cell_type_max_index(&cell_type);
2199 }
2200 }
2201
2202 let mut queue = AggregationUpdateQueue::new();
2203
2204 let mut old_edges = Vec::new();
2205
2206 let has_children = !new_children.is_empty();
2207 let is_immutable = task.immutable();
2208 let task_dependencies_for_immutable =
2209 if !is_immutable
2211 && !is_session_dependent
2213 && !task.invalidator()
2215 && task.is_collectibles_dependencies_empty()
2217 {
2218 Some(
2219 task.iter_output_dependencies()
2221 .chain(task.iter_cell_dependencies().map(|dep| dep.cell_ref().task))
2222 .collect::<FxHashSet<_>>(),
2223 )
2224 } else {
2225 None
2226 };
2227
2228 if has_children {
2229 let _aggregation_number =
2231 prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2232
2233 #[cfg(feature = "trace_task_details")]
2234 span.record("aggregation_number", _aggregation_number);
2235
2236 old_edges.extend(
2238 task.iter_children()
2239 .filter(|task| !new_children.remove(task))
2240 .map(OutdatedEdge::Child),
2241 );
2242 } else {
2243 old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2244 }
2245
2246 old_edges.extend(
2247 task.iter_outdated_collectibles()
2248 .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2249 );
2250
2251 if self.should_track_dependencies() {
2252 old_edges.extend(
2259 task.iter_outdated_cell_dependencies()
2260 .map(OutdatedEdge::CellDependency),
2261 );
2262 old_edges.extend(
2263 task.iter_outdated_output_dependencies()
2264 .map(OutdatedEdge::OutputDependency),
2265 );
2266 }
2267
2268 let current_output = task.get_output();
2270 #[cfg(feature = "verify_determinism")]
2271 let no_output_set = current_output.is_none();
2272 let new_output = match result {
2273 Ok(RawVc::TaskOutput(output_task_id)) => {
2274 if let Some(OutputValue::Output(current_task_id)) = current_output
2275 && *current_task_id == output_task_id
2276 {
2277 None
2278 } else {
2279 Some(OutputValue::Output(output_task_id))
2280 }
2281 }
2282 Ok(RawVc::TaskCell(output_task_id, cell)) => {
2283 if let Some(OutputValue::Cell(CellRef {
2284 task: current_task_id,
2285 cell: current_cell,
2286 })) = current_output
2287 && *current_task_id == output_task_id
2288 && *current_cell == cell
2289 {
2290 None
2291 } else {
2292 Some(OutputValue::Cell(CellRef {
2293 task: output_task_id,
2294 cell,
2295 }))
2296 }
2297 }
2298 Ok(RawVc::LocalOutput(..)) => {
2299 panic!("Non-local tasks must not return a local Vc");
2300 }
2301 Err(err) => {
2302 if let Some(OutputValue::Error(old_error)) = current_output
2303 && **old_error == err
2304 {
2305 None
2306 } else {
2307 Some(OutputValue::Error(Arc::new((&err).into())))
2308 }
2309 }
2310 };
2311 let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2312 if new_output.is_some() && ctx.should_track_dependencies() {
2314 output_dependent_tasks = task.iter_output_dependent().collect();
2315 }
2316
2317 drop(task);
2318
2319 let mut is_now_immutable = false;
2321 if let Some(dependencies) = task_dependencies_for_immutable
2322 && dependencies
2323 .iter()
2324 .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2325 {
2326 is_now_immutable = true;
2327 }
2328 #[cfg(feature = "trace_task_details")]
2329 span.record("immutable", is_immutable || is_now_immutable);
2330
2331 if !queue.is_empty() || !old_edges.is_empty() {
2332 #[cfg(any(
2333 feature = "trace_task_completion",
2334 feature = "trace_aggregation_update_stats"
2335 ))]
2336 let _span =
2337 tracing::trace_span!("remove old edges and prepare new children", stats = Empty)
2338 .entered();
2339 #[cfg(feature = "trace_aggregation_update_stats")]
2343 {
2344 let stats = CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2345 _span.record("stats", tracing::field::debug(stats));
2346 }
2347 #[cfg(not(feature = "trace_aggregation_update_stats"))]
2348 CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2349 }
2350
2351 Ok(TaskExecutionCompletePrepareResult {
2352 new_children,
2353 is_now_immutable,
2354 #[cfg(feature = "verify_determinism")]
2355 no_output_set,
2356 new_output,
2357 output_dependent_tasks,
2358 is_recomputation,
2359 is_session_dependent,
2360 })
2361 }
2362
2363 fn task_execution_completed_invalidate_output_dependent(
2364 &self,
2365 ctx: &mut impl ExecuteContext<'_>,
2366 task_id: TaskId,
2367 output_dependent_tasks: SmallVec<[TaskId; 4]>,
2368 ) {
2369 debug_assert!(!output_dependent_tasks.is_empty());
2370
2371 #[cfg(feature = "trace_task_dirty")]
2372 let task_description = self.debug_get_task_description(task_id);
2373
2374 if output_dependent_tasks.len() > 1 {
2375 ctx.prepare_tasks(
2376 output_dependent_tasks
2377 .iter()
2378 .map(|&id| (id, TaskDataCategory::All)),
2379 "invalidate output dependents",
2380 );
2381 }
2382
2383 fn process_output_dependents(
2384 ctx: &mut impl ExecuteContext<'_>,
2385 task_id: TaskId,
2386 #[cfg(feature = "trace_task_dirty")] task_description: &str,
2387 dependent_task_id: TaskId,
2388 queue: &mut AggregationUpdateQueue,
2389 ) {
2390 #[cfg(feature = "trace_task_output_dependencies")]
2391 let span = tracing::trace_span!(
2392 "invalidate output dependency",
2393 task = %task_id,
2394 dependent_task = %dependent_task_id,
2395 result = tracing::field::Empty,
2396 )
2397 .entered();
2398 let mut make_stale = true;
2399 let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2400 let transient_task_type = dependent.get_transient_task_type();
2401 if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
2402 #[cfg(feature = "trace_task_output_dependencies")]
2404 span.record("result", "once task");
2405 return;
2406 }
2407 if dependent.outdated_output_dependencies_contains(&task_id) {
2408 #[cfg(feature = "trace_task_output_dependencies")]
2409 span.record("result", "outdated dependency");
2410 make_stale = false;
2415 } else if !dependent.output_dependencies_contains(&task_id) {
2416 #[cfg(feature = "trace_task_output_dependencies")]
2419 span.record("result", "no backward dependency");
2420 return;
2421 }
2422 make_task_dirty_internal(
2423 dependent,
2424 dependent_task_id,
2425 make_stale,
2426 #[cfg(feature = "trace_task_dirty")]
2427 TaskDirtyCause::OutputChange {
2428 task_description: task_description.to_string(),
2429 },
2430 queue,
2431 ctx,
2432 );
2433 #[cfg(feature = "trace_task_output_dependencies")]
2434 span.record("result", "marked dirty");
2435 }
2436
2437 if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
2438 let chunk_size = good_chunk_size(output_dependent_tasks.len());
2439 let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2440 let _ = scope_and_block(chunks.len(), |scope| {
2441 for chunk in chunks {
2442 let child_ctx = ctx.child_context();
2443 #[cfg(feature = "trace_task_dirty")]
2444 let task_description = &task_description;
2445 scope.spawn(move || {
2446 let mut ctx = child_ctx.create();
2447 let mut queue = AggregationUpdateQueue::new();
2448 for dependent_task_id in chunk {
2449 process_output_dependents(
2450 &mut ctx,
2451 task_id,
2452 #[cfg(feature = "trace_task_dirty")]
2453 task_description,
2454 dependent_task_id,
2455 &mut queue,
2456 )
2457 }
2458 queue.execute(&mut ctx);
2459 });
2460 }
2461 });
2462 } else {
2463 let mut queue = AggregationUpdateQueue::new();
2464 for dependent_task_id in output_dependent_tasks {
2465 process_output_dependents(
2466 ctx,
2467 task_id,
2468 #[cfg(feature = "trace_task_dirty")]
2469 &task_description,
2470 dependent_task_id,
2471 &mut queue,
2472 );
2473 }
2474 queue.execute(ctx);
2475 }
2476 }
2477
2478 fn task_execution_completed_unfinished_children_dirty(
2479 &self,
2480 ctx: &mut impl ExecuteContext<'_>,
2481 new_children: &FxHashSet<TaskId>,
2482 ) {
2483 debug_assert!(!new_children.is_empty());
2484
2485 let mut queue = AggregationUpdateQueue::new();
2486 ctx.for_each_task_all(
2487 new_children.iter().copied(),
2488 "unfinished children dirty",
2489 |child_task, ctx| {
2490 if !child_task.has_output() {
2491 let child_id = child_task.id();
2492 make_task_dirty_internal(
2493 child_task,
2494 child_id,
2495 false,
2496 #[cfg(feature = "trace_task_dirty")]
2497 TaskDirtyCause::InitialDirty,
2498 &mut queue,
2499 ctx,
2500 );
2501 }
2502 },
2503 );
2504
2505 queue.execute(ctx);
2506 }
2507
2508 fn task_execution_completed_connect(
2509 &self,
2510 ctx: &mut impl ExecuteContext<'_>,
2511 task_id: TaskId,
2512 new_children: FxHashSet<TaskId>,
2513 ) -> Option<TaskPriority> {
2514 debug_assert!(!new_children.is_empty());
2515
2516 let mut task = ctx.task(task_id, TaskDataCategory::All);
2517 let Some(in_progress) = task.get_in_progress() else {
2518 panic!("Task execution completed, but task is not in progress: {task:#?}");
2519 };
2520 if matches!(in_progress, InProgressState::Canceled) {
2521 return None;
2523 }
2524 let InProgressState::InProgress(box InProgressStateInner {
2525 #[cfg(not(feature = "no_fast_stale"))]
2526 stale,
2527 once_task: is_once_task,
2528 ..
2529 }) = in_progress
2530 else {
2531 panic!("Task execution completed, but task is not in progress: {task:#?}");
2532 };
2533
2534 #[cfg(not(feature = "no_fast_stale"))]
2536 if *stale && !is_once_task {
2537 let stale_priority = compute_stale_priority(&task);
2538 let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2539 task.take_in_progress()
2540 else {
2541 unreachable!();
2542 };
2543 let old = task.set_in_progress(InProgressState::Scheduled {
2544 done_event,
2545 reason: TaskExecutionReason::Stale,
2546 });
2547 debug_assert!(old.is_none(), "InProgress already exists");
2548 drop(task);
2549
2550 AggregationUpdateQueue::run(
2553 AggregationUpdateJob::DecreaseActiveCounts {
2554 task_ids: new_children.into_iter().collect(),
2555 },
2556 ctx,
2557 );
2558 return Some(stale_priority);
2559 }
2560
2561 let has_active_count = ctx.should_track_activeness()
2562 && task
2563 .get_activeness()
2564 .is_some_and(|activeness| activeness.active_counter > 0);
2565 connect_children(
2566 ctx,
2567 task_id,
2568 task,
2569 new_children,
2570 has_active_count,
2571 ctx.should_track_activeness(),
2572 );
2573
2574 None
2575 }
2576
2577 #[allow(clippy::type_complexity)]
2578 fn task_execution_completed_finish(
2579 &self,
2580 ctx: &mut impl ExecuteContext<'_>,
2581 task_id: TaskId,
2582 #[cfg(feature = "verify_determinism")] no_output_set: bool,
2583 new_output: Option<OutputValue>,
2584 is_now_immutable: bool,
2585 is_session_dependent: bool,
2586 ) -> (
2587 Option<TaskPriority>,
2588 Option<
2589 auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2590 >,
2591 ) {
2592 let mut task = ctx.task(task_id, TaskDataCategory::All);
2593 let Some(in_progress) = task.take_in_progress() else {
2594 panic!("Task execution completed, but task is not in progress: {task:#?}");
2595 };
2596 if matches!(in_progress, InProgressState::Canceled) {
2597 return (None, None);
2599 }
2600 let InProgressState::InProgress(box InProgressStateInner {
2601 done_event,
2602 once_task: is_once_task,
2603 stale,
2604 marked_as_completed: _,
2605 new_children,
2606 }) = in_progress
2607 else {
2608 panic!("Task execution completed, but task is not in progress: {task:#?}");
2609 };
2610 debug_assert!(new_children.is_empty());
2611
2612 if stale && !is_once_task {
2614 let stale_priority = compute_stale_priority(&task);
2615 let old = task.set_in_progress(InProgressState::Scheduled {
2616 done_event,
2617 reason: TaskExecutionReason::Stale,
2618 });
2619 debug_assert!(old.is_none(), "InProgress already exists");
2620 return (Some(stale_priority), None);
2621 }
2622
2623 let mut old_content = None;
2625 if let Some(value) = new_output {
2626 old_content = task.set_output(value);
2627 }
2628
2629 if is_now_immutable {
2632 task.set_immutable(true);
2633 }
2634
2635 let in_progress_cells = task.take_in_progress_cells();
2637 if let Some(ref cells) = in_progress_cells {
2638 for state in cells.values() {
2639 state.event.notify(usize::MAX);
2640 }
2641 }
2642
2643 let new_dirtyness = if is_session_dependent {
2645 Some(Dirtyness::SessionDependent)
2646 } else {
2647 None
2648 };
2649 #[cfg(feature = "verify_determinism")]
2650 let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
2651 let data_update = task.update_dirty_state(new_dirtyness);
2652
2653 #[cfg(feature = "verify_determinism")]
2657 let stale_priority: Option<TaskPriority> =
2658 ((dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task)
2659 .then(TaskPriority::leaf);
2660 #[cfg(not(feature = "verify_determinism"))]
2661 let stale_priority: Option<TaskPriority> = None;
2662 if stale_priority.is_some() {
2663 let old = task.set_in_progress(InProgressState::Scheduled {
2664 done_event,
2665 reason: TaskExecutionReason::Stale,
2666 });
2667 debug_assert!(old.is_none(), "InProgress already exists");
2668 drop(task);
2669 } else {
2670 drop(task);
2671
2672 done_event.notify(usize::MAX);
2674 }
2675
2676 drop(old_content);
2677
2678 if let Some(data_update) = data_update {
2679 AggregationUpdateQueue::run(data_update, ctx);
2680 }
2681
2682 (stale_priority, in_progress_cells)
2684 }
2685
2686 fn task_execution_completed_cleanup(
2687 &self,
2688 ctx: &mut impl ExecuteContext<'_>,
2689 task_id: TaskId,
2690 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2691 is_error: bool,
2692 is_recomputation: bool,
2693 ) -> Vec<SharedReference> {
2694 let mut task = ctx.task(task_id, TaskDataCategory::All);
2695 let mut removed_cell_data = Vec::new();
2696 if !is_error || is_recomputation {
2702 let to_remove: Vec<_> = task
2708 .iter_cell_data()
2709 .filter_map(|(cell, _)| {
2710 cell_counters
2711 .get(&cell.type_id)
2712 .is_none_or(|start_index| cell.index >= *start_index)
2713 .then_some(*cell)
2714 })
2715 .collect();
2716 removed_cell_data.reserve_exact(to_remove.len());
2717 for cell in to_remove {
2718 if let Some(data) = task.remove_cell_data(&cell) {
2719 removed_cell_data.push(data);
2720 }
2721 }
2722 let to_remove_hash: Vec<_> = task
2724 .iter_cell_data_hash()
2725 .filter_map(|(cell, _)| {
2726 cell_counters
2727 .get(&cell.type_id)
2728 .is_none_or(|start_index| cell.index >= *start_index)
2729 .then_some(*cell)
2730 })
2731 .collect();
2732 for cell in to_remove_hash {
2733 task.remove_cell_data_hash(&cell);
2734 }
2735 }
2736
2737 task.cleanup_after_execution();
2741
2742 drop(task);
2743
2744 removed_cell_data
2746 }
2747
2748 fn log_unrecoverable_persist_error() {
2751 eprintln!(
2752 "Persisting is disabled for this session due to an unrecoverable error. Stopping the \
2753 background persisting process."
2754 );
2755 }
2756
2757 fn run_backend_job<'a>(
2758 self: &'a Arc<Self>,
2759 job: TurboTasksBackendJob,
2760 turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2761 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2762 Box::pin(async move {
2763 match job {
2764 TurboTasksBackendJob::Snapshot => {
2765 debug_assert!(self.should_persist());
2766
2767 let mut last_snapshot = self.start_time;
2768 let mut idle_start_listener = self.idle_start_event.listen();
2769 let mut idle_end_listener = self.idle_end_event.listen();
2770 let mut fresh_idle = true;
2773 let mut evicted = false;
2774 let mut is_first = true;
2775 'outer: loop {
2776 const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2777 const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2778 let idle_timeout = *IDLE_TIMEOUT;
2779 let (time, mut reason) = if is_first {
2780 (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2781 } else {
2782 (SNAPSHOT_INTERVAL, "regular snapshot interval")
2783 };
2784
2785 let until = last_snapshot + time;
2786 if until > Instant::now() {
2787 let mut stop_listener = self.stopping_event.listen();
2788 if self.stopping.load(Ordering::Acquire) {
2789 return;
2790 }
2791 let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2792 Instant::now() + idle_timeout
2793 } else {
2794 far_future()
2795 };
2796 loop {
2797 tokio::select! {
2798 _ = &mut stop_listener => {
2799 return;
2800 },
2801 _ = &mut idle_start_listener => {
2802 idle_time = Instant::now() + idle_timeout;
2803 idle_start_listener = self.idle_start_event.listen()
2804 },
2805 _ = &mut idle_end_listener => {
2806 idle_time = far_future();
2807 idle_end_listener = self.idle_end_event.listen()
2808 },
2809 _ = tokio::time::sleep_until(until) => {
2810 break;
2811 },
2812 _ = tokio::time::sleep_until(idle_time) => {
2813 if turbo_tasks.is_idle() {
2814 reason = "idle timeout";
2815 break;
2816 }
2817 },
2818 }
2819 }
2820 }
2821
2822 let this = self.clone();
2823 let background_span =
2827 tracing::info_span!(parent: None, "background snapshot");
2828 match this.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
2829 Err(err) => {
2830 eprintln!("Persisting failed: {err:?}");
2833 Self::log_unrecoverable_persist_error();
2834 return;
2835 }
2836 Ok((snapshot_start, new_data)) => {
2837 fresh_idle = new_data;
2839 is_first = false;
2840 last_snapshot = snapshot_start;
2841
2842 macro_rules! check_idle_ended {
2846 () => {{
2847 tokio::select! {
2848 biased;
2849 _ = &mut idle_end_listener => {
2850 idle_end_listener = self.idle_end_event.listen();
2851 true
2852 },
2853 _ = std::future::ready(()) => false,
2854 }
2855 }};
2856 }
2857 let mut ran_eviction = false;
2875 if this.should_evict() && (new_data || !evicted) {
2876 if check_idle_ended!() {
2877 continue 'outer;
2880 }
2881 evicted = true;
2882 ran_eviction = true;
2883 this.storage.evict_after_snapshot(background_span.id());
2884 }
2885
2886 let mut ran_compaction = false;
2893 const MAX_IDLE_COMPACTION_PASSES: usize = 10;
2894 for _ in 0..MAX_IDLE_COMPACTION_PASSES {
2895 if check_idle_ended!() {
2896 continue 'outer;
2897 }
2898 let _compact_span = tracing::info_span!(
2902 parent: background_span.id(),
2903 "compact database"
2904 )
2905 .entered();
2906 match self.backing_storage.compact() {
2907 Ok(true) => {
2908 ran_compaction = true;
2909 }
2910 Ok(false) => break,
2911 Err(err) => {
2912 eprintln!("Compaction failed: {err:?}");
2913 if self.backing_storage.has_unrecoverable_write_error()
2914 {
2915 Self::log_unrecoverable_persist_error();
2916 return;
2917 }
2918 break;
2919 }
2920 }
2921 }
2922 if check_idle_ended!() {
2923 continue 'outer;
2924 }
2925 if new_data || ran_compaction || ran_eviction {
2930 turbo_tasks_malloc::TurboMalloc::collect(true);
2931 }
2932 }
2933 }
2934 }
2935 }
2936 }
2937 })
2938 }
2939
2940 fn try_read_own_task_cell(
2941 &self,
2942 task_id: TaskId,
2943 cell: CellId,
2944 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2945 ) -> Result<TypedCellContent> {
2946 let mut ctx = self.execute_context(turbo_tasks);
2947 let task = ctx.task(task_id, TaskDataCategory::Data);
2948 if let Some(content) = task.get_cell_data(&cell).cloned() {
2949 Ok(CellContent(Some(content)).into_typed(cell.type_id))
2950 } else {
2951 Ok(CellContent(None).into_typed(cell.type_id))
2952 }
2953 }
2954
2955 fn read_task_collectibles(
2956 &self,
2957 task_id: TaskId,
2958 collectible_type: TraitTypeId,
2959 reader_id: Option<TaskId>,
2960 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2961 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2962 let mut ctx = self.execute_context(turbo_tasks);
2963 let mut collectibles = AutoMap::default();
2964 {
2965 let mut task = ctx.task(task_id, TaskDataCategory::All);
2966 if task
2967 .get_persistent_task_type()
2968 .is_some_and(|t| !t.native_fn.is_root)
2969 {
2970 drop(task);
2971 panic!(
2972 "Reading collectibles of non-root task {} (reader: {}). The `root` attribute \
2973 is missing on the task.",
2974 self.debug_get_task_description(task_id),
2975 reader_id.map_or_else(
2976 || "unknown".to_string(),
2977 |r| self.debug_get_task_description(r)
2978 )
2979 );
2980 }
2981 for (collectible, count) in task.iter_aggregated_collectibles() {
2982 if *count > 0 && collectible.collectible_type == collectible_type {
2983 *collectibles
2984 .entry(RawVc::TaskCell(
2985 collectible.cell.task,
2986 collectible.cell.cell,
2987 ))
2988 .or_insert(0) += 1;
2989 }
2990 }
2991 for (&collectible, &count) in task.iter_collectibles() {
2992 if collectible.collectible_type == collectible_type {
2993 *collectibles
2994 .entry(RawVc::TaskCell(
2995 collectible.cell.task,
2996 collectible.cell.cell,
2997 ))
2998 .or_insert(0) += count;
2999 }
3000 }
3001 if let Some(reader_id) = reader_id {
3002 let _ = task.add_collectibles_dependents((collectible_type, reader_id));
3003 }
3004 }
3005 if let Some(reader_id) = reader_id {
3006 let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
3007 let target = CollectiblesRef {
3008 task: task_id,
3009 collectible_type,
3010 };
3011 if !reader.remove_outdated_collectibles_dependencies(&target) {
3012 let _ = reader.add_collectibles_dependencies(target);
3013 }
3014 }
3015 collectibles
3016 }
3017
3018 fn emit_collectible(
3019 &self,
3020 collectible_type: TraitTypeId,
3021 collectible: RawVc,
3022 task_id: TaskId,
3023 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3024 ) {
3025 self.assert_valid_collectible(task_id, collectible);
3026
3027 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3028 panic!("Collectibles need to be resolved");
3029 };
3030 let cell = CellRef {
3031 task: collectible_task,
3032 cell,
3033 };
3034 operation::UpdateCollectibleOperation::run(
3035 task_id,
3036 CollectibleRef {
3037 collectible_type,
3038 cell,
3039 },
3040 1,
3041 self.execute_context(turbo_tasks),
3042 );
3043 }
3044
3045 fn unemit_collectible(
3046 &self,
3047 collectible_type: TraitTypeId,
3048 collectible: RawVc,
3049 count: u32,
3050 task_id: TaskId,
3051 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3052 ) {
3053 self.assert_valid_collectible(task_id, collectible);
3054
3055 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3056 panic!("Collectibles need to be resolved");
3057 };
3058 let cell = CellRef {
3059 task: collectible_task,
3060 cell,
3061 };
3062 operation::UpdateCollectibleOperation::run(
3063 task_id,
3064 CollectibleRef {
3065 collectible_type,
3066 cell,
3067 },
3068 -(i32::try_from(count).unwrap()),
3069 self.execute_context(turbo_tasks),
3070 );
3071 }
3072
3073 fn update_task_cell(
3074 &self,
3075 task_id: TaskId,
3076 cell: CellId,
3077 content: CellContent,
3078 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3079 content_hash: Option<CellHash>,
3080 verification_mode: VerificationMode,
3081 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3082 ) {
3083 operation::UpdateCellOperation::run(
3084 task_id,
3085 cell,
3086 content,
3087 updated_key_hashes,
3088 content_hash,
3089 verification_mode,
3090 self.execute_context(turbo_tasks),
3091 );
3092 }
3093
3094 fn mark_own_task_as_finished(
3095 &self,
3096 task: TaskId,
3097 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3098 ) {
3099 let mut ctx = self.execute_context(turbo_tasks);
3100 let mut task = ctx.task(task, TaskDataCategory::Data);
3101 if let Some(InProgressState::InProgress(box InProgressStateInner {
3102 marked_as_completed,
3103 ..
3104 })) = task.get_in_progress_mut()
3105 {
3106 *marked_as_completed = true;
3107 }
3112 }
3113
3114 fn connect_task(
3115 &self,
3116 task: TaskId,
3117 parent_task: Option<TaskId>,
3118 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3119 ) {
3120 self.assert_not_persistent_calling_transient(parent_task, task, None);
3121 ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
3122 }
3123
3124 fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3125 let task_id = self.transient_task_id_factory.get();
3126 {
3127 let mut task = self.storage.access_mut(task_id);
3128 task.init_transient_task(task_id, task_type, self.should_track_activeness());
3129 }
3130 #[cfg(feature = "verify_aggregation_graph")]
3131 self.root_tasks.lock().insert(task_id);
3132 task_id
3133 }
3134
3135 fn dispose_root_task(
3136 &self,
3137 task_id: TaskId,
3138 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3139 ) {
3140 #[cfg(feature = "verify_aggregation_graph")]
3141 self.root_tasks.lock().remove(&task_id);
3142
3143 let mut ctx = self.execute_context(turbo_tasks);
3144 let mut task = ctx.task(task_id, TaskDataCategory::All);
3145 let is_dirty = task.is_dirty();
3146 let has_dirty_containers = task.has_dirty_containers();
3147 if is_dirty.is_some() || has_dirty_containers {
3148 if let Some(activeness_state) = task.get_activeness_mut() {
3149 activeness_state.unset_root_type();
3151 activeness_state.set_active_until_clean();
3152 };
3153 } else if let Some(activeness_state) = task.take_activeness() {
3154 activeness_state.all_clean_event.notify(usize::MAX);
3157 }
3158 }
3159
3160 #[cfg(feature = "verify_aggregation_graph")]
3161 fn verify_aggregation_graph(
3162 &self,
3163 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3164 idle: bool,
3165 ) {
3166 if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3167 return;
3168 }
3169 use std::{collections::VecDeque, env, io::stdout};
3170
3171 use crate::backend::operation::{get_uppers, is_aggregating_node};
3172
3173 let mut ctx = self.execute_context(turbo_tasks);
3174 let root_tasks = self.root_tasks.lock().clone();
3175
3176 for task_id in root_tasks.into_iter() {
3177 let mut queue = VecDeque::new();
3178 let mut visited = FxHashSet::default();
3179 let mut aggregated_nodes = FxHashSet::default();
3180 let mut collectibles = FxHashMap::default();
3181 let root_task_id = task_id;
3182 visited.insert(task_id);
3183 aggregated_nodes.insert(task_id);
3184 queue.push_back(task_id);
3185 let mut counter = 0;
3186 while let Some(task_id) = queue.pop_front() {
3187 counter += 1;
3188 if counter % 100000 == 0 {
3189 println!(
3190 "queue={}, visited={}, aggregated_nodes={}",
3191 queue.len(),
3192 visited.len(),
3193 aggregated_nodes.len()
3194 );
3195 }
3196 let task = ctx.task(task_id, TaskDataCategory::All);
3197 if idle && !self.is_idle.load(Ordering::Relaxed) {
3198 return;
3199 }
3200
3201 let uppers = get_uppers(&task);
3202 if task_id != root_task_id
3203 && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3204 {
3205 panic!(
3206 "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3207 {:?})",
3208 task_id,
3209 task.get_task_description(),
3210 uppers
3211 );
3212 }
3213
3214 for (collectible, _) in task.iter_aggregated_collectibles() {
3215 collectibles
3216 .entry(*collectible)
3217 .or_insert_with(|| (false, Vec::new()))
3218 .1
3219 .push(task_id);
3220 }
3221
3222 for (&collectible, &value) in task.iter_collectibles() {
3223 if value > 0 {
3224 if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3225 *flag = true
3226 } else {
3227 panic!(
3228 "Task {} has a collectible {:?} that is not in any upper task",
3229 task_id, collectible
3230 );
3231 }
3232 }
3233 }
3234
3235 let is_dirty = task.has_dirty();
3236 let has_dirty_container = task.has_dirty_containers();
3237 let should_be_in_upper = is_dirty || has_dirty_container;
3238
3239 let aggregation_number = get_aggregation_number(&task);
3240 if is_aggregating_node(aggregation_number) {
3241 aggregated_nodes.insert(task_id);
3242 }
3243 for child_id in task.iter_children() {
3250 if visited.insert(child_id) {
3252 queue.push_back(child_id);
3253 }
3254 }
3255 drop(task);
3256
3257 if should_be_in_upper {
3258 for upper_id in uppers {
3259 let upper = ctx.task(upper_id, TaskDataCategory::All);
3260 let in_upper = upper
3261 .get_aggregated_dirty_containers(&task_id)
3262 .is_some_and(|&dirty| dirty > 0);
3263 if !in_upper {
3264 let containers: Vec<_> = upper
3265 .iter_aggregated_dirty_containers()
3266 .map(|(&k, &v)| (k, v))
3267 .collect();
3268 let upper_task_desc = upper.get_task_description();
3269 drop(upper);
3270 panic!(
3271 "Task {} ({}) is dirty, but is not listed in the upper task {} \
3272 ({})\nThese dirty containers are present:\n{:#?}",
3273 task_id,
3274 ctx.task(task_id, TaskDataCategory::Data)
3275 .get_task_description(),
3276 upper_id,
3277 upper_task_desc,
3278 containers,
3279 );
3280 }
3281 }
3282 }
3283 }
3284
3285 for (collectible, (flag, task_ids)) in collectibles {
3286 if !flag {
3287 use std::io::Write;
3288 let mut stdout = stdout().lock();
3289 writeln!(
3290 stdout,
3291 "{:?} that is not emitted in any child task but in these aggregated \
3292 tasks: {:#?}",
3293 collectible,
3294 task_ids
3295 .iter()
3296 .map(|t| format!(
3297 "{t} {}",
3298 ctx.task(*t, TaskDataCategory::Data).get_task_description()
3299 ))
3300 .collect::<Vec<_>>()
3301 )
3302 .unwrap();
3303
3304 let task_id = collectible.cell.task;
3305 let mut queue = {
3306 let task = ctx.task(task_id, TaskDataCategory::All);
3307 get_uppers(&task)
3308 };
3309 let mut visited = FxHashSet::default();
3310 for &upper_id in queue.iter() {
3311 visited.insert(upper_id);
3312 writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3313 }
3314 while let Some(task_id) = queue.pop() {
3315 let task = ctx.task(task_id, TaskDataCategory::All);
3316 let desc = task.get_task_description();
3317 let aggregated_collectible = task
3318 .get_aggregated_collectibles(&collectible)
3319 .copied()
3320 .unwrap_or_default();
3321 let uppers = get_uppers(&task);
3322 drop(task);
3323 writeln!(
3324 stdout,
3325 "upper {task_id} {desc} collectible={aggregated_collectible}"
3326 )
3327 .unwrap();
3328 if task_ids.contains(&task_id) {
3329 writeln!(
3330 stdout,
3331 "Task has an upper connection to an aggregated task that doesn't \
3332 reference it. Upper connection is invalid!"
3333 )
3334 .unwrap();
3335 }
3336 for upper_id in uppers {
3337 writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3338 if !visited.contains(&upper_id) {
3339 queue.push(upper_id);
3340 }
3341 }
3342 }
3343 panic!("See stdout for more details");
3344 }
3345 }
3346 }
3347 }
3348
3349 fn assert_not_persistent_calling_transient(
3350 &self,
3351 parent_id: Option<TaskId>,
3352 child_id: TaskId,
3353 cell_id: Option<CellId>,
3354 ) {
3355 if let Some(parent_id) = parent_id
3356 && !parent_id.is_transient()
3357 && child_id.is_transient()
3358 {
3359 self.panic_persistent_calling_transient(
3360 self.debug_get_task_description(parent_id),
3361 self.debug_get_cached_task_type(child_id).as_deref(),
3362 cell_id,
3363 );
3364 }
3365 }
3366
3367 fn panic_persistent_calling_transient(
3368 &self,
3369 parent: String,
3370 child: Option<&CachedTaskType>,
3371 cell_id: Option<CellId>,
3372 ) -> ! {
3373 let transient_reason = if let Some(child) = child {
3374 Cow::Owned(format!(
3375 " The callee is transient because it depends on:\n{}",
3376 self.debug_trace_transient_task(child, cell_id),
3377 ))
3378 } else {
3379 Cow::Borrowed("")
3380 };
3381 panic!(
3382 "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3383 parent,
3384 child.map_or("unknown", |t| t.get_name()),
3385 transient_reason,
3386 );
3387 }
3388
3389 fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3390 let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3392 let task_info = if let Some(col_task_ty) = collectible
3394 .try_get_task_id()
3395 .map(|t| self.debug_get_task_description(t))
3396 {
3397 Cow::Owned(format!(" (return type of {col_task_ty})"))
3398 } else {
3399 Cow::Borrowed("")
3400 };
3401 panic!("Collectible{task_info} must be a ResolvedVc")
3402 };
3403 if col_task_id.is_transient() && !task_id.is_transient() {
3404 let transient_reason =
3405 if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3406 Cow::Owned(format!(
3407 ". The collectible is transient because it depends on:\n{}",
3408 self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3409 ))
3410 } else {
3411 Cow::Borrowed("")
3412 };
3413 panic!(
3415 "Collectible is transient, transient collectibles cannot be emitted from \
3416 persistent tasks{transient_reason}",
3417 )
3418 }
3419 }
3420}
3421
3422impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
3423 fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3424 self.0.startup(turbo_tasks);
3425 }
3426
3427 fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3428 self.0.stopping();
3429 }
3430
3431 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3432 self.0.stop(turbo_tasks);
3433 }
3434
3435 fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3436 self.0.idle_start(turbo_tasks);
3437 }
3438
3439 fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3440 self.0.idle_end();
3441 }
3442
3443 fn get_or_create_task(
3444 &self,
3445 native_fn: &'static NativeFunction,
3446 this: Option<RawVc>,
3447 arg: &mut dyn StackDynTaskInputs,
3448 parent_task: Option<TaskId>,
3449 persistence: TaskPersistence,
3450 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3451 ) -> TaskId {
3452 self.0
3453 .get_or_create_task(native_fn, this, arg, parent_task, persistence, turbo_tasks)
3454 }
3455
3456 fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3457 self.0.invalidate_task(task_id, turbo_tasks);
3458 }
3459
3460 fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3461 self.0.invalidate_tasks(tasks, turbo_tasks);
3462 }
3463
3464 fn invalidate_tasks_set(
3465 &self,
3466 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3467 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3468 ) {
3469 self.0.invalidate_tasks_set(tasks, turbo_tasks);
3470 }
3471
3472 fn invalidate_serialization(
3473 &self,
3474 task_id: TaskId,
3475 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3476 ) {
3477 self.0.invalidate_serialization(task_id, turbo_tasks);
3478 }
3479
3480 fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3481 self.0.task_execution_canceled(task, turbo_tasks)
3482 }
3483
3484 fn try_start_task_execution(
3485 &self,
3486 task_id: TaskId,
3487 priority: TaskPriority,
3488 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3489 ) -> Option<TaskExecutionSpec<'_>> {
3490 self.0
3491 .try_start_task_execution(task_id, priority, turbo_tasks)
3492 }
3493
3494 fn task_execution_completed(
3495 &self,
3496 task_id: TaskId,
3497 result: Result<RawVc, TurboTasksExecutionError>,
3498 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3499 #[cfg(feature = "verify_determinism")] stateful: bool,
3500 has_invalidator: bool,
3501 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3502 ) -> Option<TaskPriority> {
3503 self.0.task_execution_completed(
3504 task_id,
3505 result,
3506 cell_counters,
3507 #[cfg(feature = "verify_determinism")]
3508 stateful,
3509 has_invalidator,
3510 turbo_tasks,
3511 )
3512 }
3513
3514 type BackendJob = TurboTasksBackendJob;
3515
3516 fn run_backend_job<'a>(
3517 &'a self,
3518 job: Self::BackendJob,
3519 turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3520 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3521 self.0.run_backend_job(job, turbo_tasks)
3522 }
3523
3524 fn try_read_task_output(
3525 &self,
3526 task_id: TaskId,
3527 reader: Option<TaskId>,
3528 options: ReadOutputOptions,
3529 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3530 ) -> Result<Result<RawVc, EventListener>> {
3531 self.0
3532 .try_read_task_output(task_id, reader, options, turbo_tasks)
3533 }
3534
3535 fn try_read_task_cell(
3536 &self,
3537 task_id: TaskId,
3538 cell: CellId,
3539 reader: Option<TaskId>,
3540 options: ReadCellOptions,
3541 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3542 ) -> Result<Result<TypedCellContent, EventListener>> {
3543 self.0
3544 .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3545 }
3546
3547 fn try_read_own_task_cell(
3548 &self,
3549 task_id: TaskId,
3550 cell: CellId,
3551 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3552 ) -> Result<TypedCellContent> {
3553 self.0.try_read_own_task_cell(task_id, cell, turbo_tasks)
3554 }
3555
3556 fn read_task_collectibles(
3557 &self,
3558 task_id: TaskId,
3559 collectible_type: TraitTypeId,
3560 reader: Option<TaskId>,
3561 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3562 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3563 self.0
3564 .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3565 }
3566
3567 fn emit_collectible(
3568 &self,
3569 collectible_type: TraitTypeId,
3570 collectible: RawVc,
3571 task_id: TaskId,
3572 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3573 ) {
3574 self.0
3575 .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3576 }
3577
3578 fn unemit_collectible(
3579 &self,
3580 collectible_type: TraitTypeId,
3581 collectible: RawVc,
3582 count: u32,
3583 task_id: TaskId,
3584 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3585 ) {
3586 self.0
3587 .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3588 }
3589
3590 fn update_task_cell(
3591 &self,
3592 task_id: TaskId,
3593 cell: CellId,
3594 content: CellContent,
3595 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3596 content_hash: Option<CellHash>,
3597 verification_mode: VerificationMode,
3598 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3599 ) {
3600 self.0.update_task_cell(
3601 task_id,
3602 cell,
3603 content,
3604 updated_key_hashes,
3605 content_hash,
3606 verification_mode,
3607 turbo_tasks,
3608 );
3609 }
3610
3611 fn mark_own_task_as_finished(
3612 &self,
3613 task_id: TaskId,
3614 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3615 ) {
3616 self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3617 }
3618
3619 fn connect_task(
3620 &self,
3621 task: TaskId,
3622 parent_task: Option<TaskId>,
3623 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3624 ) {
3625 self.0.connect_task(task, parent_task, turbo_tasks);
3626 }
3627
3628 fn create_transient_task(
3629 &self,
3630 task_type: TransientTaskType,
3631 _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3632 ) -> TaskId {
3633 self.0.create_transient_task(task_type)
3634 }
3635
3636 fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3637 self.0.dispose_root_task(task_id, turbo_tasks);
3638 }
3639
3640 fn task_statistics(&self) -> &TaskStatisticsApi {
3641 &self.0.task_statistics
3642 }
3643
3644 fn is_tracking_dependencies(&self) -> bool {
3645 self.0.options.dependency_tracking
3646 }
3647
3648 fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
3649 self.0.get_task_name(task, turbo_tasks)
3650 }
3651}
3652
3653enum DebugTraceTransientTask {
3654 Cached {
3655 task_name: &'static str,
3656 cell_type_id: Option<ValueTypeId>,
3657 cause_self: Option<Box<DebugTraceTransientTask>>,
3658 cause_args: Vec<DebugTraceTransientTask>,
3659 },
3660 Collapsed {
3662 task_name: &'static str,
3663 cell_type_id: Option<ValueTypeId>,
3664 },
3665 Uncached {
3666 cell_type_id: Option<ValueTypeId>,
3667 },
3668}
3669
3670impl DebugTraceTransientTask {
3671 fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3672 let indent = " ".repeat(level);
3673 f.write_str(&indent)?;
3674
3675 fn fmt_cell_type_id(
3676 f: &mut fmt::Formatter<'_>,
3677 cell_type_id: Option<ValueTypeId>,
3678 ) -> fmt::Result {
3679 if let Some(ty) = cell_type_id {
3680 write!(
3681 f,
3682 " (read cell of type {})",
3683 get_value_type(ty).ty.global_name
3684 )
3685 } else {
3686 Ok(())
3687 }
3688 }
3689
3690 match self {
3692 Self::Cached {
3693 task_name,
3694 cell_type_id,
3695 ..
3696 }
3697 | Self::Collapsed {
3698 task_name,
3699 cell_type_id,
3700 ..
3701 } => {
3702 f.write_str(task_name)?;
3703 fmt_cell_type_id(f, *cell_type_id)?;
3704 if matches!(self, Self::Collapsed { .. }) {
3705 f.write_str(" (collapsed)")?;
3706 }
3707 }
3708 Self::Uncached { cell_type_id } => {
3709 f.write_str("unknown transient task")?;
3710 fmt_cell_type_id(f, *cell_type_id)?;
3711 }
3712 }
3713 f.write_char('\n')?;
3714
3715 if let Self::Cached {
3717 cause_self,
3718 cause_args,
3719 ..
3720 } = self
3721 {
3722 if let Some(c) = cause_self {
3723 writeln!(f, "{indent} self:")?;
3724 c.fmt_indented(f, level + 1)?;
3725 }
3726 if !cause_args.is_empty() {
3727 writeln!(f, "{indent} args:")?;
3728 for c in cause_args {
3729 c.fmt_indented(f, level + 1)?;
3730 }
3731 }
3732 }
3733 Ok(())
3734 }
3735}
3736
3737impl fmt::Display for DebugTraceTransientTask {
3738 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3739 self.fmt_indented(f, 0)
3740 }
3741}
3742
3743fn far_future() -> Instant {
3745 Instant::now() + Duration::from_secs(86400 * 365 * 30)
3750}
3751
3752fn encode_task_data(
3764 task: TaskId,
3765 data: &TaskStorage,
3766 category: SpecificTaskDataCategory,
3767 scratch_buffer: &mut TurboBincodeBuffer,
3768) -> Result<TurboBincodeBuffer> {
3769 scratch_buffer.clear();
3770 let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3771 data.encode(category, &mut encoder)?;
3772
3773 if cfg!(feature = "verify_serialization") {
3774 TaskStorage::new()
3775 .decode(
3776 category,
3777 &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3778 )
3779 .with_context(|| {
3780 format!(
3781 "expected to be able to decode serialized data for '{category:?}' information \
3782 for {task}"
3783 )
3784 })?;
3785 }
3786 Ok(SmallVec::from_slice(scratch_buffer))
3787}