1mod cell_data;
2mod counter_map;
3mod operation;
4mod snapshot_coordinator;
5mod storage;
6pub mod storage_schema;
7
8use std::{
9 borrow::Cow,
10 fmt::{self, Write},
11 future::Future,
12 hash::BuildHasherDefault,
13 mem::take,
14 pin::Pin,
15 sync::{
16 Arc, LazyLock,
17 atomic::{AtomicBool, Ordering},
18 },
19 time::SystemTime,
20};
21
22use anyhow::{Context, Result, bail};
23use auto_hash_map::{AutoMap, AutoSet};
24use indexmap::IndexSet;
25use parking_lot::Mutex;
26use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
27use smallvec::{SmallVec, smallvec};
28use tokio::time::{Duration, Instant};
29use tracing::{Span, trace_span};
30use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
31use turbo_tasks::{
32 CellId, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency, ReadOutputOptions,
33 ReadTracking, SharedReference, StackDynTaskInputs, TRANSIENT_TASK_BIT, TaskExecutionReason,
34 TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic,
35 ValueTypeId,
36 backend::{
37 Backend, CachedTaskType, CachedTaskTypeArc, CellContent, CellHash, TaskExecutionSpec,
38 TransientTaskType, TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
39 TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
40 VerificationMode,
41 },
42 event::{Event, EventDescription, EventListener},
43 macro_helpers::NativeFunction,
44 message_queue::{TimingEvent, TraceEvent},
45 registry::get_value_type,
46 scope::scope_and_block,
47 task_statistics::TaskStatisticsApi,
48 trace::TraceRawVcs,
49 util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
50};
51#[cfg(feature = "task_dirty_cause")]
52use turbo_tasks::{FunctionId, TaskDirtyCause};
53
54pub use self::{
55 operation::AnyOperation,
56 storage::{EvictionCounts, SpecificTaskDataCategory, TaskDataCategory},
57};
58use crate::{
59 backend::{
60 operation::{
61 AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
62 CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
63 LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType, TaskTypeRef,
64 connect_children, get_aggregation_number, get_uppers, make_task_dirty_internal,
65 prepare_new_children,
66 },
67 snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
68 storage::Storage,
69 storage_schema::{TaskStorage, TaskStorageAccessors},
70 },
71 backing_storage::{BackingStorage, SnapshotItem, SnapshotMeta, compute_task_type_hash},
72 data::{
73 ActivenessState, CellDependency, CellRef, CollectibleRef, CollectiblesRef, Dirtyness,
74 InProgressCellState, InProgressState, InProgressStateInner, OutputValue, TransientTask,
75 },
76 error::TaskError,
77 utils::{
78 dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
79 shard_amount::compute_shard_amount,
80 },
81};
82
83const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10000;
87
88static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
91 std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
92 .ok()
93 .and_then(|v| v.parse::<u64>().ok())
94 .map(Duration::from_millis)
95 .unwrap_or(Duration::from_secs(2))
96});
97
98fn compute_stale_priority(task: &impl TaskGuard) -> TaskPriority {
105 TaskPriority::invalidation(
106 task.get_leaf_distance()
107 .copied()
108 .unwrap_or_default()
109 .distance,
110 )
111 .in_parent(task.is_dirty().unwrap_or(TaskPriority::leaf()))
112}
113
114pub enum StorageMode {
115 ReadOnly,
117 ReadWrite,
120 ReadWriteOnShutdown,
123}
124
125pub struct BackendOptions {
126 pub dependency_tracking: bool,
131
132 pub active_tracking: bool,
138
139 pub storage_mode: Option<StorageMode>,
141
142 pub num_workers: Option<usize>,
145
146 pub small_preallocation: bool,
148
149 pub evict_after_snapshot: bool,
153}
154
155impl Default for BackendOptions {
156 fn default() -> Self {
157 Self {
158 dependency_tracking: true,
159 active_tracking: true,
160 storage_mode: Some(StorageMode::ReadWrite),
161 num_workers: None,
162 small_preallocation: false,
163 evict_after_snapshot: false,
164 }
165 }
166}
167
168pub enum TurboTasksBackendJob {
169 Snapshot,
170}
171
172pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
173
174struct TurboTasksBackendInner<B: BackingStorage> {
175 options: BackendOptions,
176
177 start_time: Instant,
178
179 persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
180 transient_task_id_factory: IdFactoryWithReuse<TaskId>,
181
182 storage: Storage,
183
184 snapshot_coord: SnapshotCoordinator,
187 snapshot_in_progress: Mutex<()>,
192
193 stopping: AtomicBool,
194 stopping_event: Event,
195 idle_start_event: Event,
196 idle_end_event: Event,
197 #[cfg(feature = "verify_aggregation_graph")]
198 is_idle: AtomicBool,
199
200 task_statistics: TaskStatisticsApi,
201
202 backing_storage: B,
203
204 #[cfg(feature = "verify_aggregation_graph")]
205 root_tasks: Mutex<FxHashSet<TaskId>>,
206}
207
208impl<B: BackingStorage> TurboTasksBackend<B> {
209 pub fn new(options: BackendOptions, backing_storage: B) -> Self {
210 Self(Arc::new(TurboTasksBackendInner::new(
211 options,
212 backing_storage,
213 )))
214 }
215
216 pub fn backing_storage(&self) -> &B {
217 &self.0.backing_storage
218 }
219
220 pub fn snapshot_and_evict_for_testing(
227 &self,
228 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
229 ) -> (bool, EvictionCounts) {
230 self.0.snapshot_and_evict_for_testing(turbo_tasks)
231 }
232}
233
234impl<B: BackingStorage> TurboTasksBackendInner<B> {
235 pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
236 let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
237 if !options.dependency_tracking {
238 options.active_tracking = false;
239 }
240 let small_preallocation = options.small_preallocation;
241 let next_task_id = backing_storage
242 .next_free_task_id()
243 .expect("Failed to get task id");
244 Self {
245 options,
246 start_time: Instant::now(),
247 persisted_task_id_factory: IdFactoryWithReuse::new(
248 next_task_id,
249 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
250 ),
251 transient_task_id_factory: IdFactoryWithReuse::new(
252 TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
253 TaskId::MAX,
254 ),
255 storage: Storage::new(shard_amount, small_preallocation),
256 snapshot_coord: SnapshotCoordinator::new(),
257 snapshot_in_progress: Mutex::new(()),
258 stopping: AtomicBool::new(false),
259 stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
260 idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
261 idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
262 #[cfg(feature = "verify_aggregation_graph")]
263 is_idle: AtomicBool::new(false),
264 task_statistics: TaskStatisticsApi::default(),
265 backing_storage,
266 #[cfg(feature = "verify_aggregation_graph")]
267 root_tasks: Default::default(),
268 }
269 }
270
271 fn execute_context<'a>(
272 &'a self,
273 turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
274 ) -> impl ExecuteContext<'a> {
275 ExecuteContextImpl::new(self, turbo_tasks)
276 }
277
278 fn suspending_requested(&self) -> bool {
279 self.should_persist() && self.snapshot_coord.snapshot_pending()
280 }
281
282 fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
283 if self.should_persist() {
284 self.snapshot_coord.suspend_point(suspend);
285 }
286 }
287
288 pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
289 if !self.should_persist() {
290 return OperationGuard::noop();
291 }
292 self.snapshot_coord.begin_operation()
293 }
294
295 fn should_persist(&self) -> bool {
296 matches!(
297 self.options.storage_mode,
298 Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
299 )
300 }
301
302 fn should_evict(&self) -> bool {
303 self.options.evict_after_snapshot && self.should_persist()
304 }
305
306 #[doc(hidden)]
313 pub fn snapshot_and_evict_for_testing(
314 &self,
315 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
316 ) -> (bool, EvictionCounts) {
317 assert!(
318 self.should_persist(),
319 "snapshot_and_evict requires persistence"
320 );
321 let snapshot_result = self.snapshot_and_persist(None, "test", turbo_tasks);
322 let had_new_data = match snapshot_result {
323 Ok((_, new_data)) => new_data,
324 Err(_) => {
325 return (false, EvictionCounts::default());
329 }
330 };
331 let counts = self.storage.evict_after_snapshot(None);
332 (had_new_data, counts)
333 }
334
335 fn should_restore(&self) -> bool {
336 self.options.storage_mode.is_some()
337 }
338
339 fn should_track_dependencies(&self) -> bool {
340 self.options.dependency_tracking
341 }
342
343 fn should_track_activeness(&self) -> bool {
344 self.options.active_tracking
345 }
346
347 fn track_cache_hit_by_fn(&self, native_fn: &'static NativeFunction) {
348 self.task_statistics
349 .map(|stats| stats.increment_cache_hit(native_fn));
350 }
351
352 fn track_cache_miss_by_fn(&self, native_fn: &'static NativeFunction) {
353 self.task_statistics
354 .map(|stats| stats.increment_cache_miss(native_fn));
355 }
356
357 fn task_error_to_turbo_tasks_execution_error(
362 &self,
363 error: &TaskError,
364 ctx: &mut impl ExecuteContext<'_>,
365 ) -> TurboTasksExecutionError {
366 match error {
367 TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
368 TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
369 message: item.message.clone(),
370 source: item
371 .source
372 .as_ref()
373 .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
374 })),
375 TaskError::LocalTaskContext(local_task_context) => {
376 TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
377 name: local_task_context.name.clone(),
378 source: local_task_context
379 .source
380 .as_ref()
381 .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
382 }))
383 }
384 TaskError::TaskChain(chain) => {
385 let task_id = chain.last().unwrap();
386 let error = {
387 let task = ctx.task(*task_id, TaskDataCategory::Meta);
388 if let Some(OutputValue::Error(error)) = task.get_output() {
389 Some(error.clone())
390 } else {
391 None
392 }
393 };
394 let error = error.map_or_else(
395 || {
396 TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
398 message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
399 "Error no longer available",
400 )),
401 location: None,
402 }))
403 },
404 |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
405 );
406 let mut current_error = error;
407 for &task_id in chain.iter().rev() {
408 current_error =
409 TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
410 task_id,
411 source: Some(current_error),
412 turbo_tasks: ctx.turbo_tasks(),
413 }));
414 }
415 current_error
416 }
417 }
418 }
419}
420
421struct TaskExecutionCompletePrepareResult {
423 pub new_children: FxHashSet<TaskId>,
424 pub is_now_immutable: bool,
425 #[cfg(feature = "verify_determinism")]
426 pub no_output_set: bool,
427 #[cfg(feature = "task_dirty_cause")]
428 pub function_id: Option<FunctionId>,
429 pub new_output: Option<OutputValue>,
430 pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
431 pub is_recomputation: bool,
432 pub is_session_dependent: bool,
433}
434
435impl<B: BackingStorage> TurboTasksBackendInner<B> {
437 fn try_read_task_output(
438 self: &Arc<Self>,
439 task_id: TaskId,
440 reader: Option<TaskId>,
441 options: ReadOutputOptions,
442 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
443 ) -> Result<Result<RawVc, EventListener>> {
444 self.assert_not_persistent_calling_transient(reader, task_id, None);
445
446 let mut ctx = self.execute_context(turbo_tasks);
447 let need_reader_task = if self.should_track_dependencies()
448 && !matches!(options.tracking, ReadTracking::Untracked)
449 && reader.is_some_and(|reader_id| reader_id != task_id)
450 && let Some(reader_id) = reader
451 && reader_id != task_id
452 {
453 Some(reader_id)
454 } else {
455 None
456 };
457 let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
458 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
462 (task, Some(reader))
463 } else {
464 (ctx.task(task_id, TaskDataCategory::All), None)
465 };
466
467 fn listen_to_done_event(
468 reader_description: Option<EventDescription>,
469 tracking: ReadTracking,
470 done_event: &Event,
471 ) -> EventListener {
472 done_event.listen_with_note(move || {
473 move || {
474 if let Some(reader_description) = reader_description.as_ref() {
475 format!(
476 "try_read_task_output from {} ({})",
477 reader_description, tracking
478 )
479 } else {
480 format!("try_read_task_output ({})", tracking)
481 }
482 }
483 })
484 }
485
486 fn check_in_progress(
487 task: &impl TaskGuard,
488 reader_description: Option<EventDescription>,
489 tracking: ReadTracking,
490 ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
491 {
492 match task.get_in_progress() {
493 Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
494 listen_to_done_event(reader_description, tracking, done_event),
495 ))),
496 Some(InProgressState::InProgress(box InProgressStateInner {
497 done_event, ..
498 })) => Some(Ok(Err(listen_to_done_event(
499 reader_description,
500 tracking,
501 done_event,
502 )))),
503 Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
504 "{} was canceled",
505 task.get_task_description()
506 ))),
507 None => None,
508 }
509 }
510
511 if matches!(options.consistency, ReadConsistency::Strong) {
512 if task
513 .get_persistent_task_type()
514 .is_some_and(|t| !t.native_fn.is_root)
515 {
516 drop(task);
517 drop(reader_task);
518 panic!(
519 "Strongly consistent read of non-root task {} (reader: {}). The `root` \
520 attribute is missing on the task.",
521 self.debug_get_task_description(task_id),
522 reader.map_or_else(
523 || "unknown".to_string(),
524 |r| self.debug_get_task_description(r)
525 )
526 );
527 }
528
529 let is_dirty = task.is_dirty();
530
531 let has_dirty_containers = task.has_dirty_containers();
533 if has_dirty_containers || is_dirty.is_some() {
534 let activeness = task.get_activeness_mut();
535 let mut task_ids_to_schedule: Vec<_> = Vec::new();
536 let activeness = if let Some(activeness) = activeness {
538 activeness.set_active_until_clean();
542 activeness
543 } else {
544 if ctx.should_track_activeness() {
548 task_ids_to_schedule = task.dirty_containers().collect();
550 task_ids_to_schedule.push(task_id);
551 }
552 let activeness =
553 task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
554 activeness.set_active_until_clean();
555 activeness
556 };
557 let listener = activeness.all_clean_event.listen_with_note(move || {
558 let this = self.clone();
559 let tt = turbo_tasks.pin();
560 move || {
561 let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
562 let mut ctx = this.execute_context(tt);
563 let mut visited = FxHashSet::default();
564 fn indent(s: &str) -> String {
565 s.split_inclusive('\n')
566 .flat_map(|line: &str| [" ", line].into_iter())
567 .collect::<String>()
568 }
569 fn get_info(
570 ctx: &mut impl ExecuteContext<'_>,
571 task_id: TaskId,
572 parent_and_count: Option<(TaskId, i32)>,
573 visited: &mut FxHashSet<TaskId>,
574 ) -> String {
575 let task = ctx.task(task_id, TaskDataCategory::All);
576 let is_dirty = task.is_dirty();
577 let in_progress =
578 task.get_in_progress()
579 .map_or("not in progress", |p| match p {
580 InProgressState::InProgress(_) => "in progress",
581 InProgressState::Scheduled { .. } => "scheduled",
582 InProgressState::Canceled => "canceled",
583 });
584 let activeness = task.get_activeness().map_or_else(
585 || "not active".to_string(),
586 |activeness| format!("{activeness:?}"),
587 );
588 let aggregation_number = get_aggregation_number(&task);
589 let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
590 {
591 let uppers = get_uppers(&task);
592 !uppers.contains(&parent_task_id)
593 } else {
594 false
595 };
596
597 let has_dirty_containers = task.has_dirty_containers();
599
600 let task_description = task.get_task_description();
601 let is_dirty_label = if let Some(parent_priority) = is_dirty {
602 format!(", dirty({parent_priority})")
603 } else {
604 String::new()
605 };
606 let has_dirty_containers_label = if has_dirty_containers {
607 ", dirty containers"
608 } else {
609 ""
610 };
611 let count = if let Some((_, count)) = parent_and_count {
612 format!(" {count}")
613 } else {
614 String::new()
615 };
616 let mut info = format!(
617 "{task_id} {task_description}{count} (aggr={aggregation_number}, \
618 {in_progress}, \
619 {activeness}{is_dirty_label}{has_dirty_containers_label})",
620 );
621 let children: Vec<_> = task.dirty_containers_with_count().collect();
622 drop(task);
623
624 if missing_upper {
625 info.push_str("\n ERROR: missing upper connection");
626 }
627
628 if has_dirty_containers || !children.is_empty() {
629 writeln!(info, "\n dirty tasks:").unwrap();
630
631 for (child_task_id, count) in children {
632 let task_description = ctx
633 .task(child_task_id, TaskDataCategory::Data)
634 .get_task_description();
635 if visited.insert(child_task_id) {
636 let child_info = get_info(
637 ctx,
638 child_task_id,
639 Some((task_id, count)),
640 visited,
641 );
642 info.push_str(&indent(&child_info));
643 if !info.ends_with('\n') {
644 info.push('\n');
645 }
646 } else {
647 writeln!(
648 info,
649 " {child_task_id} {task_description} {count} \
650 (already visited)"
651 )
652 .unwrap();
653 }
654 }
655 }
656 info
657 }
658 let info = get_info(&mut ctx, task_id, None, &mut visited);
659 format!(
660 "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
661 )
662 }
663 });
664 drop(reader_task);
665 drop(task);
666 if !task_ids_to_schedule.is_empty() {
667 let mut queue = AggregationUpdateQueue::new();
668 queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
669 queue.execute(&mut ctx);
670 }
671
672 return Ok(Err(listener));
673 }
674 }
675
676 let reader_description = reader_task
677 .as_ref()
678 .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
679 if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
680 {
681 return value;
682 }
683
684 if let Some(output) = task.get_output() {
685 let result = match output {
686 OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
687 OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
688 OutputValue::Error(error) => Err(error.clone()),
689 };
690 if let Some(mut reader_task) = reader_task.take()
691 && options.tracking.should_track(result.is_err())
692 && (!task.immutable() || cfg!(feature = "verify_immutable"))
693 {
694 #[cfg(feature = "trace_task_output_dependencies")]
695 let _span = tracing::trace_span!(
696 "add output dependency",
697 task = %task_id,
698 dependent_task = ?reader
699 )
700 .entered();
701 let mut queue = LeafDistanceUpdateQueue::new();
702 let reader = reader.unwrap();
703 if task.add_output_dependent(reader) {
704 let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
706 let reader_leaf_distance =
707 reader_task.get_leaf_distance().copied().unwrap_or_default();
708 if reader_leaf_distance.distance <= leaf_distance.distance {
709 queue.push(
710 reader,
711 leaf_distance.distance,
712 leaf_distance.max_distance_in_buffer,
713 );
714 }
715 }
716
717 drop(task);
718
719 if !reader_task.remove_outdated_output_dependencies(&task_id) {
725 let _ = reader_task.add_output_dependencies(task_id);
726 }
727 drop(reader_task);
728
729 queue.execute(&mut ctx);
730 } else {
731 drop(task);
732 }
733
734 return result.map_err(|error| {
735 self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
736 .with_task_context(task_id, turbo_tasks.pin())
737 .into()
738 });
739 }
740 drop(reader_task);
741
742 let note = EventDescription::new(|| {
743 move || {
744 if let Some(reader) = reader_description.as_ref() {
745 format!("try_read_task_output (recompute) from {reader}",)
746 } else {
747 "try_read_task_output (recompute, untracked)".to_string()
748 }
749 }
750 });
751
752 let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
754 TaskExecutionReason::OutputNotAvailable,
755 EventDescription::new(|| task.get_task_desc_fn()),
756 note,
757 );
758
759 let old = task.set_in_progress(in_progress_state);
762 debug_assert!(old.is_none(), "InProgress already exists");
763 ctx.schedule_task(task, TaskPriority::Recomputation);
764
765 Ok(Err(listener))
766 }
767
768 fn try_read_task_cell(
769 &self,
770 task_id: TaskId,
771 reader: Option<TaskId>,
772 cell: CellId,
773 options: ReadCellOptions,
774 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
775 ) -> Result<Result<TypedCellContent, EventListener>> {
776 self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
777
778 fn add_cell_dependency(
779 task_id: TaskId,
780 mut task: impl TaskGuard,
781 reader: Option<TaskId>,
782 reader_task: Option<impl TaskGuard>,
783 cell: CellId,
784 key: Option<u64>,
785 ) {
786 if let Some(mut reader_task) = reader_task
787 && (!task.immutable() || cfg!(feature = "verify_immutable"))
788 {
789 let reader = reader.unwrap();
790 let _ = task
791 .add_cell_dependents(CellDependency::new(CellRef { task: reader, cell }, key));
792 drop(task);
793
794 let target = CellRef {
800 task: task_id,
801 cell,
802 };
803 let dep = CellDependency::new(target, key);
804 if !reader_task.remove_outdated_cell_dependencies(&dep) {
805 let _ = reader_task.add_cell_dependencies(dep);
806 }
807 drop(reader_task);
808 }
809 }
810
811 let ReadCellOptions {
812 tracking,
813 final_read_hint,
814 } = options;
815
816 let mut ctx = self.execute_context(turbo_tasks);
817 let (mut task, reader_task) = if self.should_track_dependencies()
818 && !matches!(tracking, ReadCellTracking::Untracked)
819 && let Some(reader_id) = reader
820 && reader_id != task_id
821 {
822 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
826 (task, Some(reader))
827 } else {
828 (ctx.task(task_id, TaskDataCategory::All), None)
829 };
830
831 let content = if final_read_hint {
832 task.remove_cell_data(&cell)
833 } else {
834 task.get_cell_data(&cell).cloned()
835 };
836 if let Some(content) = content {
837 if tracking.should_track(false) {
838 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
839 }
840 return Ok(Ok(TypedCellContent(
841 cell.type_id,
842 CellContent(Some(content)),
843 )));
844 }
845
846 let in_progress = task.get_in_progress();
847 if matches!(
848 in_progress,
849 Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
850 ) {
851 return Ok(Err(self
852 .listen_to_cell(&mut task, task_id, &reader_task, cell)
853 .0));
854 }
855 let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
856
857 let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
859 let Some(max_id) = max_id else {
860 let task_desc = task.get_task_description();
861 if tracking.should_track(true) {
862 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
863 }
864 bail!(
865 "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
866 );
867 };
868 if cell.index >= max_id {
869 let task_desc = task.get_task_description();
870 if tracking.should_track(true) {
871 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
872 }
873 bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
874 }
875
876 if is_cancelled {
882 bail!("{} was canceled", task.get_task_description());
883 }
884
885 let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
887 drop(reader_task);
888 if !new_listener {
889 return Ok(Err(listener));
890 }
891
892 let _span = tracing::trace_span!(
893 "recomputation",
894 cell_type = get_value_type(cell.type_id).ty.global_name,
895 cell_index = cell.index
896 )
897 .entered();
898
899 let _ = task.add_scheduled(
900 TaskExecutionReason::CellNotAvailable,
901 EventDescription::new(|| task.get_task_desc_fn()),
902 );
903 ctx.schedule_task(task, TaskPriority::Recomputation);
904
905 Ok(Err(listener))
906 }
907
908 fn listen_to_cell(
909 &self,
910 task: &mut impl TaskGuard,
911 task_id: TaskId,
912 reader_task: &Option<impl TaskGuard>,
913 cell: CellId,
914 ) -> (EventListener, bool) {
915 let note = || {
916 let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
917 move || {
918 if let Some(reader_desc) = reader_desc.as_ref() {
919 format!("try_read_task_cell (in progress) from {}", (reader_desc)())
920 } else {
921 "try_read_task_cell (in progress, untracked)".to_string()
922 }
923 }
924 };
925 if let Some(in_progress) = task.get_in_progress_cells(&cell) {
926 let listener = in_progress.event.listen_with_note(note);
928 return (listener, false);
929 }
930 let in_progress = InProgressCellState::new(task_id, cell);
931 let listener = in_progress.event.listen_with_note(note);
932 let old = task.insert_in_progress_cells(cell, in_progress);
933 debug_assert!(old.is_none(), "InProgressCell already exists");
934 (listener, true)
935 }
936
937 fn snapshot_and_persist(
938 &self,
939 parent_span: Option<tracing::Id>,
940 reason: &str,
941 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
942 ) -> Result<(Instant, bool), anyhow::Error> {
943 let snapshot_span =
944 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
945 .entered();
946 let _snapshot_in_progress = self.snapshot_in_progress.lock();
950 let start = Instant::now();
951 let wall_start = SystemTime::now();
955 debug_assert!(self.should_persist());
956
957 let mut snapshot_phase = {
958 let _span = tracing::info_span!("blocking").entered();
959 self.snapshot_coord.begin_snapshot()
960 };
961 let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
964
965 let suspended_operations = snapshot_phase.take_suspended_operations();
966
967 let snapshot_time = Instant::now();
968 drop(snapshot_phase);
969
970 if !has_modifications {
971 drop(snapshot_guard);
974 return Ok((start, false));
975 }
976
977 #[cfg(feature = "print_cache_item_size")]
978 #[derive(Default)]
979 struct TaskCacheStats {
980 data: usize,
981 #[cfg(feature = "print_cache_item_size_with_compressed")]
982 data_compressed: usize,
983 data_count: usize,
984 meta: usize,
985 #[cfg(feature = "print_cache_item_size_with_compressed")]
986 meta_compressed: usize,
987 meta_count: usize,
988 upper_count: usize,
989 collectibles_count: usize,
990 aggregated_collectibles_count: usize,
991 children_count: usize,
992 followers_count: usize,
993 collectibles_dependents_count: usize,
994 aggregated_dirty_containers_count: usize,
995 output_size: usize,
996 }
997 #[cfg(feature = "print_cache_item_size")]
1000 struct FormatSizes {
1001 size: usize,
1002 #[cfg(feature = "print_cache_item_size_with_compressed")]
1003 compressed_size: usize,
1004 }
1005 #[cfg(feature = "print_cache_item_size")]
1006 impl std::fmt::Display for FormatSizes {
1007 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1008 use turbo_tasks::util::FormatBytes;
1009 #[cfg(feature = "print_cache_item_size_with_compressed")]
1010 {
1011 write!(
1012 f,
1013 "{} ({} compressed)",
1014 FormatBytes(self.size),
1015 FormatBytes(self.compressed_size)
1016 )
1017 }
1018 #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1019 {
1020 write!(f, "{}", FormatBytes(self.size))
1021 }
1022 }
1023 }
1024 #[cfg(feature = "print_cache_item_size")]
1025 impl TaskCacheStats {
1026 #[cfg(feature = "print_cache_item_size_with_compressed")]
1027 fn compressed_size(data: &[u8]) -> Result<usize> {
1028 Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1029 data,
1030 &mut Vec::new(),
1031 lzzzz::lz4::ACC_LEVEL_DEFAULT,
1032 )?)
1033 }
1034
1035 fn add_data(&mut self, data: &[u8]) {
1036 self.data += data.len();
1037 #[cfg(feature = "print_cache_item_size_with_compressed")]
1038 {
1039 self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1040 }
1041 self.data_count += 1;
1042 }
1043
1044 fn add_meta(&mut self, data: &[u8]) {
1045 self.meta += data.len();
1046 #[cfg(feature = "print_cache_item_size_with_compressed")]
1047 {
1048 self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1049 }
1050 self.meta_count += 1;
1051 }
1052
1053 fn add_counts(&mut self, storage: &TaskStorage) {
1054 let counts = storage.meta_counts();
1055 self.upper_count += counts.upper;
1056 self.collectibles_count += counts.collectibles;
1057 self.aggregated_collectibles_count += counts.aggregated_collectibles;
1058 self.children_count += counts.children;
1059 self.followers_count += counts.followers;
1060 self.collectibles_dependents_count += counts.collectibles_dependents;
1061 self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
1062 if let Some(output) = storage.get_output() {
1063 use turbo_bincode::turbo_bincode_encode;
1064
1065 self.output_size += turbo_bincode_encode(&output)
1066 .map(|data| data.len())
1067 .unwrap_or(0);
1068 }
1069 }
1070
1071 fn task_name(storage: &TaskStorage) -> String {
1073 storage
1074 .get_persistent_task_type()
1075 .map(|t| t.to_string())
1076 .unwrap_or_else(|| "<unknown>".to_string())
1077 }
1078
1079 fn sort_key(&self) -> usize {
1082 #[cfg(feature = "print_cache_item_size_with_compressed")]
1083 {
1084 self.data_compressed + self.meta_compressed
1085 }
1086 #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1087 {
1088 self.data + self.meta
1089 }
1090 }
1091
1092 fn format_total(&self) -> FormatSizes {
1093 FormatSizes {
1094 size: self.data + self.meta,
1095 #[cfg(feature = "print_cache_item_size_with_compressed")]
1096 compressed_size: self.data_compressed + self.meta_compressed,
1097 }
1098 }
1099
1100 fn format_data(&self) -> FormatSizes {
1101 FormatSizes {
1102 size: self.data,
1103 #[cfg(feature = "print_cache_item_size_with_compressed")]
1104 compressed_size: self.data_compressed,
1105 }
1106 }
1107
1108 fn format_avg_data(&self) -> FormatSizes {
1109 FormatSizes {
1110 size: self.data.checked_div(self.data_count).unwrap_or(0),
1111 #[cfg(feature = "print_cache_item_size_with_compressed")]
1112 compressed_size: self
1113 .data_compressed
1114 .checked_div(self.data_count)
1115 .unwrap_or(0),
1116 }
1117 }
1118
1119 fn format_meta(&self) -> FormatSizes {
1120 FormatSizes {
1121 size: self.meta,
1122 #[cfg(feature = "print_cache_item_size_with_compressed")]
1123 compressed_size: self.meta_compressed,
1124 }
1125 }
1126
1127 fn format_avg_meta(&self) -> FormatSizes {
1128 FormatSizes {
1129 size: self.meta.checked_div(self.meta_count).unwrap_or(0),
1130 #[cfg(feature = "print_cache_item_size_with_compressed")]
1131 compressed_size: self
1132 .meta_compressed
1133 .checked_div(self.meta_count)
1134 .unwrap_or(0),
1135 }
1136 }
1137 }
1138 #[cfg(feature = "print_cache_item_size")]
1139 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1140 Mutex::new(FxHashMap::default());
1141
1142 let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
1149 let encode_category = |task_id: TaskId,
1150 data: &TaskStorage,
1151 category: SpecificTaskDataCategory,
1152 buffer: &mut TurboBincodeBuffer|
1153 -> Option<TurboBincodeBuffer> {
1154 match encode_task_data(task_id, data, category, buffer) {
1155 Ok(encoded) => {
1156 #[cfg(feature = "print_cache_item_size")]
1157 {
1158 let mut stats = task_cache_stats.lock();
1159 let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
1160 match category {
1161 SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1162 SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1163 }
1164 }
1165 Some(encoded)
1166 }
1167 Err(err) => {
1168 panic!(
1169 "Serializing task {} failed ({:?}): {:?}",
1170 self.debug_get_task_description(task_id),
1171 category,
1172 err
1173 );
1174 }
1175 }
1176 };
1177 if task_id.is_transient() {
1178 unreachable!("transient task_ids should never be enqueued to be persisted");
1179 }
1180
1181 let encode_meta = inner.flags.meta_modified();
1182 let encode_data = inner.flags.data_modified();
1183
1184 #[cfg(feature = "print_cache_item_size")]
1185 if encode_data || encode_meta {
1186 task_cache_stats
1187 .lock()
1188 .entry(TaskCacheStats::task_name(inner))
1189 .or_default()
1190 .add_counts(inner);
1191 }
1192
1193 let meta = if encode_meta {
1194 encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1195 } else {
1196 None
1197 };
1198
1199 let data = if encode_data {
1200 encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1201 } else {
1202 None
1203 };
1204 let task_type_hash = if inner.flags.new_task() {
1205 let task_type = inner.get_persistent_task_type().expect(
1206 "It is not possible for a new_task to not have a persistent_task_type. Task \
1207 creation for persistent tasks uses a single ExecutionContextImpl for \
1208 creating the task (which sets new_task) and connect_child (which sets \
1209 persistent_task_type) and take_snapshot waits for all operations to complete \
1210 or suspend before we start snapshotting. So task creation will always set \
1211 the task_type.",
1212 );
1213 Some(compute_task_type_hash(task_type))
1214 } else {
1215 None
1216 };
1217
1218 SnapshotItem {
1219 task_id,
1220 meta,
1221 data,
1222 task_type_hash,
1223 }
1224 };
1225
1226 let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);
1227
1228 drop(snapshot_span);
1229 let snapshot_duration = start.elapsed();
1230 let task_count = task_snapshots.len();
1231
1232 if task_snapshots.is_empty() {
1233 std::hint::cold_path();
1236 return Ok((snapshot_time, false));
1237 }
1238
1239 let persist_start = Instant::now();
1240 let span = tracing::info_span!(
1241 parent: parent_span,
1242 "persist",
1243 reason = reason,
1244 data_items= tracing::field::Empty,
1245 meta_items= tracing::field::Empty,
1246 task_cache_items= tracing::field::Empty,
1247 next_task_id= tracing::field::Empty,)
1248 .entered();
1249 {
1250 let SnapshotMeta {
1254 task_cache_items,
1255 data_items,
1256 meta_items,
1257 max_next_task_id,
1258 } = self
1259 .backing_storage
1260 .save_snapshot(suspended_operations, task_snapshots)?;
1261 span.record("data_items", data_items);
1262 span.record("meta_items", meta_items);
1263 span.record("task_cache_items", task_cache_items);
1264 span.record("next_task_id", max_next_task_id);
1265
1266 #[cfg(feature = "print_cache_item_size")]
1267 {
1268 let mut task_cache_stats = task_cache_stats
1269 .into_inner()
1270 .into_iter()
1271 .collect::<Vec<_>>();
1272 if !task_cache_stats.is_empty() {
1273 use turbo_tasks::util::FormatBytes;
1274
1275 use crate::utils::markdown_table::print_markdown_table;
1276
1277 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1278 (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
1279 });
1280
1281 println!(
1282 "Task cache stats: {}",
1283 FormatSizes {
1284 size: task_cache_stats
1285 .iter()
1286 .map(|(_, s)| s.data + s.meta)
1287 .sum::<usize>(),
1288 #[cfg(feature = "print_cache_item_size_with_compressed")]
1289 compressed_size: task_cache_stats
1290 .iter()
1291 .map(|(_, s)| s.data_compressed + s.meta_compressed)
1292 .sum::<usize>()
1293 },
1294 );
1295
1296 print_markdown_table(
1297 [
1298 "Task",
1299 " Total Size",
1300 " Data Size",
1301 " Data Count x Avg",
1302 " Data Count x Avg",
1303 " Meta Size",
1304 " Meta Count x Avg",
1305 " Meta Count x Avg",
1306 " Uppers",
1307 " Coll",
1308 " Agg Coll",
1309 " Children",
1310 " Followers",
1311 " Coll Deps",
1312 " Agg Dirty",
1313 " Output Size",
1314 ],
1315 task_cache_stats.iter(),
1316 |(task_desc, stats)| {
1317 [
1318 task_desc.to_string(),
1319 format!(" {}", stats.format_total()),
1320 format!(" {}", stats.format_data()),
1321 format!(" {} x", stats.data_count),
1322 format!("{}", stats.format_avg_data()),
1323 format!(" {}", stats.format_meta()),
1324 format!(" {} x", stats.meta_count),
1325 format!("{}", stats.format_avg_meta()),
1326 format!(" {}", stats.upper_count),
1327 format!(" {}", stats.collectibles_count),
1328 format!(" {}", stats.aggregated_collectibles_count),
1329 format!(" {}", stats.children_count),
1330 format!(" {}", stats.followers_count),
1331 format!(" {}", stats.collectibles_dependents_count),
1332 format!(" {}", stats.aggregated_dirty_containers_count),
1333 format!(" {}", FormatBytes(stats.output_size)),
1334 ]
1335 },
1336 );
1337 }
1338 }
1339 }
1340
1341 let elapsed = start.elapsed();
1342 let persist_duration = persist_start.elapsed();
1343 if elapsed > Duration::from_secs(10) {
1345 turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1346 "Finished writing to filesystem cache".to_string(),
1347 elapsed,
1348 )));
1349 }
1350
1351 let wall_start_ms = wall_start
1352 .duration_since(SystemTime::UNIX_EPOCH)
1353 .unwrap_or_default()
1354 .as_secs_f64()
1356 * 1000.0;
1357 let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1358 turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1359 "turbopack-persistence",
1360 wall_start_ms,
1361 wall_end_ms,
1362 vec![
1363 ("reason", serde_json::Value::from(reason)),
1364 (
1365 "snapshot_duration_ms",
1366 serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1367 ),
1368 (
1369 "persist_duration_ms",
1370 serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1371 ),
1372 ("task_count", serde_json::Value::from(task_count)),
1373 ],
1374 )));
1375
1376 Ok((snapshot_time, true))
1377 }
1378
1379 fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1380 if self.should_restore() {
1381 let uncompleted_operations = self
1385 .backing_storage
1386 .uncompleted_operations()
1387 .expect("Failed to get uncompleted operations");
1388 if !uncompleted_operations.is_empty() {
1389 let mut ctx = self.execute_context(turbo_tasks);
1390 for op in uncompleted_operations {
1391 op.execute(&mut ctx);
1392 }
1393 }
1394 }
1395
1396 if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1399 let _span = trace_span!("persisting background job").entered();
1401 let _span = tracing::info_span!("thread").entered();
1402 turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::Snapshot);
1403 }
1404 }
1405
1406 fn stopping(&self) {
1407 self.stopping.store(true, Ordering::Release);
1408 self.stopping_event.notify(usize::MAX);
1409 }
1410
1411 #[allow(unused_variables)]
1412 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1413 #[cfg(feature = "verify_aggregation_graph")]
1414 {
1415 self.is_idle.store(false, Ordering::Release);
1416 self.verify_aggregation_graph(turbo_tasks, false);
1417 }
1418 if self.should_persist()
1419 && let Err(err) = self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks)
1420 {
1421 eprintln!("Persisting failed during shutdown: {err:?}");
1422 }
1423 self.storage.drop_contents();
1424 if let Err(err) = self.backing_storage.shutdown() {
1425 println!("Shutting down failed: {err}");
1426 }
1427 }
1428
1429 #[allow(unused_variables)]
1430 fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1431 self.idle_start_event.notify(usize::MAX);
1432
1433 #[cfg(feature = "verify_aggregation_graph")]
1434 {
1435 use tokio::select;
1436
1437 self.is_idle.store(true, Ordering::Release);
1438 let this = self.clone();
1439 let turbo_tasks = turbo_tasks.pin();
1440 tokio::task::spawn(async move {
1441 select! {
1442 _ = tokio::time::sleep(Duration::from_secs(5)) => {
1443 }
1445 _ = this.idle_end_event.listen() => {
1446 return;
1447 }
1448 }
1449 if !this.is_idle.load(Ordering::Relaxed) {
1450 return;
1451 }
1452 this.verify_aggregation_graph(&*turbo_tasks, true);
1453 });
1454 }
1455 }
1456
1457 fn idle_end(&self) {
1458 #[cfg(feature = "verify_aggregation_graph")]
1459 self.is_idle.store(false, Ordering::Release);
1460 self.idle_end_event.notify(usize::MAX);
1461 }
1462
1463 fn get_or_create_task(
1464 &self,
1465 native_fn: &'static NativeFunction,
1466 this: Option<RawVc>,
1467 arg: &mut dyn StackDynTaskInputs,
1468 parent_task: Option<TaskId>,
1469 persistence: TaskPersistence,
1470 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1471 ) -> TaskId {
1472 let transient = matches!(persistence, TaskPersistence::Transient);
1473
1474 if transient
1475 && let Some(parent_task) = parent_task
1476 && !parent_task.is_transient()
1477 {
1478 let task_type = CachedTaskType {
1479 native_fn,
1480 this,
1481 arg: arg.take_box(),
1482 };
1483 self.panic_persistent_calling_transient(
1484 self.debug_get_task_description(parent_task),
1485 Some(&task_type),
1486 None,
1487 );
1488 }
1489
1490 let is_root = native_fn.is_root;
1491
1492 let arg_ref = arg.as_ref();
1494 let hash = CachedTaskType::hash_from_components(
1495 self.storage.task_cache.hasher(),
1496 native_fn,
1497 this,
1498 arg_ref,
1499 );
1500 let shard = get_shard(&self.storage.task_cache, hash);
1504
1505 let mut ctx = self.execute_context(turbo_tasks);
1506 if let Some(task_id) =
1510 raw_get_in_shard(shard, hash, |k| k.eq_components(native_fn, this, arg_ref))
1511 {
1512 self.track_cache_hit_by_fn(native_fn);
1513 operation::ConnectChildOperation::run(parent_task, task_id, ctx);
1514 return task_id;
1515 }
1516
1517 let task_id = if !transient
1522 && let Some((task_id, stored_type)) = ctx.task_by_type(native_fn, this, arg_ref)
1523 {
1524 self.track_cache_hit_by_fn(native_fn);
1525 match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
1528 k.eq_components(native_fn, this, arg_ref)
1529 }) {
1530 RawEntry::Occupied(_) => {}
1531 RawEntry::Vacant(e) => {
1532 e.insert(stored_type, task_id);
1533 }
1534 };
1535 task_id
1536 } else {
1537 match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
1538 k.eq_components(native_fn, this, arg_ref)
1539 }) {
1540 RawEntry::Occupied(e) => {
1541 let task_id = *e.get();
1544 drop(e);
1545 self.track_cache_hit_by_fn(native_fn);
1546 task_id
1547 }
1548 RawEntry::Vacant(e) => {
1549 let task_type = CachedTaskTypeArc::new(CachedTaskType {
1553 native_fn,
1554 this,
1555 arg: arg.take_box(),
1556 });
1557 let task_id = if transient {
1558 self.transient_task_id_factory.get()
1559 } else {
1560 self.persisted_task_id_factory.get()
1561 };
1562 self.storage
1566 .initialize_new_task(task_id, Some(task_type.clone()));
1567 e.insert(task_type, task_id);
1569 self.track_cache_miss_by_fn(native_fn);
1570 if is_root {
1574 AggregationUpdateQueue::run(
1575 AggregationUpdateJob::UpdateAggregationNumber {
1576 task_id,
1577 base_aggregation_number: u32::MAX,
1578 distance: None,
1579 },
1580 &mut ctx,
1581 );
1582 } else if native_fn.is_session_dependent && self.should_track_dependencies() {
1583 const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
1584 AggregationUpdateQueue::run(
1585 AggregationUpdateJob::UpdateAggregationNumber {
1586 task_id,
1587 base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
1588 distance: None,
1589 },
1590 &mut ctx,
1591 );
1592 };
1593
1594 task_id
1595 }
1596 }
1597 };
1598
1599 operation::ConnectChildOperation::run(parent_task, task_id, ctx);
1600
1601 task_id
1602 }
1603
1604 fn debug_trace_transient_task(
1607 &self,
1608 task_type: &CachedTaskType,
1609 cell_id: Option<CellId>,
1610 ) -> DebugTraceTransientTask {
1611 fn inner_id(
1614 backend: &TurboTasksBackendInner<impl BackingStorage>,
1615 task_id: TaskId,
1616 cell_type_id: Option<ValueTypeId>,
1617 visited_set: &mut FxHashSet<TaskId>,
1618 ) -> DebugTraceTransientTask {
1619 if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1620 if visited_set.contains(&task_id) {
1621 let task_name = task_type.get_name();
1622 DebugTraceTransientTask::Collapsed {
1623 task_name,
1624 cell_type_id,
1625 }
1626 } else {
1627 inner_cached(backend, &task_type, cell_type_id, visited_set)
1628 }
1629 } else {
1630 DebugTraceTransientTask::Uncached { cell_type_id }
1631 }
1632 }
1633 fn inner_cached(
1634 backend: &TurboTasksBackendInner<impl BackingStorage>,
1635 task_type: &CachedTaskType,
1636 cell_type_id: Option<ValueTypeId>,
1637 visited_set: &mut FxHashSet<TaskId>,
1638 ) -> DebugTraceTransientTask {
1639 let task_name = task_type.get_name();
1640
1641 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1642 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1643 return None;
1647 };
1648 if task_id.is_transient() {
1649 Some(Box::new(inner_id(
1650 backend,
1651 task_id,
1652 cause_self_raw_vc.try_get_type_id(),
1653 visited_set,
1654 )))
1655 } else {
1656 None
1657 }
1658 });
1659 let cause_args = task_type
1660 .arg
1661 .get_raw_vcs()
1662 .into_iter()
1663 .filter_map(|raw_vc| {
1664 let Some(task_id) = raw_vc.try_get_task_id() else {
1665 return None;
1667 };
1668 if !task_id.is_transient() {
1669 return None;
1670 }
1671 Some((task_id, raw_vc.try_get_type_id()))
1672 })
1673 .collect::<IndexSet<_>>() .into_iter()
1675 .map(|(task_id, cell_type_id)| {
1676 inner_id(backend, task_id, cell_type_id, visited_set)
1677 })
1678 .collect();
1679
1680 DebugTraceTransientTask::Cached {
1681 task_name,
1682 cell_type_id,
1683 cause_self,
1684 cause_args,
1685 }
1686 }
1687 inner_cached(
1688 self,
1689 task_type,
1690 cell_id.map(|c| c.type_id),
1691 &mut FxHashSet::default(),
1692 )
1693 }
1694
1695 fn invalidate_task(
1696 &self,
1697 task_id: TaskId,
1698 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1699 ) {
1700 if !self.should_track_dependencies() {
1701 panic!("Dependency tracking is disabled so invalidation is not allowed");
1702 }
1703 operation::InvalidateOperation::run(
1704 smallvec![task_id],
1705 #[cfg(feature = "task_dirty_cause")]
1706 TaskDirtyCause::Invalidator,
1707 self.execute_context(turbo_tasks),
1708 );
1709 }
1710
1711 fn invalidate_tasks(
1712 &self,
1713 tasks: &[TaskId],
1714 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1715 ) {
1716 if !self.should_track_dependencies() {
1717 panic!("Dependency tracking is disabled so invalidation is not allowed");
1718 }
1719 operation::InvalidateOperation::run(
1720 tasks.iter().copied().collect(),
1721 #[cfg(feature = "task_dirty_cause")]
1722 TaskDirtyCause::Unknown,
1723 self.execute_context(turbo_tasks),
1724 );
1725 }
1726
1727 fn invalidate_tasks_set(
1728 &self,
1729 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1730 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1731 ) {
1732 if !self.should_track_dependencies() {
1733 panic!("Dependency tracking is disabled so invalidation is not allowed");
1734 }
1735 operation::InvalidateOperation::run(
1736 tasks.iter().copied().collect(),
1737 #[cfg(feature = "task_dirty_cause")]
1738 TaskDirtyCause::Unknown,
1739 self.execute_context(turbo_tasks),
1740 );
1741 }
1742
1743 fn invalidate_serialization(
1744 &self,
1745 task_id: TaskId,
1746 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1747 ) {
1748 if task_id.is_transient() {
1749 return;
1750 }
1751 let mut ctx = self.execute_context(turbo_tasks);
1752 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1753 task.invalidate_serialization();
1754 }
1755
1756 fn debug_get_task_description(&self, task_id: TaskId) -> String {
1757 let task = self.storage.access_mut(task_id);
1758 if let Some(value) = task.get_persistent_task_type() {
1759 format!("{task_id:?} {}", value)
1760 } else if let Some(value) = task.get_transient_task_type() {
1761 format!("{task_id:?} {}", value)
1762 } else {
1763 format!("{task_id:?} unknown")
1764 }
1765 }
1766
1767 fn get_task_name(
1768 &self,
1769 task_id: TaskId,
1770 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1771 ) -> String {
1772 let mut ctx = self.execute_context(turbo_tasks);
1773 let task = ctx.task(task_id, TaskDataCategory::Data);
1774 if let Some(value) = task.get_persistent_task_type() {
1775 value.to_string()
1776 } else if let Some(value) = task.get_transient_task_type() {
1777 value.to_string()
1778 } else {
1779 "unknown".to_string()
1780 }
1781 }
1782
1783 fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<CachedTaskTypeArc> {
1784 let task = self.storage.access_mut(task_id);
1785 task.get_persistent_task_type().cloned()
1786 }
1787
1788 fn task_execution_canceled(
1789 &self,
1790 task_id: TaskId,
1791 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1792 ) {
1793 let mut ctx = self.execute_context(turbo_tasks);
1794 let mut task = ctx.task(task_id, TaskDataCategory::All);
1795 if let Some(in_progress) = task.take_in_progress() {
1796 match in_progress {
1797 InProgressState::Scheduled {
1798 done_event,
1799 reason: _,
1800 } => done_event.notify(usize::MAX),
1801 InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1802 done_event.notify(usize::MAX)
1803 }
1804 InProgressState::Canceled => {}
1805 }
1806 }
1807 let in_progress_cells = task.take_in_progress_cells();
1810 if let Some(ref cells) = in_progress_cells {
1811 for state in cells.values() {
1812 state.event.notify(usize::MAX);
1813 }
1814 }
1815
1816 let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
1822 task.update_dirty_state(Some(Dirtyness::SessionDependent))
1823 } else {
1824 None
1825 };
1826
1827 let old = task.set_in_progress(InProgressState::Canceled);
1828 debug_assert!(old.is_none(), "InProgress already exists");
1829 drop(task);
1830
1831 if let Some(data_update) = data_update {
1832 AggregationUpdateQueue::run(data_update, &mut ctx);
1833 }
1834
1835 drop(in_progress_cells);
1836 }
1837
1838 fn try_start_task_execution(
1839 &self,
1840 task_id: TaskId,
1841 priority: TaskPriority,
1842 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1843 ) -> Option<TaskExecutionSpec<'_>> {
1844 let execution_reason;
1845 let task_type;
1846 #[cfg(feature = "task_dirty_cause")]
1847 let cause;
1848 {
1849 let mut ctx = self.execute_context(turbo_tasks);
1850 let mut task = ctx.task(task_id, TaskDataCategory::All);
1851 task_type = task.get_task_type().to_owned();
1852 let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
1853 if let Some(tasks) = task.prefetch() {
1854 drop(task);
1855 ctx.prepare_tasks(tasks, "prefetch");
1856 task = ctx.task(task_id, TaskDataCategory::All);
1857 }
1858 let in_progress = task.take_in_progress()?;
1859 let InProgressState::Scheduled { done_event, reason } = in_progress else {
1860 let old = task.set_in_progress(in_progress);
1861 debug_assert!(old.is_none(), "InProgress already exists");
1862 return None;
1863 };
1864 execution_reason = reason;
1865 #[cfg(feature = "task_dirty_cause")]
1866 {
1867 cause = match task.get_dirty() {
1868 Some(Dirtyness::Dirty { cause, .. }) => Some(cause.clone()),
1869 _ => None,
1870 };
1871 }
1872 let old = task.set_in_progress(InProgressState::InProgress(Box::new(
1873 InProgressStateInner {
1874 stale: false,
1875 once_task,
1876 done_event,
1877 marked_as_completed: false,
1878 new_children: Default::default(),
1879 },
1880 )));
1881 debug_assert!(old.is_none(), "InProgress already exists");
1882
1883 enum Collectible {
1885 Current(CollectibleRef, i32),
1886 Outdated(CollectibleRef),
1887 }
1888 let collectibles = task
1889 .iter_collectibles()
1890 .map(|(&collectible, &value)| Collectible::Current(collectible, value))
1891 .chain(
1892 task.iter_outdated_collectibles()
1893 .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
1894 )
1895 .collect::<Vec<_>>();
1896 for collectible in collectibles {
1897 match collectible {
1898 Collectible::Current(collectible, value) => {
1899 let _ = task.insert_outdated_collectible(collectible, value);
1900 }
1901 Collectible::Outdated(collectible) => {
1902 if task
1903 .collectibles()
1904 .is_none_or(|m| m.get(&collectible).is_none())
1905 {
1906 task.remove_outdated_collectibles(&collectible);
1907 }
1908 }
1909 }
1910 }
1911
1912 if self.should_track_dependencies() {
1913 let cell_dependencies = task.iter_cell_dependencies().collect();
1915 task.set_outdated_cell_dependencies(cell_dependencies);
1916
1917 let outdated_output_dependencies = task.iter_output_dependencies().collect();
1918 task.set_outdated_output_dependencies(outdated_output_dependencies);
1919 }
1920 }
1921
1922 let (span, future) = match task_type {
1923 TaskType::Cached(task_type) => {
1924 let CachedTaskType {
1925 native_fn,
1926 this,
1927 arg,
1928 } = &*task_type;
1929 (
1930 native_fn.span(
1931 task_id.persistence(),
1932 execution_reason,
1933 priority,
1934 #[cfg(feature = "task_dirty_cause")]
1935 cause.as_ref(),
1936 ),
1937 native_fn.execute(*this, &**arg),
1938 )
1939 }
1940 TaskType::Transient(task_type) => {
1941 let span = tracing::trace_span!("turbo_tasks::root_task");
1942 let future = match &*task_type {
1943 TransientTask::Root(f) => f(),
1944 TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
1945 };
1946 (span, future)
1947 }
1948 };
1949 Some(TaskExecutionSpec { future, span })
1950 }
1951
1952 fn task_execution_completed(
1956 &self,
1957 task_id: TaskId,
1958 result: Result<RawVc, TurboTasksExecutionError>,
1959 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1960 #[cfg(feature = "verify_determinism")] stateful: bool,
1961 has_invalidator: bool,
1962 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1963 ) -> Option<TaskPriority> {
1964 #[cfg(not(feature = "trace_task_details"))]
1979 let span = tracing::trace_span!(
1980 "task execution completed",
1981 new_children = tracing::field::Empty
1982 )
1983 .entered();
1984 #[cfg(feature = "trace_task_details")]
1985 let span = tracing::trace_span!(
1986 "task execution completed",
1987 task_id = display(task_id),
1988 result = match result.as_ref() {
1989 Ok(value) => display(either::Either::Left(value)),
1990 Err(err) => display(either::Either::Right(err)),
1991 },
1992 new_children = tracing::field::Empty,
1993 immutable = tracing::field::Empty,
1994 new_output = tracing::field::Empty,
1995 output_dependents = tracing::field::Empty,
1996 aggregation_number = tracing::field::Empty,
1997 stale = tracing::field::Empty,
1998 )
1999 .entered();
2000
2001 let is_error = result.is_err();
2002
2003 let mut ctx = self.execute_context(turbo_tasks);
2004
2005 let TaskExecutionCompletePrepareResult {
2006 new_children,
2007 is_now_immutable,
2008 #[cfg(feature = "verify_determinism")]
2009 no_output_set,
2010 new_output,
2011 #[cfg(feature = "task_dirty_cause")]
2012 function_id,
2013 output_dependent_tasks,
2014 is_recomputation,
2015 is_session_dependent,
2016 } = match self.task_execution_completed_prepare(
2017 &mut ctx,
2018 #[cfg(feature = "trace_task_details")]
2019 &span,
2020 task_id,
2021 result,
2022 cell_counters,
2023 #[cfg(feature = "verify_determinism")]
2024 stateful,
2025 has_invalidator,
2026 ) {
2027 Ok(r) => r,
2028 Err(stale_priority) => {
2029 #[cfg(feature = "trace_task_details")]
2031 span.record("stale", "prepare");
2032 return Some(stale_priority);
2033 }
2034 };
2035
2036 #[cfg(feature = "trace_task_details")]
2037 span.record("new_output", new_output.is_some());
2038 #[cfg(feature = "trace_task_details")]
2039 span.record("output_dependents", output_dependent_tasks.len());
2040
2041 if !output_dependent_tasks.is_empty() {
2046 self.task_execution_completed_invalidate_output_dependent(
2047 &mut ctx,
2048 task_id,
2049 #[cfg(feature = "task_dirty_cause")]
2050 function_id,
2051 output_dependent_tasks,
2052 );
2053 }
2054
2055 let has_new_children = !new_children.is_empty();
2056 span.record("new_children", new_children.len());
2057
2058 if has_new_children {
2059 self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
2060 }
2061
2062 if has_new_children
2063 && let Some(stale_priority) =
2064 self.task_execution_completed_connect(&mut ctx, task_id, new_children)
2065 {
2066 #[cfg(feature = "trace_task_details")]
2068 span.record("stale", "connect");
2069 return Some(stale_priority);
2070 }
2071
2072 let (stale_priority, in_progress_cells) = self.task_execution_completed_finish(
2073 &mut ctx,
2074 task_id,
2075 #[cfg(feature = "verify_determinism")]
2076 no_output_set,
2077 new_output,
2078 is_now_immutable,
2079 is_session_dependent,
2080 );
2081 if let Some(stale_priority) = stale_priority {
2082 #[cfg(feature = "trace_task_details")]
2084 span.record("stale", "finish");
2085 return Some(stale_priority);
2086 }
2087
2088 let removed_data = self.task_execution_completed_cleanup(
2089 &mut ctx,
2090 task_id,
2091 cell_counters,
2092 is_error,
2093 is_recomputation,
2094 );
2095
2096 drop(removed_data);
2098 drop(in_progress_cells);
2099
2100 None
2101 }
2102
2103 fn task_execution_completed_prepare(
2104 &self,
2105 ctx: &mut impl ExecuteContext<'_>,
2106 #[cfg(feature = "trace_task_details")] span: &Span,
2107 task_id: TaskId,
2108 result: Result<RawVc, TurboTasksExecutionError>,
2109 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2110 #[cfg(feature = "verify_determinism")] stateful: bool,
2111 has_invalidator: bool,
2112 ) -> Result<TaskExecutionCompletePrepareResult, TaskPriority> {
2113 let mut task = ctx.task(task_id, TaskDataCategory::All);
2114 let is_recomputation = task.is_dirty().is_none();
2115 let is_session_dependent = self.should_track_dependencies()
2118 && matches!(task.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent);
2119 let Some(in_progress) = task.get_in_progress_mut() else {
2120 panic!("Task execution completed, but task is not in progress: {task:#?}");
2121 };
2122 if matches!(in_progress, InProgressState::Canceled) {
2123 return Ok(TaskExecutionCompletePrepareResult {
2124 new_children: Default::default(),
2125 is_now_immutable: false,
2126 #[cfg(feature = "verify_determinism")]
2127 no_output_set: false,
2128 #[cfg(feature = "task_dirty_cause")]
2129 function_id: None,
2130 new_output: None,
2131 output_dependent_tasks: Default::default(),
2132 is_recomputation,
2133 is_session_dependent,
2134 });
2135 }
2136 let &mut InProgressState::InProgress(box InProgressStateInner {
2137 stale,
2138 ref mut new_children,
2139 once_task: is_once_task,
2140 ..
2141 }) = in_progress
2142 else {
2143 panic!("Task execution completed, but task is not in progress: {task:#?}");
2144 };
2145
2146 #[cfg(not(feature = "no_fast_stale"))]
2148 if stale && !is_once_task {
2149 let stale_priority = compute_stale_priority(&task);
2150 let Some(InProgressState::InProgress(box InProgressStateInner {
2151 done_event,
2152 mut new_children,
2153 ..
2154 })) = task.take_in_progress()
2155 else {
2156 unreachable!();
2157 };
2158 let old = task.set_in_progress(InProgressState::Scheduled {
2159 done_event,
2160 reason: TaskExecutionReason::Stale,
2161 });
2162 debug_assert!(old.is_none(), "InProgress already exists");
2163 for task in task.iter_children() {
2166 new_children.remove(&task);
2167 }
2168 drop(task);
2169
2170 AggregationUpdateQueue::run(
2173 AggregationUpdateJob::DecreaseActiveCounts {
2174 task_ids: new_children.into_iter().collect(),
2175 },
2176 ctx,
2177 );
2178 return Err(stale_priority);
2179 }
2180
2181 let mut new_children = take(new_children);
2183
2184 #[cfg(feature = "task_dirty_cause")]
2186 let function_id = match task.get_task_type() {
2187 TaskTypeRef::Cached(task_type) => {
2188 Some(turbo_tasks::registry::get_function_id(task_type.native_fn))
2189 }
2190 TaskTypeRef::Transient(_) => None,
2191 };
2192
2193 #[cfg(feature = "verify_determinism")]
2195 if stateful {
2196 task.set_stateful(true);
2197 }
2198
2199 if has_invalidator {
2201 task.set_invalidator(true);
2202 }
2203
2204 if result.is_ok() || is_recomputation {
2214 let old_counters: FxHashMap<_, _> = task
2215 .iter_cell_type_max_index()
2216 .map(|(&k, &v)| (k, v))
2217 .collect();
2218 let mut counters_to_remove = old_counters.clone();
2219
2220 for (&cell_type, &max_index) in cell_counters.iter() {
2221 if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2222 if old_max_index != max_index {
2223 task.insert_cell_type_max_index(cell_type, max_index);
2224 }
2225 } else {
2226 task.insert_cell_type_max_index(cell_type, max_index);
2227 }
2228 }
2229 for (cell_type, _) in counters_to_remove {
2230 task.remove_cell_type_max_index(&cell_type);
2231 }
2232 }
2233
2234 let mut queue = AggregationUpdateQueue::new();
2235
2236 let mut old_edges = Vec::new();
2237
2238 let has_children = !new_children.is_empty();
2239 let is_immutable = task.immutable();
2240 let task_dependencies_for_immutable =
2241 if !is_immutable
2243 && !is_session_dependent
2245 && !task.invalidator()
2247 && task.is_collectibles_dependencies_empty()
2249 {
2250 Some(
2251 task.iter_output_dependencies()
2253 .chain(task.iter_cell_dependencies().map(|dep| dep.cell_ref().task))
2254 .collect::<FxHashSet<_>>(),
2255 )
2256 } else {
2257 None
2258 };
2259
2260 if has_children {
2261 let _aggregation_number =
2263 prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2264
2265 #[cfg(feature = "trace_task_details")]
2266 span.record("aggregation_number", _aggregation_number);
2267
2268 old_edges.extend(
2270 task.iter_children()
2271 .filter(|task| !new_children.remove(task))
2272 .map(OutdatedEdge::Child),
2273 );
2274 } else {
2275 old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2276 }
2277
2278 old_edges.extend(
2279 task.iter_outdated_collectibles()
2280 .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2281 );
2282
2283 if self.should_track_dependencies() {
2284 old_edges.extend(
2291 task.iter_outdated_cell_dependencies()
2292 .map(OutdatedEdge::CellDependency),
2293 );
2294 old_edges.extend(
2295 task.iter_outdated_output_dependencies()
2296 .map(OutdatedEdge::OutputDependency),
2297 );
2298 }
2299
2300 let current_output = task.get_output();
2302 #[cfg(feature = "verify_determinism")]
2303 let no_output_set = current_output.is_none();
2304 let new_output = match result {
2305 Ok(RawVc::TaskOutput(output_task_id)) => {
2306 if let Some(OutputValue::Output(current_task_id)) = current_output
2307 && *current_task_id == output_task_id
2308 {
2309 None
2310 } else {
2311 Some(OutputValue::Output(output_task_id))
2312 }
2313 }
2314 Ok(RawVc::TaskCell(output_task_id, cell)) => {
2315 if let Some(OutputValue::Cell(CellRef {
2316 task: current_task_id,
2317 cell: current_cell,
2318 })) = current_output
2319 && *current_task_id == output_task_id
2320 && *current_cell == cell
2321 {
2322 None
2323 } else {
2324 Some(OutputValue::Cell(CellRef {
2325 task: output_task_id,
2326 cell,
2327 }))
2328 }
2329 }
2330 Ok(RawVc::LocalOutput(..)) => {
2331 panic!("Non-local tasks must not return a local Vc");
2332 }
2333 Err(err) => {
2334 if let Some(OutputValue::Error(old_error)) = current_output
2335 && **old_error == err
2336 {
2337 None
2338 } else {
2339 Some(OutputValue::Error(Arc::new((&err).into())))
2340 }
2341 }
2342 };
2343 let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2344 if new_output.is_some() && ctx.should_track_dependencies() {
2346 output_dependent_tasks = task.iter_output_dependent().collect();
2347 }
2348
2349 drop(task);
2350
2351 let mut is_now_immutable = false;
2353 if let Some(dependencies) = task_dependencies_for_immutable
2354 && dependencies
2355 .iter()
2356 .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2357 {
2358 is_now_immutable = true;
2359 }
2360 #[cfg(feature = "trace_task_details")]
2361 span.record("immutable", is_immutable || is_now_immutable);
2362
2363 if !queue.is_empty() || !old_edges.is_empty() {
2364 #[cfg(any(
2365 feature = "trace_task_completion",
2366 feature = "trace_aggregation_update_stats"
2367 ))]
2368 let _span =
2369 tracing::trace_span!("remove old edges and prepare new children", stats = Empty)
2370 .entered();
2371 #[cfg(feature = "trace_aggregation_update_stats")]
2375 {
2376 let stats = CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2377 _span.record("stats", tracing::field::debug(stats));
2378 }
2379 #[cfg(not(feature = "trace_aggregation_update_stats"))]
2380 CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2381 }
2382
2383 Ok(TaskExecutionCompletePrepareResult {
2384 new_children,
2385 is_now_immutable,
2386 #[cfg(feature = "verify_determinism")]
2387 no_output_set,
2388 #[cfg(feature = "task_dirty_cause")]
2389 function_id,
2390 new_output,
2391 output_dependent_tasks,
2392 is_recomputation,
2393 is_session_dependent,
2394 })
2395 }
2396
2397 fn task_execution_completed_invalidate_output_dependent(
2398 &self,
2399 ctx: &mut impl ExecuteContext<'_>,
2400 task_id: TaskId,
2401 #[cfg(feature = "task_dirty_cause")] function_id: Option<FunctionId>,
2402 output_dependent_tasks: SmallVec<[TaskId; 4]>,
2403 ) {
2404 debug_assert!(!output_dependent_tasks.is_empty());
2405
2406 #[cfg(feature = "task_dirty_cause")]
2407 let cause = match function_id {
2408 Some(function) => TaskDirtyCause::OutputChange { function },
2409 None => TaskDirtyCause::RootOutputChange,
2410 };
2411
2412 if output_dependent_tasks.len() > 1 {
2413 ctx.prepare_tasks(
2414 output_dependent_tasks
2415 .iter()
2416 .map(|&id| (id, TaskDataCategory::All)),
2417 "invalidate output dependents",
2418 );
2419 }
2420
2421 fn process_output_dependents(
2422 ctx: &mut impl ExecuteContext<'_>,
2423 task_id: TaskId,
2424 #[cfg(feature = "task_dirty_cause")] cause: &TaskDirtyCause,
2425 dependent_task_id: TaskId,
2426 queue: &mut AggregationUpdateQueue,
2427 ) {
2428 #[cfg(feature = "trace_task_output_dependencies")]
2429 let span = tracing::trace_span!(
2430 "invalidate output dependency",
2431 task = %task_id,
2432 dependent_task = %dependent_task_id,
2433 result = tracing::field::Empty,
2434 )
2435 .entered();
2436 let mut make_stale = true;
2437 let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2438 let transient_task_type = dependent.get_transient_task_type();
2439 if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
2440 #[cfg(feature = "trace_task_output_dependencies")]
2442 span.record("result", "once task");
2443 return;
2444 }
2445 if dependent.outdated_output_dependencies_contains(&task_id) {
2446 #[cfg(feature = "trace_task_output_dependencies")]
2447 span.record("result", "outdated dependency");
2448 make_stale = false;
2453 } else if !dependent.output_dependencies_contains(&task_id) {
2454 #[cfg(feature = "trace_task_output_dependencies")]
2457 span.record("result", "no backward dependency");
2458 return;
2459 }
2460 make_task_dirty_internal(
2461 dependent,
2462 dependent_task_id,
2463 make_stale,
2464 #[cfg(feature = "task_dirty_cause")]
2465 cause.clone(),
2466 queue,
2467 ctx,
2468 );
2469 #[cfg(feature = "trace_task_output_dependencies")]
2470 span.record("result", "marked dirty");
2471 }
2472
2473 if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
2474 let chunk_size = good_chunk_size(output_dependent_tasks.len());
2475 let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2476 let _ = scope_and_block(chunks.len(), |scope| {
2477 for chunk in chunks {
2478 let child_ctx = ctx.child_context();
2479 #[cfg(feature = "task_dirty_cause")]
2480 let cause = &cause;
2481 scope.spawn(move || {
2482 let mut ctx = child_ctx.create();
2483 let mut queue = AggregationUpdateQueue::new();
2484 for dependent_task_id in chunk {
2485 process_output_dependents(
2486 &mut ctx,
2487 task_id,
2488 #[cfg(feature = "task_dirty_cause")]
2489 cause,
2490 dependent_task_id,
2491 &mut queue,
2492 )
2493 }
2494 queue.execute(&mut ctx);
2495 });
2496 }
2497 });
2498 } else {
2499 let mut queue = AggregationUpdateQueue::new();
2500 for dependent_task_id in output_dependent_tasks {
2501 process_output_dependents(
2502 ctx,
2503 task_id,
2504 #[cfg(feature = "task_dirty_cause")]
2505 &cause,
2506 dependent_task_id,
2507 &mut queue,
2508 );
2509 }
2510 queue.execute(ctx);
2511 }
2512 }
2513
2514 fn task_execution_completed_unfinished_children_dirty(
2515 &self,
2516 ctx: &mut impl ExecuteContext<'_>,
2517 new_children: &FxHashSet<TaskId>,
2518 ) {
2519 debug_assert!(!new_children.is_empty());
2520
2521 let mut queue = AggregationUpdateQueue::new();
2522 ctx.for_each_task_all(
2523 new_children.iter().copied(),
2524 "unfinished children dirty",
2525 |child_task, ctx| {
2526 if !child_task.has_output() {
2527 let child_id = child_task.id();
2528 make_task_dirty_internal(
2529 child_task,
2530 child_id,
2531 false,
2532 #[cfg(feature = "task_dirty_cause")]
2533 TaskDirtyCause::InitialDirty,
2534 &mut queue,
2535 ctx,
2536 );
2537 }
2538 },
2539 );
2540
2541 queue.execute(ctx);
2542 }
2543
2544 fn task_execution_completed_connect(
2545 &self,
2546 ctx: &mut impl ExecuteContext<'_>,
2547 task_id: TaskId,
2548 new_children: FxHashSet<TaskId>,
2549 ) -> Option<TaskPriority> {
2550 debug_assert!(!new_children.is_empty());
2551
2552 let mut task = ctx.task(task_id, TaskDataCategory::All);
2553 let Some(in_progress) = task.get_in_progress() else {
2554 panic!("Task execution completed, but task is not in progress: {task:#?}");
2555 };
2556 if matches!(in_progress, InProgressState::Canceled) {
2557 return None;
2559 }
2560 let InProgressState::InProgress(box InProgressStateInner {
2561 #[cfg(not(feature = "no_fast_stale"))]
2562 stale,
2563 once_task: is_once_task,
2564 ..
2565 }) = in_progress
2566 else {
2567 panic!("Task execution completed, but task is not in progress: {task:#?}");
2568 };
2569
2570 #[cfg(not(feature = "no_fast_stale"))]
2572 if *stale && !is_once_task {
2573 let stale_priority = compute_stale_priority(&task);
2574 let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2575 task.take_in_progress()
2576 else {
2577 unreachable!();
2578 };
2579 let old = task.set_in_progress(InProgressState::Scheduled {
2580 done_event,
2581 reason: TaskExecutionReason::Stale,
2582 });
2583 debug_assert!(old.is_none(), "InProgress already exists");
2584 drop(task);
2585
2586 AggregationUpdateQueue::run(
2589 AggregationUpdateJob::DecreaseActiveCounts {
2590 task_ids: new_children.into_iter().collect(),
2591 },
2592 ctx,
2593 );
2594 return Some(stale_priority);
2595 }
2596
2597 let has_active_count = ctx.should_track_activeness()
2598 && task
2599 .get_activeness()
2600 .is_some_and(|activeness| activeness.active_counter > 0);
2601 connect_children(
2602 ctx,
2603 task_id,
2604 task,
2605 new_children,
2606 has_active_count,
2607 ctx.should_track_activeness(),
2608 );
2609
2610 None
2611 }
2612
2613 #[allow(clippy::type_complexity)]
2614 fn task_execution_completed_finish(
2615 &self,
2616 ctx: &mut impl ExecuteContext<'_>,
2617 task_id: TaskId,
2618 #[cfg(feature = "verify_determinism")] no_output_set: bool,
2619 new_output: Option<OutputValue>,
2620 is_now_immutable: bool,
2621 is_session_dependent: bool,
2622 ) -> (
2623 Option<TaskPriority>,
2624 Option<
2625 auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2626 >,
2627 ) {
2628 let mut task = ctx.task(task_id, TaskDataCategory::All);
2629 let Some(in_progress) = task.take_in_progress() else {
2630 panic!("Task execution completed, but task is not in progress: {task:#?}");
2631 };
2632 if matches!(in_progress, InProgressState::Canceled) {
2633 return (None, None);
2635 }
2636 let InProgressState::InProgress(box InProgressStateInner {
2637 done_event,
2638 once_task: is_once_task,
2639 stale,
2640 marked_as_completed: _,
2641 new_children,
2642 }) = in_progress
2643 else {
2644 panic!("Task execution completed, but task is not in progress: {task:#?}");
2645 };
2646 debug_assert!(new_children.is_empty());
2647
2648 if stale && !is_once_task {
2650 let stale_priority = compute_stale_priority(&task);
2651 let old = task.set_in_progress(InProgressState::Scheduled {
2652 done_event,
2653 reason: TaskExecutionReason::Stale,
2654 });
2655 debug_assert!(old.is_none(), "InProgress already exists");
2656 return (Some(stale_priority), None);
2657 }
2658
2659 let mut old_content = None;
2661 if let Some(value) = new_output {
2662 old_content = task.set_output(value);
2663 }
2664
2665 if is_now_immutable {
2668 task.set_immutable(true);
2669 }
2670
2671 let in_progress_cells = task.take_in_progress_cells();
2673 if let Some(ref cells) = in_progress_cells {
2674 for state in cells.values() {
2675 state.event.notify(usize::MAX);
2676 }
2677 }
2678
2679 let new_dirtyness = if is_session_dependent {
2681 Some(Dirtyness::SessionDependent)
2682 } else {
2683 None
2684 };
2685 #[cfg(feature = "verify_determinism")]
2686 let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
2687 let data_update = task.update_dirty_state(new_dirtyness);
2688
2689 #[cfg(feature = "verify_determinism")]
2693 let stale_priority: Option<TaskPriority> =
2694 ((dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task)
2695 .then(TaskPriority::leaf);
2696 #[cfg(not(feature = "verify_determinism"))]
2697 let stale_priority: Option<TaskPriority> = None;
2698 if stale_priority.is_some() {
2699 let old = task.set_in_progress(InProgressState::Scheduled {
2700 done_event,
2701 reason: TaskExecutionReason::Stale,
2702 });
2703 debug_assert!(old.is_none(), "InProgress already exists");
2704 drop(task);
2705 } else {
2706 drop(task);
2707
2708 done_event.notify(usize::MAX);
2710 }
2711
2712 drop(old_content);
2713
2714 if let Some(data_update) = data_update {
2715 AggregationUpdateQueue::run(data_update, ctx);
2716 }
2717
2718 (stale_priority, in_progress_cells)
2720 }
2721
2722 fn task_execution_completed_cleanup(
2723 &self,
2724 ctx: &mut impl ExecuteContext<'_>,
2725 task_id: TaskId,
2726 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2727 is_error: bool,
2728 is_recomputation: bool,
2729 ) -> Vec<SharedReference> {
2730 let mut task = ctx.task(task_id, TaskDataCategory::All);
2731 let mut removed_cell_data = Vec::new();
2732 if !is_error || is_recomputation {
2738 let to_remove: Vec<_> = task
2744 .iter_cell_data()
2745 .filter_map(|(cell, _)| {
2746 cell_counters
2747 .get(&cell.type_id)
2748 .is_none_or(|start_index| cell.index >= *start_index)
2749 .then_some(*cell)
2750 })
2751 .collect();
2752 removed_cell_data.reserve_exact(to_remove.len());
2753 for cell in to_remove {
2754 if let Some(data) = task.remove_cell_data(&cell) {
2755 removed_cell_data.push(data);
2756 }
2757 }
2758 let to_remove_hash: Vec<_> = task
2760 .iter_cell_data_hash()
2761 .filter_map(|(cell, _)| {
2762 cell_counters
2763 .get(&cell.type_id)
2764 .is_none_or(|start_index| cell.index >= *start_index)
2765 .then_some(*cell)
2766 })
2767 .collect();
2768 for cell in to_remove_hash {
2769 task.remove_cell_data_hash(&cell);
2770 }
2771 }
2772
2773 task.cleanup_after_execution();
2777
2778 drop(task);
2779
2780 removed_cell_data
2782 }
2783
2784 fn log_unrecoverable_persist_error() {
2787 eprintln!(
2788 "Persisting is disabled for this session due to an unrecoverable error. Stopping the \
2789 background persisting process."
2790 );
2791 }
2792
2793 fn run_backend_job<'a>(
2794 self: &'a Arc<Self>,
2795 job: TurboTasksBackendJob,
2796 turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2797 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2798 Box::pin(async move {
2799 match job {
2800 TurboTasksBackendJob::Snapshot => {
2801 debug_assert!(self.should_persist());
2802
2803 let mut last_snapshot = self.start_time;
2804 let mut idle_start_listener = self.idle_start_event.listen();
2805 let mut idle_end_listener = self.idle_end_event.listen();
2806 let mut fresh_idle = true;
2809 let mut evicted = false;
2810 let mut is_first = true;
2811 'outer: loop {
2812 const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2813 const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2814 let idle_timeout = *IDLE_TIMEOUT;
2815 let (time, mut reason) = if is_first {
2816 (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2817 } else {
2818 (SNAPSHOT_INTERVAL, "regular snapshot interval")
2819 };
2820
2821 let until = last_snapshot + time;
2822 if until > Instant::now() {
2823 let mut stop_listener = self.stopping_event.listen();
2824 if self.stopping.load(Ordering::Acquire) {
2825 return;
2826 }
2827 let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2828 Instant::now() + idle_timeout
2829 } else {
2830 far_future()
2831 };
2832 loop {
2833 tokio::select! {
2834 _ = &mut stop_listener => {
2835 return;
2836 },
2837 _ = &mut idle_start_listener => {
2838 idle_time = Instant::now() + idle_timeout;
2839 idle_start_listener = self.idle_start_event.listen()
2840 },
2841 _ = &mut idle_end_listener => {
2842 idle_time = far_future();
2843 idle_end_listener = self.idle_end_event.listen()
2844 },
2845 _ = tokio::time::sleep_until(until) => {
2846 break;
2847 },
2848 _ = tokio::time::sleep_until(idle_time) => {
2849 if turbo_tasks.is_idle() {
2850 reason = "idle timeout";
2851 break;
2852 }
2853 },
2854 }
2855 }
2856 }
2857
2858 let this = self.clone();
2859 let background_span =
2863 tracing::info_span!(parent: None, "background snapshot");
2864 match this.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
2865 Err(err) => {
2866 eprintln!("Persisting failed: {err:?}");
2869 Self::log_unrecoverable_persist_error();
2870 return;
2871 }
2872 Ok((snapshot_start, new_data)) => {
2873 fresh_idle = new_data;
2875 is_first = false;
2876 last_snapshot = snapshot_start;
2877
2878 macro_rules! check_idle_ended {
2882 () => {{
2883 tokio::select! {
2884 biased;
2885 _ = &mut idle_end_listener => {
2886 idle_end_listener = self.idle_end_event.listen();
2887 true
2888 },
2889 _ = std::future::ready(()) => false,
2890 }
2891 }};
2892 }
2893 let mut ran_eviction = false;
2911 if this.should_evict() && (new_data || !evicted) {
2912 if check_idle_ended!() {
2913 continue 'outer;
2916 }
2917 evicted = true;
2918 ran_eviction = true;
2919 this.storage.evict_after_snapshot(background_span.id());
2920 }
2921
2922 let mut ran_compaction = false;
2929 const MAX_IDLE_COMPACTION_PASSES: usize = 10;
2930 for _ in 0..MAX_IDLE_COMPACTION_PASSES {
2931 if check_idle_ended!() {
2932 continue 'outer;
2933 }
2934 let _compact_span = tracing::info_span!(
2938 parent: background_span.id(),
2939 "compact database"
2940 )
2941 .entered();
2942 match self.backing_storage.compact() {
2943 Ok(true) => {
2944 ran_compaction = true;
2945 }
2946 Ok(false) => break,
2947 Err(err) => {
2948 eprintln!("Compaction failed: {err:?}");
2949 if self.backing_storage.has_unrecoverable_write_error()
2950 {
2951 Self::log_unrecoverable_persist_error();
2952 return;
2953 }
2954 break;
2955 }
2956 }
2957 }
2958 if check_idle_ended!() {
2959 continue 'outer;
2960 }
2961 if new_data || ran_compaction || ran_eviction {
2966 turbo_tasks_malloc::TurboMalloc::collect(true);
2967 }
2968 }
2969 }
2970 }
2971 }
2972 }
2973 })
2974 }
2975
2976 fn try_read_own_task_cell(
2977 &self,
2978 task_id: TaskId,
2979 cell: CellId,
2980 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2981 ) -> Result<TypedCellContent> {
2982 let mut ctx = self.execute_context(turbo_tasks);
2983 let task = ctx.task(task_id, TaskDataCategory::Data);
2984 if let Some(content) = task.get_cell_data(&cell).cloned() {
2985 Ok(CellContent(Some(content)).into_typed(cell.type_id))
2986 } else {
2987 Ok(CellContent(None).into_typed(cell.type_id))
2988 }
2989 }
2990
2991 fn read_task_collectibles(
2992 &self,
2993 task_id: TaskId,
2994 collectible_type: TraitTypeId,
2995 reader_id: Option<TaskId>,
2996 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2997 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2998 let mut ctx = self.execute_context(turbo_tasks);
2999 let mut collectibles = AutoMap::default();
3000 {
3001 let mut task = ctx.task(task_id, TaskDataCategory::All);
3002 if task
3003 .get_persistent_task_type()
3004 .is_some_and(|t| !t.native_fn.is_root)
3005 {
3006 drop(task);
3007 panic!(
3008 "Reading collectibles of non-root task {} (reader: {}). The `root` attribute \
3009 is missing on the task.",
3010 self.debug_get_task_description(task_id),
3011 reader_id.map_or_else(
3012 || "unknown".to_string(),
3013 |r| self.debug_get_task_description(r)
3014 )
3015 );
3016 }
3017 for (collectible, count) in task.iter_aggregated_collectibles() {
3018 if *count > 0 && collectible.collectible_type == collectible_type {
3019 *collectibles
3020 .entry(RawVc::TaskCell(
3021 collectible.cell.task,
3022 collectible.cell.cell,
3023 ))
3024 .or_insert(0) += 1;
3025 }
3026 }
3027 for (&collectible, &count) in task.iter_collectibles() {
3028 if collectible.collectible_type == collectible_type {
3029 *collectibles
3030 .entry(RawVc::TaskCell(
3031 collectible.cell.task,
3032 collectible.cell.cell,
3033 ))
3034 .or_insert(0) += count;
3035 }
3036 }
3037 if let Some(reader_id) = reader_id {
3038 let _ = task.add_collectibles_dependents((collectible_type, reader_id));
3039 }
3040 }
3041 if let Some(reader_id) = reader_id {
3042 let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
3043 let target = CollectiblesRef {
3044 task: task_id,
3045 collectible_type,
3046 };
3047 if !reader.remove_outdated_collectibles_dependencies(&target) {
3048 let _ = reader.add_collectibles_dependencies(target);
3049 }
3050 }
3051 collectibles
3052 }
3053
3054 fn emit_collectible(
3055 &self,
3056 collectible_type: TraitTypeId,
3057 collectible: RawVc,
3058 task_id: TaskId,
3059 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3060 ) {
3061 self.assert_valid_collectible(task_id, collectible);
3062
3063 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3064 panic!("Collectibles need to be resolved");
3065 };
3066 let cell = CellRef {
3067 task: collectible_task,
3068 cell,
3069 };
3070 operation::UpdateCollectibleOperation::run(
3071 task_id,
3072 CollectibleRef {
3073 collectible_type,
3074 cell,
3075 },
3076 1,
3077 self.execute_context(turbo_tasks),
3078 );
3079 }
3080
3081 fn unemit_collectible(
3082 &self,
3083 collectible_type: TraitTypeId,
3084 collectible: RawVc,
3085 count: u32,
3086 task_id: TaskId,
3087 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3088 ) {
3089 self.assert_valid_collectible(task_id, collectible);
3090
3091 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3092 panic!("Collectibles need to be resolved");
3093 };
3094 let cell = CellRef {
3095 task: collectible_task,
3096 cell,
3097 };
3098 operation::UpdateCollectibleOperation::run(
3099 task_id,
3100 CollectibleRef {
3101 collectible_type,
3102 cell,
3103 },
3104 -(i32::try_from(count).unwrap()),
3105 self.execute_context(turbo_tasks),
3106 );
3107 }
3108
3109 fn update_task_cell(
3110 &self,
3111 task_id: TaskId,
3112 cell: CellId,
3113 content: CellContent,
3114 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3115 content_hash: Option<CellHash>,
3116 verification_mode: VerificationMode,
3117 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3118 ) {
3119 operation::UpdateCellOperation::run(
3120 task_id,
3121 cell,
3122 content,
3123 updated_key_hashes,
3124 content_hash,
3125 verification_mode,
3126 self.execute_context(turbo_tasks),
3127 );
3128 }
3129
3130 fn mark_own_task_as_finished(
3131 &self,
3132 task: TaskId,
3133 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3134 ) {
3135 let mut ctx = self.execute_context(turbo_tasks);
3136 let mut task = ctx.task(task, TaskDataCategory::Data);
3137 if let Some(InProgressState::InProgress(box InProgressStateInner {
3138 marked_as_completed,
3139 ..
3140 })) = task.get_in_progress_mut()
3141 {
3142 *marked_as_completed = true;
3143 }
3148 }
3149
3150 fn connect_task(
3151 &self,
3152 task: TaskId,
3153 parent_task: Option<TaskId>,
3154 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3155 ) {
3156 self.assert_not_persistent_calling_transient(parent_task, task, None);
3157 ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
3158 }
3159
3160 fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3161 let task_id = self.transient_task_id_factory.get();
3162 {
3163 let mut task = self.storage.access_mut(task_id);
3164 task.init_transient_task(task_id, task_type, self.should_track_activeness());
3165 }
3166 #[cfg(feature = "verify_aggregation_graph")]
3167 self.root_tasks.lock().insert(task_id);
3168 task_id
3169 }
3170
3171 fn dispose_root_task(
3172 &self,
3173 task_id: TaskId,
3174 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3175 ) {
3176 #[cfg(feature = "verify_aggregation_graph")]
3177 self.root_tasks.lock().remove(&task_id);
3178
3179 let mut ctx = self.execute_context(turbo_tasks);
3180 let mut task = ctx.task(task_id, TaskDataCategory::All);
3181 let is_dirty = task.is_dirty();
3182 let has_dirty_containers = task.has_dirty_containers();
3183 if is_dirty.is_some() || has_dirty_containers {
3184 if let Some(activeness_state) = task.get_activeness_mut() {
3185 activeness_state.unset_root_type();
3187 activeness_state.set_active_until_clean();
3188 };
3189 } else if let Some(activeness_state) = task.take_activeness() {
3190 activeness_state.all_clean_event.notify(usize::MAX);
3193 }
3194 }
3195
3196 #[cfg(feature = "verify_aggregation_graph")]
3197 fn verify_aggregation_graph(
3198 &self,
3199 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3200 idle: bool,
3201 ) {
3202 if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3203 return;
3204 }
3205 use std::{collections::VecDeque, env, io::stdout};
3206
3207 use crate::backend::operation::{get_uppers, is_aggregating_node};
3208
3209 let mut ctx = self.execute_context(turbo_tasks);
3210 let root_tasks = self.root_tasks.lock().clone();
3211
3212 for task_id in root_tasks.into_iter() {
3213 let mut queue = VecDeque::new();
3214 let mut visited = FxHashSet::default();
3215 let mut aggregated_nodes = FxHashSet::default();
3216 let mut collectibles = FxHashMap::default();
3217 let root_task_id = task_id;
3218 visited.insert(task_id);
3219 aggregated_nodes.insert(task_id);
3220 queue.push_back(task_id);
3221 let mut counter = 0;
3222 while let Some(task_id) = queue.pop_front() {
3223 counter += 1;
3224 if counter % 100000 == 0 {
3225 println!(
3226 "queue={}, visited={}, aggregated_nodes={}",
3227 queue.len(),
3228 visited.len(),
3229 aggregated_nodes.len()
3230 );
3231 }
3232 let task = ctx.task(task_id, TaskDataCategory::All);
3233 if idle && !self.is_idle.load(Ordering::Relaxed) {
3234 return;
3235 }
3236
3237 let uppers = get_uppers(&task);
3238 if task_id != root_task_id
3239 && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3240 {
3241 panic!(
3242 "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3243 {:?})",
3244 task_id,
3245 task.get_task_description(),
3246 uppers
3247 );
3248 }
3249
3250 for (collectible, _) in task.iter_aggregated_collectibles() {
3251 collectibles
3252 .entry(*collectible)
3253 .or_insert_with(|| (false, Vec::new()))
3254 .1
3255 .push(task_id);
3256 }
3257
3258 for (&collectible, &value) in task.iter_collectibles() {
3259 if value > 0 {
3260 if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3261 *flag = true
3262 } else {
3263 panic!(
3264 "Task {} has a collectible {:?} that is not in any upper task",
3265 task_id, collectible
3266 );
3267 }
3268 }
3269 }
3270
3271 let is_dirty = task.has_dirty();
3272 let has_dirty_container = task.has_dirty_containers();
3273 let should_be_in_upper = is_dirty || has_dirty_container;
3274
3275 let aggregation_number = get_aggregation_number(&task);
3276 if is_aggregating_node(aggregation_number) {
3277 aggregated_nodes.insert(task_id);
3278 }
3279 for child_id in task.iter_children() {
3286 if visited.insert(child_id) {
3288 queue.push_back(child_id);
3289 }
3290 }
3291 drop(task);
3292
3293 if should_be_in_upper {
3294 for upper_id in uppers {
3295 let upper = ctx.task(upper_id, TaskDataCategory::All);
3296 let in_upper = upper
3297 .get_aggregated_dirty_containers(&task_id)
3298 .is_some_and(|&dirty| dirty > 0);
3299 if !in_upper {
3300 let containers: Vec<_> = upper
3301 .iter_aggregated_dirty_containers()
3302 .map(|(&k, &v)| (k, v))
3303 .collect();
3304 let upper_task_desc = upper.get_task_description();
3305 drop(upper);
3306 panic!(
3307 "Task {} ({}) is dirty, but is not listed in the upper task {} \
3308 ({})\nThese dirty containers are present:\n{:#?}",
3309 task_id,
3310 ctx.task(task_id, TaskDataCategory::Data)
3311 .get_task_description(),
3312 upper_id,
3313 upper_task_desc,
3314 containers,
3315 );
3316 }
3317 }
3318 }
3319 }
3320
3321 for (collectible, (flag, task_ids)) in collectibles {
3322 if !flag {
3323 use std::io::Write;
3324 let mut stdout = stdout().lock();
3325 writeln!(
3326 stdout,
3327 "{:?} that is not emitted in any child task but in these aggregated \
3328 tasks: {:#?}",
3329 collectible,
3330 task_ids
3331 .iter()
3332 .map(|t| format!(
3333 "{t} {}",
3334 ctx.task(*t, TaskDataCategory::Data).get_task_description()
3335 ))
3336 .collect::<Vec<_>>()
3337 )
3338 .unwrap();
3339
3340 let task_id = collectible.cell.task;
3341 let mut queue = {
3342 let task = ctx.task(task_id, TaskDataCategory::All);
3343 get_uppers(&task)
3344 };
3345 let mut visited = FxHashSet::default();
3346 for &upper_id in queue.iter() {
3347 visited.insert(upper_id);
3348 writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3349 }
3350 while let Some(task_id) = queue.pop() {
3351 let task = ctx.task(task_id, TaskDataCategory::All);
3352 let desc = task.get_task_description();
3353 let aggregated_collectible = task
3354 .get_aggregated_collectibles(&collectible)
3355 .copied()
3356 .unwrap_or_default();
3357 let uppers = get_uppers(&task);
3358 drop(task);
3359 writeln!(
3360 stdout,
3361 "upper {task_id} {desc} collectible={aggregated_collectible}"
3362 )
3363 .unwrap();
3364 if task_ids.contains(&task_id) {
3365 writeln!(
3366 stdout,
3367 "Task has an upper connection to an aggregated task that doesn't \
3368 reference it. Upper connection is invalid!"
3369 )
3370 .unwrap();
3371 }
3372 for upper_id in uppers {
3373 writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3374 if !visited.contains(&upper_id) {
3375 queue.push(upper_id);
3376 }
3377 }
3378 }
3379 panic!("See stdout for more details");
3380 }
3381 }
3382 }
3383 }
3384
3385 fn assert_not_persistent_calling_transient(
3386 &self,
3387 parent_id: Option<TaskId>,
3388 child_id: TaskId,
3389 cell_id: Option<CellId>,
3390 ) {
3391 if let Some(parent_id) = parent_id
3392 && !parent_id.is_transient()
3393 && child_id.is_transient()
3394 {
3395 self.panic_persistent_calling_transient(
3396 self.debug_get_task_description(parent_id),
3397 self.debug_get_cached_task_type(child_id).as_deref(),
3398 cell_id,
3399 );
3400 }
3401 }
3402
3403 fn panic_persistent_calling_transient(
3404 &self,
3405 parent: String,
3406 child: Option<&CachedTaskType>,
3407 cell_id: Option<CellId>,
3408 ) -> ! {
3409 let transient_reason = if let Some(child) = child {
3410 Cow::Owned(format!(
3411 " The callee is transient because it depends on:\n{}",
3412 self.debug_trace_transient_task(child, cell_id),
3413 ))
3414 } else {
3415 Cow::Borrowed("")
3416 };
3417 panic!(
3418 "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3419 parent,
3420 child.map_or("unknown", |t| t.get_name()),
3421 transient_reason,
3422 );
3423 }
3424
3425 fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3426 let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3428 let task_info = if let Some(col_task_ty) = collectible
3430 .try_get_task_id()
3431 .map(|t| self.debug_get_task_description(t))
3432 {
3433 Cow::Owned(format!(" (return type of {col_task_ty})"))
3434 } else {
3435 Cow::Borrowed("")
3436 };
3437 panic!("Collectible{task_info} must be a ResolvedVc")
3438 };
3439 if col_task_id.is_transient() && !task_id.is_transient() {
3440 let transient_reason =
3441 if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3442 Cow::Owned(format!(
3443 ". The collectible is transient because it depends on:\n{}",
3444 self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3445 ))
3446 } else {
3447 Cow::Borrowed("")
3448 };
3449 panic!(
3451 "Collectible is transient, transient collectibles cannot be emitted from \
3452 persistent tasks{transient_reason}",
3453 )
3454 }
3455 }
3456}
3457
3458impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
3459 fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3460 self.0.startup(turbo_tasks);
3461 }
3462
3463 fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3464 self.0.stopping();
3465 }
3466
3467 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3468 self.0.stop(turbo_tasks);
3469 }
3470
3471 fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3472 self.0.idle_start(turbo_tasks);
3473 }
3474
3475 fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3476 self.0.idle_end();
3477 }
3478
3479 fn get_or_create_task(
3480 &self,
3481 native_fn: &'static NativeFunction,
3482 this: Option<RawVc>,
3483 arg: &mut dyn StackDynTaskInputs,
3484 parent_task: Option<TaskId>,
3485 persistence: TaskPersistence,
3486 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3487 ) -> TaskId {
3488 self.0
3489 .get_or_create_task(native_fn, this, arg, parent_task, persistence, turbo_tasks)
3490 }
3491
3492 fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3493 self.0.invalidate_task(task_id, turbo_tasks);
3494 }
3495
3496 fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3497 self.0.invalidate_tasks(tasks, turbo_tasks);
3498 }
3499
3500 fn invalidate_tasks_set(
3501 &self,
3502 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3503 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3504 ) {
3505 self.0.invalidate_tasks_set(tasks, turbo_tasks);
3506 }
3507
3508 fn invalidate_serialization(
3509 &self,
3510 task_id: TaskId,
3511 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3512 ) {
3513 self.0.invalidate_serialization(task_id, turbo_tasks);
3514 }
3515
3516 fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3517 self.0.task_execution_canceled(task, turbo_tasks)
3518 }
3519
3520 fn try_start_task_execution(
3521 &self,
3522 task_id: TaskId,
3523 priority: TaskPriority,
3524 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3525 ) -> Option<TaskExecutionSpec<'_>> {
3526 self.0
3527 .try_start_task_execution(task_id, priority, turbo_tasks)
3528 }
3529
3530 fn task_execution_completed(
3531 &self,
3532 task_id: TaskId,
3533 result: Result<RawVc, TurboTasksExecutionError>,
3534 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3535 #[cfg(feature = "verify_determinism")] stateful: bool,
3536 has_invalidator: bool,
3537 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3538 ) -> Option<TaskPriority> {
3539 self.0.task_execution_completed(
3540 task_id,
3541 result,
3542 cell_counters,
3543 #[cfg(feature = "verify_determinism")]
3544 stateful,
3545 has_invalidator,
3546 turbo_tasks,
3547 )
3548 }
3549
3550 type BackendJob = TurboTasksBackendJob;
3551
3552 fn run_backend_job<'a>(
3553 &'a self,
3554 job: Self::BackendJob,
3555 turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3556 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3557 self.0.run_backend_job(job, turbo_tasks)
3558 }
3559
3560 fn try_read_task_output(
3561 &self,
3562 task_id: TaskId,
3563 reader: Option<TaskId>,
3564 options: ReadOutputOptions,
3565 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3566 ) -> Result<Result<RawVc, EventListener>> {
3567 self.0
3568 .try_read_task_output(task_id, reader, options, turbo_tasks)
3569 }
3570
3571 fn try_read_task_cell(
3572 &self,
3573 task_id: TaskId,
3574 cell: CellId,
3575 reader: Option<TaskId>,
3576 options: ReadCellOptions,
3577 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3578 ) -> Result<Result<TypedCellContent, EventListener>> {
3579 self.0
3580 .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3581 }
3582
3583 fn try_read_own_task_cell(
3584 &self,
3585 task_id: TaskId,
3586 cell: CellId,
3587 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3588 ) -> Result<TypedCellContent> {
3589 self.0.try_read_own_task_cell(task_id, cell, turbo_tasks)
3590 }
3591
3592 fn read_task_collectibles(
3593 &self,
3594 task_id: TaskId,
3595 collectible_type: TraitTypeId,
3596 reader: Option<TaskId>,
3597 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3598 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3599 self.0
3600 .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3601 }
3602
3603 fn emit_collectible(
3604 &self,
3605 collectible_type: TraitTypeId,
3606 collectible: RawVc,
3607 task_id: TaskId,
3608 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3609 ) {
3610 self.0
3611 .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3612 }
3613
3614 fn unemit_collectible(
3615 &self,
3616 collectible_type: TraitTypeId,
3617 collectible: RawVc,
3618 count: u32,
3619 task_id: TaskId,
3620 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3621 ) {
3622 self.0
3623 .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3624 }
3625
3626 fn update_task_cell(
3627 &self,
3628 task_id: TaskId,
3629 cell: CellId,
3630 content: CellContent,
3631 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3632 content_hash: Option<CellHash>,
3633 verification_mode: VerificationMode,
3634 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3635 ) {
3636 self.0.update_task_cell(
3637 task_id,
3638 cell,
3639 content,
3640 updated_key_hashes,
3641 content_hash,
3642 verification_mode,
3643 turbo_tasks,
3644 );
3645 }
3646
3647 fn mark_own_task_as_finished(
3648 &self,
3649 task_id: TaskId,
3650 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3651 ) {
3652 self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3653 }
3654
3655 fn connect_task(
3656 &self,
3657 task: TaskId,
3658 parent_task: Option<TaskId>,
3659 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3660 ) {
3661 self.0.connect_task(task, parent_task, turbo_tasks);
3662 }
3663
3664 fn create_transient_task(
3665 &self,
3666 task_type: TransientTaskType,
3667 _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3668 ) -> TaskId {
3669 self.0.create_transient_task(task_type)
3670 }
3671
3672 fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3673 self.0.dispose_root_task(task_id, turbo_tasks);
3674 }
3675
3676 fn task_statistics(&self) -> &TaskStatisticsApi {
3677 &self.0.task_statistics
3678 }
3679
3680 fn is_tracking_dependencies(&self) -> bool {
3681 self.0.options.dependency_tracking
3682 }
3683
3684 fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
3685 self.0.get_task_name(task, turbo_tasks)
3686 }
3687}
3688
3689enum DebugTraceTransientTask {
3690 Cached {
3691 task_name: &'static str,
3692 cell_type_id: Option<ValueTypeId>,
3693 cause_self: Option<Box<DebugTraceTransientTask>>,
3694 cause_args: Vec<DebugTraceTransientTask>,
3695 },
3696 Collapsed {
3698 task_name: &'static str,
3699 cell_type_id: Option<ValueTypeId>,
3700 },
3701 Uncached {
3702 cell_type_id: Option<ValueTypeId>,
3703 },
3704}
3705
3706impl DebugTraceTransientTask {
3707 fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3708 let indent = " ".repeat(level);
3709 f.write_str(&indent)?;
3710
3711 fn fmt_cell_type_id(
3712 f: &mut fmt::Formatter<'_>,
3713 cell_type_id: Option<ValueTypeId>,
3714 ) -> fmt::Result {
3715 if let Some(ty) = cell_type_id {
3716 write!(
3717 f,
3718 " (read cell of type {})",
3719 get_value_type(ty).ty.global_name
3720 )
3721 } else {
3722 Ok(())
3723 }
3724 }
3725
3726 match self {
3728 Self::Cached {
3729 task_name,
3730 cell_type_id,
3731 ..
3732 }
3733 | Self::Collapsed {
3734 task_name,
3735 cell_type_id,
3736 ..
3737 } => {
3738 f.write_str(task_name)?;
3739 fmt_cell_type_id(f, *cell_type_id)?;
3740 if matches!(self, Self::Collapsed { .. }) {
3741 f.write_str(" (collapsed)")?;
3742 }
3743 }
3744 Self::Uncached { cell_type_id } => {
3745 f.write_str("unknown transient task")?;
3746 fmt_cell_type_id(f, *cell_type_id)?;
3747 }
3748 }
3749 f.write_char('\n')?;
3750
3751 if let Self::Cached {
3753 cause_self,
3754 cause_args,
3755 ..
3756 } = self
3757 {
3758 if let Some(c) = cause_self {
3759 writeln!(f, "{indent} self:")?;
3760 c.fmt_indented(f, level + 1)?;
3761 }
3762 if !cause_args.is_empty() {
3763 writeln!(f, "{indent} args:")?;
3764 for c in cause_args {
3765 c.fmt_indented(f, level + 1)?;
3766 }
3767 }
3768 }
3769 Ok(())
3770 }
3771}
3772
3773impl fmt::Display for DebugTraceTransientTask {
3774 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3775 self.fmt_indented(f, 0)
3776 }
3777}
3778
3779fn far_future() -> Instant {
3781 Instant::now() + Duration::from_secs(86400 * 365 * 30)
3786}
3787
3788fn encode_task_data(
3800 task: TaskId,
3801 data: &TaskStorage,
3802 category: SpecificTaskDataCategory,
3803 scratch_buffer: &mut TurboBincodeBuffer,
3804) -> Result<TurboBincodeBuffer> {
3805 scratch_buffer.clear();
3806 let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3807 data.encode(category, &mut encoder)?;
3808
3809 if cfg!(feature = "verify_serialization") {
3810 TaskStorage::new()
3811 .decode(
3812 category,
3813 &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3814 )
3815 .with_context(|| {
3816 format!(
3817 "expected to be able to decode serialized data for '{category:?}' information \
3818 for {task}"
3819 )
3820 })?;
3821 }
3822 Ok(SmallVec::from_slice(scratch_buffer))
3823}