1mod cell_data;
2mod counter_map;
3mod operation;
4mod snapshot_coordinator;
5mod storage;
6pub mod storage_schema;
7
8use std::{
9 borrow::Cow,
10 fmt::{self, Write},
11 future::Future,
12 hash::BuildHasherDefault,
13 mem::take,
14 pin::Pin,
15 sync::{
16 Arc, LazyLock,
17 atomic::{AtomicBool, Ordering},
18 },
19 time::SystemTime,
20};
21
22use anyhow::{Context, Result, bail};
23use auto_hash_map::{AutoMap, AutoSet};
24use indexmap::IndexSet;
25use parking_lot::Mutex;
26use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
27use smallvec::{SmallVec, smallvec};
28use tokio::time::{Duration, Instant};
29use tracing::{Span, trace_span};
30use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
31use turbo_tasks::{
32 CellId, DynTaskInputsStorage, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
33 ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
34 TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasks, TurboTasksCallApi,
35 TurboTasksPanic, ValueTypeId,
36 backend::{
37 Backend, CachedTaskType, CachedTaskTypeArc, CellContent, CellHash, TaskExecutionSpec,
38 TransientTaskType, TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
39 TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
40 VerificationMode,
41 },
42 event::{Event, EventDescription, EventListener},
43 macro_helpers::NativeFunction,
44 message_queue::{TimingEvent, TraceEvent},
45 registry::get_value_type,
46 scope::scope_and_block,
47 task_statistics::TaskStatisticsApi,
48 trace::TraceRawVcs,
49 util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
50};
51#[cfg(feature = "task_dirty_cause")]
52use turbo_tasks::{FunctionId, TaskDirtyCause};
53
54pub use self::{
55 operation::AnyOperation,
56 storage::{EvictionCounts, SpecificTaskDataCategory, TaskDataCategory},
57};
58use crate::{
59 backend::{
60 operation::{
61 AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
62 CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
63 LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType, TaskTypeRef,
64 connect_children, get_aggregation_number, get_uppers, make_task_dirty_internal,
65 prepare_new_children,
66 },
67 snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
68 storage::Storage,
69 storage_schema::{TaskStorage, TaskStorageAccessors},
70 },
71 backing_storage::{SnapshotItem, SnapshotMeta, compute_task_type_hash},
72 data::{
73 ActivenessState, CellDependency, CellRef, CollectibleRef, CollectiblesRef, Dirtyness,
74 InProgressCellState, InProgressState, InProgressStateInner, OutputValue, TransientTask,
75 },
76 error::TaskError,
77 kv_backing_storage::TurboBackingStorage,
78 utils::{
79 dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
80 shard_amount::compute_shard_amount,
81 },
82};
83
/// Threshold value (10000) associated with dirtying dependent tasks.
///
/// NOTE(review): judging by the name, this is the number of dependent tasks above which the
/// dirtying work is parallelized. The use site is outside this part of the file — confirm
/// there before relying on this description.
const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10000;
88
/// Idle timeout, resolved once on first access.
///
/// The environment variable `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS` is interpreted as a
/// number of milliseconds. When the variable is missing, is not valid unicode, or does not
/// parse as a `u64`, the timeout falls back to 2 seconds.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let fallback = Duration::from_secs(2);
    match std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS") {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(millis) => Duration::from_millis(millis),
            // Unparseable values are ignored rather than reported.
            Err(_) => fallback,
        },
        Err(_) => fallback,
    }
});
98
99fn compute_stale_priority(task: &impl TaskGuard) -> TaskPriority {
106 TaskPriority::invalidation(
107 task.get_leaf_distance()
108 .copied()
109 .unwrap_or_default()
110 .distance,
111 )
112 .in_parent(task.is_dirty().unwrap_or(TaskPriority::leaf()))
113}
114
/// How the backend uses its backing storage.
///
/// Within this file, `should_restore` is true for every mode (it only checks that a mode is
/// set), while `should_persist` is true only for the two `ReadWrite*` modes.
pub enum StorageMode {
    /// Persisting is disabled for this mode (`should_persist` returns `false`).
    ReadOnly,
    /// Persisting is enabled (`should_persist` returns `true`).
    ReadWrite,
    /// Persisting is enabled (`should_persist` returns `true`).
    ///
    /// NOTE(review): the name suggests data is only written when shutting down; the code that
    /// distinguishes this mode from `ReadWrite` is not visible here — confirm.
    ReadWriteOnShutdown,
}
125
/// Configuration passed to `TurboTasksBackend::new`.
pub struct BackendOptions {
    /// Whether dependencies between tasks are tracked. Reads only record dependency edges
    /// when this is enabled. Disabling it also disables `active_tracking` (enforced in
    /// `TurboTasksBackend::new`).
    pub dependency_tracking: bool,

    /// Whether activeness of tasks is tracked. Forced to `false` by `TurboTasksBackend::new`
    /// when `dependency_tracking` is `false`.
    pub active_tracking: bool,

    /// How the backing storage is used. `None` disables both restoring and persisting
    /// (`should_restore` / `should_persist`).
    pub storage_mode: Option<StorageMode>,

    /// Optional worker count; passed to `compute_shard_amount` to size the storage.
    pub num_workers: Option<usize>,

    /// Passed to `compute_shard_amount` and `Storage::new`.
    /// NOTE(review): presumably requests smaller initial allocations — confirm in `Storage`.
    pub small_preallocation: bool,

    /// Enables eviction (`should_evict`), but only when persisting is enabled as well.
    pub evict_after_snapshot: bool,
}
155
156impl Default for BackendOptions {
157 fn default() -> Self {
158 Self {
159 dependency_tracking: true,
160 active_tracking: true,
161 storage_mode: Some(StorageMode::ReadWrite),
162 num_workers: None,
163 small_preallocation: false,
164 evict_after_snapshot: false,
165 }
166 }
167}
168
/// Jobs of the backend.
///
/// NOTE(review): where these jobs are scheduled and run is outside this part of the file.
pub enum TurboTasksBackendJob {
    /// Snapshot job (presumably drives `snapshot_and_persist` — confirm at the handler).
    Snapshot,
}
172
/// The turbo-tasks backend: in-memory task storage plus an optional backing storage.
pub struct TurboTasksBackend {
    /// Options given to `new` (with `active_tracking` cleared when dependency tracking is
    /// disabled).
    options: BackendOptions,

    /// Instant at which the backend was constructed.
    start_time: Instant,

    /// Id factory for persisted tasks: starts at the backing storage's next free task id and
    /// is bounded by `TRANSIENT_TASK_BIT - 1`.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id factory for transient tasks: starts at `TRANSIENT_TASK_BIT` and is bounded by
    /// `TaskId::MAX`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    /// In-memory task storage.
    storage: Storage,

    /// Coordinates running operations with snapshots (`begin_operation`, `suspend_point`,
    /// `begin_snapshot`).
    snapshot_coord: SnapshotCoordinator,
    /// Held for the duration of `snapshot_and_persist`, so only one snapshot runs at a time.
    snapshot_in_progress: Mutex<()>,

    /// Initialised to `false`.
    /// NOTE(review): presumably set once the backend starts stopping — confirm at the writer.
    stopping: AtomicBool,
    /// Event described as "TurboTasksBackend::stopping_event".
    stopping_event: Event,
    /// Event described as "TurboTasksBackend::idle_start_event".
    idle_start_event: Event,
    /// Event described as "TurboTasksBackend::idle_end_event".
    idle_end_event: Event,
    /// Only present with the `verify_aggregation_graph` feature; initialised to `false`.
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    /// Statistics sink used for per-function cache hit / miss counters.
    task_statistics: TaskStatisticsApi,

    /// Backing storage used for invalidation, task id allocation and saving snapshots.
    backing_storage: TurboBackingStorage,

    /// Only present with the `verify_aggregation_graph` feature; starts out empty.
    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
206
207impl TurboTasksBackend {
208 pub fn invalidate_storage(&self, reason_code: &str) -> Result<()> {
214 self.backing_storage.invalidate(reason_code)
215 }
216
217 pub fn new(mut options: BackendOptions, backing_storage: TurboBackingStorage) -> Self {
218 let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
219 if !options.dependency_tracking {
220 options.active_tracking = false;
221 }
222 let small_preallocation = options.small_preallocation;
223 let next_task_id = backing_storage
224 .next_free_task_id()
225 .expect("Failed to get task id");
226 Self {
227 options,
228 start_time: Instant::now(),
229 persisted_task_id_factory: IdFactoryWithReuse::new(
230 next_task_id,
231 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
232 ),
233 transient_task_id_factory: IdFactoryWithReuse::new(
234 TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
235 TaskId::MAX,
236 ),
237 storage: Storage::new(shard_amount, small_preallocation),
238 snapshot_coord: SnapshotCoordinator::new(),
239 snapshot_in_progress: Mutex::new(()),
240 stopping: AtomicBool::new(false),
241 stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
242 idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
243 idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
244 #[cfg(feature = "verify_aggregation_graph")]
245 is_idle: AtomicBool::new(false),
246 task_statistics: TaskStatisticsApi::default(),
247 backing_storage,
248 #[cfg(feature = "verify_aggregation_graph")]
249 root_tasks: Default::default(),
250 }
251 }
252
253 fn execute_context<'a>(
254 &'a self,
255 turbo_tasks: &'a TurboTasks<TurboTasksBackend>,
256 ) -> impl ExecuteContext<'a> {
257 ExecuteContextImpl::new(self, turbo_tasks)
258 }
259
260 fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
261 if self.should_persist() {
262 self.snapshot_coord.suspend_point(suspend);
263 }
264 }
265
266 pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
267 if !self.should_persist() {
268 return OperationGuard::noop();
269 }
270 self.snapshot_coord.begin_operation()
271 }
272
273 fn should_persist(&self) -> bool {
274 matches!(
275 self.options.storage_mode,
276 Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
277 )
278 }
279
280 fn should_evict(&self) -> bool {
281 self.options.evict_after_snapshot && self.should_persist()
282 }
283
284 #[doc(hidden)]
291 pub fn snapshot_and_evict_for_testing(
292 &self,
293 turbo_tasks: &TurboTasks<TurboTasksBackend>,
294 ) -> (bool, EvictionCounts) {
295 assert!(
296 self.should_persist(),
297 "snapshot_and_evict requires persistence"
298 );
299 let snapshot_result = self.snapshot_and_persist(None, "test", turbo_tasks);
300 let had_new_data = match snapshot_result {
301 Ok((_, new_data)) => new_data,
302 Err(_) => {
303 return (false, EvictionCounts::default());
307 }
308 };
309 let counts = self.storage.evict_after_snapshot(None);
310 (had_new_data, counts)
311 }
312
313 fn should_restore(&self) -> bool {
314 self.options.storage_mode.is_some()
315 }
316
317 fn should_track_dependencies(&self) -> bool {
318 self.options.dependency_tracking
319 }
320
321 fn should_track_activeness(&self) -> bool {
322 self.options.active_tracking
323 }
324
325 fn track_cache_hit_by_fn(&self, native_fn: &'static NativeFunction) {
326 self.task_statistics
327 .map(|stats| stats.increment_cache_hit(native_fn));
328 }
329
330 fn track_cache_miss_by_fn(&self, native_fn: &'static NativeFunction) {
331 self.task_statistics
332 .map(|stats| stats.increment_cache_miss(native_fn));
333 }
334
335 fn task_error_to_turbo_tasks_execution_error(
340 &self,
341 error: &TaskError,
342 ctx: &mut impl ExecuteContext<'_>,
343 ) -> TurboTasksExecutionError {
344 match error {
345 TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
346 TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
347 message: item.message.clone(),
348 source: item
349 .source
350 .as_ref()
351 .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
352 })),
353 TaskError::LocalTaskContext(local_task_context) => {
354 TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
355 name: local_task_context.name.clone(),
356 source: local_task_context
357 .source
358 .as_ref()
359 .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
360 }))
361 }
362 TaskError::TaskChain(chain) => {
363 let task_id = chain.last().unwrap();
364 let error = {
365 let task = ctx.task(*task_id, TaskDataCategory::Meta);
366 if let Some(OutputValue::Error(error)) = task.get_output() {
367 Some(error.clone())
368 } else {
369 None
370 }
371 };
372 let error = error.map_or_else(
373 || {
374 TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
376 message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
377 "Error no longer available",
378 )),
379 location: None,
380 }))
381 },
382 |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
383 );
384 let mut current_error = error;
385 for &task_id in chain.iter().rev() {
386 current_error =
387 TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
388 task_id,
389 source: Some(current_error),
390 turbo_tasks: ctx.turbo_tasks(),
391 }));
392 }
393 current_error
394 }
395 }
396 }
397}
398
/// Values produced by the "prepare" step of completing a task execution.
///
/// NOTE(review): the producer and consumers of this struct are outside this part of the
/// file; the field descriptions below are derived from names and types only — confirm.
struct TaskExecutionCompletePrepareResult {
    /// Set of child task ids (presumably children that are new in this execution).
    pub new_children: FxHashSet<TaskId>,
    /// Presumably whether the task became immutable with this execution.
    pub is_now_immutable: bool,
    /// Only present with the `verify_determinism` feature.
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    /// Only present with the `task_dirty_cause` feature.
    #[cfg(feature = "task_dirty_cause")]
    pub function_id: Option<FunctionId>,
    /// Optional output value (presumably the output to store, if it changed).
    pub new_output: Option<OutputValue>,
    /// Task ids that depend on the output of the task.
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
    /// Presumably whether this execution was a recomputation.
    pub is_recomputation: bool,
    /// Presumably whether the task is session dependent.
    pub is_session_dependent: bool,
}
412
413impl TurboTasksBackend {
415 fn try_read_task_output(
416 &self,
417 task_id: TaskId,
418 reader: Option<TaskId>,
419 options: ReadOutputOptions,
420 turbo_tasks: &TurboTasks<TurboTasksBackend>,
421 ) -> Result<Result<RawVc, EventListener>> {
422 self.assert_not_persistent_calling_transient(reader, task_id, None);
423
424 let mut ctx = self.execute_context(turbo_tasks);
425 let need_reader_task = if self.should_track_dependencies()
426 && !matches!(options.tracking, ReadTracking::Untracked)
427 && reader.is_some_and(|reader_id| reader_id != task_id)
428 && let Some(reader_id) = reader
429 && reader_id != task_id
430 {
431 Some(reader_id)
432 } else {
433 None
434 };
435 let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
436 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
440 (task, Some(reader))
441 } else {
442 (ctx.task(task_id, TaskDataCategory::All), None)
443 };
444
445 fn listen_to_done_event(
446 reader_description: Option<EventDescription>,
447 tracking: ReadTracking,
448 done_event: &Event,
449 ) -> EventListener {
450 done_event.listen_with_note(move || {
451 move || {
452 if let Some(reader_description) = reader_description.as_ref() {
453 format!(
454 "try_read_task_output from {} ({})",
455 reader_description, tracking
456 )
457 } else {
458 format!("try_read_task_output ({})", tracking)
459 }
460 }
461 })
462 }
463
464 fn check_in_progress(
465 task: &impl TaskGuard,
466 reader_description: Option<EventDescription>,
467 tracking: ReadTracking,
468 ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
469 {
470 match task.get_in_progress() {
471 Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
472 listen_to_done_event(reader_description, tracking, done_event),
473 ))),
474 Some(InProgressState::InProgress(box InProgressStateInner {
475 done_event, ..
476 })) => Some(Ok(Err(listen_to_done_event(
477 reader_description,
478 tracking,
479 done_event,
480 )))),
481 Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
482 "{} was canceled",
483 task.get_task_description()
484 ))),
485 None => None,
486 }
487 }
488
489 if matches!(options.consistency, ReadConsistency::Strong) {
490 if task
491 .get_persistent_task_type()
492 .is_some_and(|t| !t.native_fn.is_root)
493 {
494 drop(task);
495 drop(reader_task);
496 panic!(
497 "Strongly consistent read of non-root task {} (reader: {}). The `root` \
498 attribute is missing on the task.",
499 self.debug_get_task_description(task_id),
500 reader.map_or_else(
501 || "unknown".to_string(),
502 |r| self.debug_get_task_description(r)
503 )
504 );
505 }
506
507 let is_dirty = task.is_dirty();
508
509 let has_dirty_containers = task.has_dirty_containers();
511 if has_dirty_containers || is_dirty.is_some() {
512 let activeness = task.get_activeness_mut();
513 let mut task_ids_to_schedule: Vec<_> = Vec::new();
514 let activeness = if let Some(activeness) = activeness {
516 activeness.set_active_until_clean();
520 activeness
521 } else {
522 if ctx.should_track_activeness() {
526 task_ids_to_schedule = task.dirty_containers().collect();
528 task_ids_to_schedule.push(task_id);
529 }
530 let activeness =
531 task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
532 activeness.set_active_until_clean();
533 activeness
534 };
535 let listener = activeness.all_clean_event.listen_with_note(move || {
536 let tt = turbo_tasks.pin();
539 move || {
540 let mut ctx = tt.backend().execute_context(&tt);
541 let mut visited = FxHashSet::default();
542 fn indent(s: &str) -> String {
543 s.split_inclusive('\n')
544 .flat_map(|line: &str| [" ", line].into_iter())
545 .collect::<String>()
546 }
547 fn get_info(
548 ctx: &mut impl ExecuteContext<'_>,
549 task_id: TaskId,
550 parent_and_count: Option<(TaskId, i32)>,
551 visited: &mut FxHashSet<TaskId>,
552 ) -> String {
553 let task = ctx.task(task_id, TaskDataCategory::All);
554 let is_dirty = task.is_dirty();
555 let in_progress =
556 task.get_in_progress()
557 .map_or("not in progress", |p| match p {
558 InProgressState::InProgress(_) => "in progress",
559 InProgressState::Scheduled { .. } => "scheduled",
560 InProgressState::Canceled => "canceled",
561 });
562 let activeness = task.get_activeness().map_or_else(
563 || "not active".to_string(),
564 |activeness| format!("{activeness:?}"),
565 );
566 let aggregation_number = get_aggregation_number(&task);
567 let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
568 {
569 let uppers = get_uppers(&task);
570 !uppers.contains(&parent_task_id)
571 } else {
572 false
573 };
574
575 let has_dirty_containers = task.has_dirty_containers();
577
578 let task_description = task.get_task_description();
579 let is_dirty_label = if let Some(parent_priority) = is_dirty {
580 format!(", dirty({parent_priority})")
581 } else {
582 String::new()
583 };
584 let has_dirty_containers_label = if has_dirty_containers {
585 ", dirty containers"
586 } else {
587 ""
588 };
589 let count = if let Some((_, count)) = parent_and_count {
590 format!(" {count}")
591 } else {
592 String::new()
593 };
594 let mut info = format!(
595 "{task_id} {task_description}{count} (aggr={aggregation_number}, \
596 {in_progress}, \
597 {activeness}{is_dirty_label}{has_dirty_containers_label})",
598 );
599 let children: Vec<_> = task.dirty_containers_with_count().collect();
600 drop(task);
601
602 if missing_upper {
603 info.push_str("\n ERROR: missing upper connection");
604 }
605
606 if has_dirty_containers || !children.is_empty() {
607 writeln!(info, "\n dirty tasks:").unwrap();
608
609 for (child_task_id, count) in children {
610 let task_description = ctx
611 .task(child_task_id, TaskDataCategory::Data)
612 .get_task_description();
613 if visited.insert(child_task_id) {
614 let child_info = get_info(
615 ctx,
616 child_task_id,
617 Some((task_id, count)),
618 visited,
619 );
620 info.push_str(&indent(&child_info));
621 if !info.ends_with('\n') {
622 info.push('\n');
623 }
624 } else {
625 writeln!(
626 info,
627 " {child_task_id} {task_description} {count} \
628 (already visited)"
629 )
630 .unwrap();
631 }
632 }
633 }
634 info
635 }
636 let info = get_info(&mut ctx, task_id, None, &mut visited);
637 format!(
638 "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
639 )
640 }
641 });
642 drop(reader_task);
643 drop(task);
644 if !task_ids_to_schedule.is_empty() {
645 let mut queue = AggregationUpdateQueue::new();
646 queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
647 queue.execute(&mut ctx);
648 }
649
650 return Ok(Err(listener));
651 }
652 }
653
654 let reader_description = reader_task
655 .as_ref()
656 .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
657 if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
658 {
659 return value;
660 }
661
662 if let Some(output) = task.get_output() {
663 let result = match output {
664 OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
665 OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
666 OutputValue::Error(error) => Err(error.clone()),
667 };
668 if let Some(mut reader_task) = reader_task.take()
669 && options.tracking.should_track(result.is_err())
670 && (!task.immutable() || cfg!(feature = "verify_immutable"))
671 {
672 #[cfg(feature = "trace_task_output_dependencies")]
673 let _span = tracing::trace_span!(
674 "add output dependency",
675 task = %task_id,
676 dependent_task = ?reader
677 )
678 .entered();
679 let mut queue = LeafDistanceUpdateQueue::new();
680 let reader = reader.unwrap();
681 if task.add_output_dependent(reader) {
682 let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
684 let reader_leaf_distance =
685 reader_task.get_leaf_distance().copied().unwrap_or_default();
686 if reader_leaf_distance.distance <= leaf_distance.distance {
687 queue.push(
688 reader,
689 leaf_distance.distance,
690 leaf_distance.max_distance_in_buffer,
691 );
692 }
693 }
694
695 drop(task);
696
697 if !reader_task.remove_outdated_output_dependencies(&task_id) {
703 let _ = reader_task.add_output_dependencies(task_id);
704 }
705 drop(reader_task);
706
707 queue.execute(&mut ctx);
708 } else {
709 drop(task);
710 }
711
712 return result.map_err(|error| {
713 self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
714 .with_task_context(task_id, turbo_tasks.pin())
715 .into()
716 });
717 }
718 drop(reader_task);
719
720 let note = EventDescription::new(|| {
721 move || {
722 if let Some(reader) = reader_description.as_ref() {
723 format!("try_read_task_output (recompute) from {reader}",)
724 } else {
725 "try_read_task_output (recompute, untracked)".to_string()
726 }
727 }
728 });
729
730 let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
732 TaskExecutionReason::OutputNotAvailable,
733 EventDescription::new(|| task.get_task_desc_fn()),
734 note,
735 );
736
737 let old = task.set_in_progress(in_progress_state);
740 debug_assert!(old.is_none(), "InProgress already exists");
741 ctx.schedule_task(task, TaskPriority::Recomputation);
742
743 Ok(Err(listener))
744 }
745
746 fn try_read_task_cell(
747 &self,
748 task_id: TaskId,
749 reader: Option<TaskId>,
750 cell: CellId,
751 options: ReadCellOptions,
752 turbo_tasks: &TurboTasks<TurboTasksBackend>,
753 ) -> Result<Result<TypedCellContent, EventListener>> {
754 self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
755
756 fn add_cell_dependency(
757 task_id: TaskId,
758 mut task: impl TaskGuard,
759 reader: Option<TaskId>,
760 reader_task: Option<impl TaskGuard>,
761 cell: CellId,
762 key: Option<u64>,
763 ) {
764 if let Some(mut reader_task) = reader_task
765 && (!task.immutable() || cfg!(feature = "verify_immutable"))
766 {
767 let reader = reader.unwrap();
768 let _ = task
769 .add_cell_dependents(CellDependency::new(CellRef { task: reader, cell }, key));
770 drop(task);
771
772 let target = CellRef {
778 task: task_id,
779 cell,
780 };
781 let dep = CellDependency::new(target, key);
782 if !reader_task.remove_outdated_cell_dependencies(&dep) {
783 let _ = reader_task.add_cell_dependencies(dep);
784 }
785 drop(reader_task);
786 }
787 }
788
789 let ReadCellOptions {
790 tracking,
791 final_read_hint,
792 } = options;
793
794 let mut ctx = self.execute_context(turbo_tasks);
795 let (mut task, reader_task) = if self.should_track_dependencies()
796 && !matches!(tracking, ReadCellTracking::Untracked)
797 && let Some(reader_id) = reader
798 && reader_id != task_id
799 {
800 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
804 (task, Some(reader))
805 } else {
806 (ctx.task(task_id, TaskDataCategory::All), None)
807 };
808
809 let content = if final_read_hint {
810 task.remove_cell_data(&cell)
811 } else {
812 task.get_cell_data(&cell).cloned()
813 };
814 if let Some(content) = content {
815 if tracking.should_track(false) {
816 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
817 }
818 return Ok(Ok(TypedCellContent(
819 cell.type_id,
820 CellContent(Some(content)),
821 )));
822 }
823
824 let in_progress = task.get_in_progress();
825 if matches!(
826 in_progress,
827 Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
828 ) {
829 return Ok(Err(self
830 .listen_to_cell(&mut task, task_id, &reader_task, cell)
831 .0));
832 }
833 let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
834
835 let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
837 let Some(max_id) = max_id else {
838 let task_desc = task.get_task_description();
839 if tracking.should_track(true) {
840 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
841 }
842 bail!(
843 "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
844 );
845 };
846 if cell.index >= max_id {
847 let task_desc = task.get_task_description();
848 if tracking.should_track(true) {
849 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
850 }
851 bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
852 }
853
854 if is_cancelled {
860 bail!("{} was canceled", task.get_task_description());
861 }
862
863 let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
865 drop(reader_task);
866 if !new_listener {
867 return Ok(Err(listener));
868 }
869
870 let _span = tracing::trace_span!(
871 "recomputation",
872 cell_type = get_value_type(cell.type_id).ty.global_name,
873 cell_index = cell.index
874 )
875 .entered();
876
877 let _ = task.add_scheduled(
878 TaskExecutionReason::CellNotAvailable,
879 EventDescription::new(|| task.get_task_desc_fn()),
880 );
881 ctx.schedule_task(task, TaskPriority::Recomputation);
882
883 Ok(Err(listener))
884 }
885
886 fn listen_to_cell(
887 &self,
888 task: &mut impl TaskGuard,
889 task_id: TaskId,
890 reader_task: &Option<impl TaskGuard>,
891 cell: CellId,
892 ) -> (EventListener, bool) {
893 let note = || {
894 let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
895 move || {
896 if let Some(reader_desc) = reader_desc.as_ref() {
897 format!("try_read_task_cell (in progress) from {}", (reader_desc)())
898 } else {
899 "try_read_task_cell (in progress, untracked)".to_string()
900 }
901 }
902 };
903 if let Some(in_progress) = task.get_in_progress_cells(&cell) {
904 let listener = in_progress.event.listen_with_note(note);
906 return (listener, false);
907 }
908 let in_progress = InProgressCellState::new(task_id, cell);
909 let listener = in_progress.event.listen_with_note(note);
910 let old = task.insert_in_progress_cells(cell, in_progress);
911 debug_assert!(old.is_none(), "InProgressCell already exists");
912 (listener, true)
913 }
914
915 fn snapshot_and_persist(
916 &self,
917 parent_span: Option<tracing::Id>,
918 reason: &str,
919 turbo_tasks: &TurboTasks<TurboTasksBackend>,
920 ) -> Result<(Instant, bool), anyhow::Error> {
921 let snapshot_span =
922 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
923 .entered();
924 let _snapshot_in_progress = self.snapshot_in_progress.lock();
928 let start = Instant::now();
929 let wall_start = SystemTime::now();
933 debug_assert!(self.should_persist());
934
935 let mut snapshot_phase = {
936 let _span = tracing::info_span!("blocking").entered();
937 self.snapshot_coord.begin_snapshot()
938 };
939 let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
942
943 let suspended_operations = snapshot_phase.take_suspended_operations();
944
945 let snapshot_time = Instant::now();
946 drop(snapshot_phase);
947
948 if !has_modifications {
949 drop(snapshot_guard);
952 return Ok((start, false));
953 }
954
955 #[cfg(feature = "print_cache_item_size")]
956 #[derive(Default)]
957 struct TaskCacheStats {
958 data: usize,
959 #[cfg(feature = "print_cache_item_size_with_compressed")]
960 data_compressed: usize,
961 data_count: usize,
962 meta: usize,
963 #[cfg(feature = "print_cache_item_size_with_compressed")]
964 meta_compressed: usize,
965 meta_count: usize,
966 upper_count: usize,
967 collectibles_count: usize,
968 aggregated_collectibles_count: usize,
969 children_count: usize,
970 followers_count: usize,
971 collectibles_dependents_count: usize,
972 aggregated_dirty_containers_count: usize,
973 output_size: usize,
974 }
975 #[cfg(feature = "print_cache_item_size")]
978 struct FormatSizes {
979 size: usize,
980 #[cfg(feature = "print_cache_item_size_with_compressed")]
981 compressed_size: usize,
982 }
983 #[cfg(feature = "print_cache_item_size")]
984 impl std::fmt::Display for FormatSizes {
985 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
986 use turbo_tasks::util::FormatBytes;
987 #[cfg(feature = "print_cache_item_size_with_compressed")]
988 {
989 write!(
990 f,
991 "{} ({} compressed)",
992 FormatBytes(self.size),
993 FormatBytes(self.compressed_size)
994 )
995 }
996 #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
997 {
998 write!(f, "{}", FormatBytes(self.size))
999 }
1000 }
1001 }
1002 #[cfg(feature = "print_cache_item_size")]
1003 impl TaskCacheStats {
1004 #[cfg(feature = "print_cache_item_size_with_compressed")]
1005 fn compressed_size(data: &[u8]) -> Result<usize> {
1006 Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1007 data,
1008 &mut Vec::new(),
1009 lzzzz::lz4::ACC_LEVEL_DEFAULT,
1010 )?)
1011 }
1012
1013 fn add_data(&mut self, data: &[u8]) {
1014 self.data += data.len();
1015 #[cfg(feature = "print_cache_item_size_with_compressed")]
1016 {
1017 self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1018 }
1019 self.data_count += 1;
1020 }
1021
1022 fn add_meta(&mut self, data: &[u8]) {
1023 self.meta += data.len();
1024 #[cfg(feature = "print_cache_item_size_with_compressed")]
1025 {
1026 self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1027 }
1028 self.meta_count += 1;
1029 }
1030
1031 fn add_counts(&mut self, storage: &TaskStorage) {
1032 let counts = storage.meta_counts();
1033 self.upper_count += counts.upper;
1034 self.collectibles_count += counts.collectibles;
1035 self.aggregated_collectibles_count += counts.aggregated_collectibles;
1036 self.children_count += counts.children;
1037 self.followers_count += counts.followers;
1038 self.collectibles_dependents_count += counts.collectibles_dependents;
1039 self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
1040 if let Some(output) = storage.get_output() {
1041 use turbo_bincode::turbo_bincode_encode;
1042
1043 self.output_size += turbo_bincode_encode(&output)
1044 .map(|data| data.len())
1045 .unwrap_or(0);
1046 }
1047 }
1048
1049 fn task_name(storage: &TaskStorage) -> String {
1051 storage
1052 .get_persistent_task_type()
1053 .map(|t| t.to_string())
1054 .unwrap_or_else(|| "<unknown>".to_string())
1055 }
1056
1057 fn sort_key(&self) -> usize {
1060 #[cfg(feature = "print_cache_item_size_with_compressed")]
1061 {
1062 self.data_compressed + self.meta_compressed
1063 }
1064 #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1065 {
1066 self.data + self.meta
1067 }
1068 }
1069
1070 fn format_total(&self) -> FormatSizes {
1071 FormatSizes {
1072 size: self.data + self.meta,
1073 #[cfg(feature = "print_cache_item_size_with_compressed")]
1074 compressed_size: self.data_compressed + self.meta_compressed,
1075 }
1076 }
1077
1078 fn format_data(&self) -> FormatSizes {
1079 FormatSizes {
1080 size: self.data,
1081 #[cfg(feature = "print_cache_item_size_with_compressed")]
1082 compressed_size: self.data_compressed,
1083 }
1084 }
1085
1086 fn format_avg_data(&self) -> FormatSizes {
1087 FormatSizes {
1088 size: self.data.checked_div(self.data_count).unwrap_or(0),
1089 #[cfg(feature = "print_cache_item_size_with_compressed")]
1090 compressed_size: self
1091 .data_compressed
1092 .checked_div(self.data_count)
1093 .unwrap_or(0),
1094 }
1095 }
1096
1097 fn format_meta(&self) -> FormatSizes {
1098 FormatSizes {
1099 size: self.meta,
1100 #[cfg(feature = "print_cache_item_size_with_compressed")]
1101 compressed_size: self.meta_compressed,
1102 }
1103 }
1104
1105 fn format_avg_meta(&self) -> FormatSizes {
1106 FormatSizes {
1107 size: self.meta.checked_div(self.meta_count).unwrap_or(0),
1108 #[cfg(feature = "print_cache_item_size_with_compressed")]
1109 compressed_size: self
1110 .meta_compressed
1111 .checked_div(self.meta_count)
1112 .unwrap_or(0),
1113 }
1114 }
1115 }
1116 #[cfg(feature = "print_cache_item_size")]
1117 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1118 Mutex::new(FxHashMap::default());
1119
1120 let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
1127 let encode_category = |task_id: TaskId,
1128 data: &TaskStorage,
1129 category: SpecificTaskDataCategory,
1130 buffer: &mut TurboBincodeBuffer|
1131 -> Option<TurboBincodeBuffer> {
1132 match encode_task_data(task_id, data, category, buffer) {
1133 Ok(encoded) => {
1134 #[cfg(feature = "print_cache_item_size")]
1135 {
1136 let mut stats = task_cache_stats.lock();
1137 let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
1138 match category {
1139 SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1140 SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1141 }
1142 }
1143 Some(encoded)
1144 }
1145 Err(err) => {
1146 panic!(
1147 "Serializing task {} failed ({:?}): {:?}",
1148 self.debug_get_task_description(task_id),
1149 category,
1150 err
1151 );
1152 }
1153 }
1154 };
1155 if task_id.is_transient() {
1156 unreachable!("transient task_ids should never be enqueued to be persisted");
1157 }
1158
1159 let encode_meta = inner.flags.meta_modified();
1160 let encode_data = inner.flags.data_modified();
1161
1162 #[cfg(feature = "print_cache_item_size")]
1163 if encode_data || encode_meta {
1164 task_cache_stats
1165 .lock()
1166 .entry(TaskCacheStats::task_name(inner))
1167 .or_default()
1168 .add_counts(inner);
1169 }
1170
1171 let meta = if encode_meta {
1172 encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1173 } else {
1174 None
1175 };
1176
1177 let data = if encode_data {
1178 encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1179 } else {
1180 None
1181 };
1182 let task_type_hash = if inner.flags.new_task() {
1183 let task_type = inner.get_persistent_task_type().expect(
1184 "It is not possible for a new_task to not have a persistent_task_type. Task \
1185 creation for persistent tasks uses a single ExecutionContextImpl for \
1186 creating the task (which sets new_task) and connect_child (which sets \
1187 persistent_task_type) and take_snapshot waits for all operations to complete \
1188 or suspend before we start snapshotting. So task creation will always set \
1189 the task_type.",
1190 );
1191 Some(compute_task_type_hash(task_type))
1192 } else {
1193 None
1194 };
1195
1196 SnapshotItem {
1197 task_id,
1198 meta,
1199 data,
1200 task_type_hash,
1201 }
1202 };
1203
1204 let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);
1205
1206 drop(snapshot_span);
1207 let snapshot_duration = start.elapsed();
1208 let task_count = task_snapshots.len();
1209
1210 if task_snapshots.is_empty() {
1211 std::hint::cold_path();
1214 return Ok((snapshot_time, false));
1215 }
1216
1217 let persist_start = Instant::now();
1218 let span = tracing::info_span!(
1219 parent: parent_span,
1220 "persist",
1221 reason = reason,
1222 data_items= tracing::field::Empty,
1223 meta_items= tracing::field::Empty,
1224 task_cache_items= tracing::field::Empty,
1225 next_task_id= tracing::field::Empty,)
1226 .entered();
1227 {
1228 let SnapshotMeta {
1232 task_cache_items,
1233 data_items,
1234 meta_items,
1235 max_next_task_id,
1236 } = self
1237 .backing_storage
1238 .save_snapshot(suspended_operations, task_snapshots)?;
1239 span.record("data_items", data_items);
1240 span.record("meta_items", meta_items);
1241 span.record("task_cache_items", task_cache_items);
1242 span.record("next_task_id", max_next_task_id);
1243
1244 #[cfg(feature = "print_cache_item_size")]
1245 {
1246 let mut task_cache_stats = task_cache_stats
1247 .into_inner()
1248 .into_iter()
1249 .collect::<Vec<_>>();
1250 if !task_cache_stats.is_empty() {
1251 use turbo_tasks::util::FormatBytes;
1252
1253 use crate::utils::markdown_table::print_markdown_table;
1254
1255 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1256 (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
1257 });
1258
1259 println!(
1260 "Task cache stats: {}",
1261 FormatSizes {
1262 size: task_cache_stats
1263 .iter()
1264 .map(|(_, s)| s.data + s.meta)
1265 .sum::<usize>(),
1266 #[cfg(feature = "print_cache_item_size_with_compressed")]
1267 compressed_size: task_cache_stats
1268 .iter()
1269 .map(|(_, s)| s.data_compressed + s.meta_compressed)
1270 .sum::<usize>()
1271 },
1272 );
1273
1274 print_markdown_table(
1275 [
1276 "Task",
1277 " Total Size",
1278 " Data Size",
1279 " Data Count x Avg",
1280 " Data Count x Avg",
1281 " Meta Size",
1282 " Meta Count x Avg",
1283 " Meta Count x Avg",
1284 " Uppers",
1285 " Coll",
1286 " Agg Coll",
1287 " Children",
1288 " Followers",
1289 " Coll Deps",
1290 " Agg Dirty",
1291 " Output Size",
1292 ],
1293 task_cache_stats.iter(),
1294 |(task_desc, stats)| {
1295 [
1296 task_desc.to_string(),
1297 format!(" {}", stats.format_total()),
1298 format!(" {}", stats.format_data()),
1299 format!(" {} x", stats.data_count),
1300 format!("{}", stats.format_avg_data()),
1301 format!(" {}", stats.format_meta()),
1302 format!(" {} x", stats.meta_count),
1303 format!("{}", stats.format_avg_meta()),
1304 format!(" {}", stats.upper_count),
1305 format!(" {}", stats.collectibles_count),
1306 format!(" {}", stats.aggregated_collectibles_count),
1307 format!(" {}", stats.children_count),
1308 format!(" {}", stats.followers_count),
1309 format!(" {}", stats.collectibles_dependents_count),
1310 format!(" {}", stats.aggregated_dirty_containers_count),
1311 format!(" {}", FormatBytes(stats.output_size)),
1312 ]
1313 },
1314 );
1315 }
1316 }
1317 }
1318
1319 let elapsed = start.elapsed();
1320 let persist_duration = persist_start.elapsed();
1321 if elapsed > Duration::from_secs(10) {
1323 turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1324 "Finished writing to filesystem cache".to_string(),
1325 elapsed,
1326 )));
1327 }
1328
1329 let wall_start_ms = wall_start
1330 .duration_since(SystemTime::UNIX_EPOCH)
1331 .unwrap_or_default()
1332 .as_secs_f64()
1334 * 1000.0;
1335 let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1336 turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1337 "turbopack-persistence",
1338 wall_start_ms,
1339 wall_end_ms,
1340 vec![
1341 ("reason", serde_json::Value::from(reason)),
1342 (
1343 "snapshot_duration_ms",
1344 serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1345 ),
1346 (
1347 "persist_duration_ms",
1348 serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1349 ),
1350 ("task_count", serde_json::Value::from(task_count)),
1351 ],
1352 )));
1353
1354 Ok((snapshot_time, true))
1355 }
1356
1357 fn startup(&self, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1358 if self.should_restore() {
1359 let uncompleted_operations = self
1363 .backing_storage
1364 .uncompleted_operations()
1365 .expect("Failed to get uncompleted operations");
1366 if !uncompleted_operations.is_empty() {
1367 let mut ctx = self.execute_context(turbo_tasks);
1368 for op in uncompleted_operations {
1369 op.execute(&mut ctx);
1370 }
1371 }
1372 }
1373
1374 if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1377 let _span = trace_span!("persisting background job").entered();
1379 let _span = tracing::info_span!("thread").entered();
1380 turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::Snapshot);
1381 }
1382 }
1383
1384 fn stopping(&self) {
1385 self.stopping.store(true, Ordering::Release);
1386 self.stopping_event.notify(usize::MAX);
1387 }
1388
1389 #[allow(unused_variables)]
1390 fn stop(&self, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1391 #[cfg(feature = "verify_aggregation_graph")]
1392 {
1393 self.is_idle.store(false, Ordering::Release);
1394 self.verify_aggregation_graph(turbo_tasks, false);
1395 }
1396 if self.should_persist()
1397 && let Err(err) = self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks)
1398 {
1399 eprintln!("Persisting failed during shutdown: {err:?}");
1400 }
1401 self.storage.drop_contents();
1402 if let Err(err) = self.backing_storage.shutdown() {
1403 println!("Shutting down failed: {err}");
1404 }
1405 }
1406
1407 #[allow(unused_variables)]
1408 fn idle_start(&self, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1409 self.idle_start_event.notify(usize::MAX);
1410
1411 #[cfg(feature = "verify_aggregation_graph")]
1412 {
1413 use tokio::select;
1414
1415 self.is_idle.store(true, Ordering::Release);
1416 let turbo_tasks = turbo_tasks.pin();
1420 tokio::task::spawn(async move {
1421 let backend = &turbo_tasks.backend();
1422 select! {
1423 _ = tokio::time::sleep(Duration::from_secs(5)) => {
1424 }
1426 _ = backend.idle_end_event.listen() => {
1427 return;
1428 }
1429 }
1430 if !backend.is_idle.load(Ordering::Relaxed) {
1431 return;
1432 }
1433 backend.verify_aggregation_graph(&turbo_tasks, true);
1434 });
1435 }
1436 }
1437
1438 fn idle_end(&self) {
1439 #[cfg(feature = "verify_aggregation_graph")]
1440 self.is_idle.store(false, Ordering::Release);
1441 self.idle_end_event.notify(usize::MAX);
1442 }
1443
/// Looks up the task for `(native_fn, this, arg)` or creates it, connects it as a child of
/// `parent_task`, and returns its id.
///
/// Lookup order, as implemented below:
/// 1. the in-memory task cache shard (`raw_get_in_shard`),
/// 2. for non-transient tasks, `ctx.task_by_type` (NOTE(review): presumably the persisted
///    cache — confirm),
/// 3. otherwise a new task id is allocated and the task is initialized.
///
/// A transient task requested from a non-transient parent is reported through
/// `panic_persistent_calling_transient`.
fn get_or_create_task(
    &self,
    native_fn: &'static NativeFunction,
    this: Option<RawVc>,
    arg: &mut dyn DynTaskInputsStorage,
    parent_task: Option<TaskId>,
    persistence: TaskPersistence,
    turbo_tasks: &TurboTasks<TurboTasksBackend>,
) -> TaskId {
    let transient = matches!(persistence, TaskPersistence::Transient);

    // A persistent parent must not call a transient task.
    if transient
        && let Some(parent_task) = parent_task
        && !parent_task.is_transient()
    {
        let task_type = CachedTaskType {
            native_fn,
            this,
            arg: arg.take_box(),
        };
        // NOTE(review): presumably diverges (panics); `arg` has been taken at this point.
        self.panic_persistent_calling_transient(
            self.debug_get_task_description(parent_task),
            Some(&task_type),
            None,
        );
    }

    let is_root = native_fn.is_root;

    let arg_ref = arg.as_ref();
    // Hash the components directly so no `CachedTaskType` has to be built for a lookup.
    let hash = CachedTaskType::hash_from_components(
        self.storage.task_cache.hasher(),
        native_fn,
        this,
        arg_ref,
    );
    let shard = get_shard(&self.storage.task_cache, hash);

    let mut ctx = self.execute_context(turbo_tasks);
    // Step 1: lookup in the in-memory cache shard.
    if let Some(task_id) =
        raw_get_in_shard(shard, hash, |k| k.eq_components(native_fn, this, arg_ref))
    {
        self.track_cache_hit_by_fn(native_fn);
        operation::ConnectChildOperation::run(parent_task, task_id, ctx);
        return task_id;
    }

    // Step 2: non-transient tasks may be known to `ctx.task_by_type`.
    let task_id = if !transient
        && let Some((task_id, stored_type)) = ctx.task_by_type(native_fn, this, arg_ref)
    {
        self.track_cache_hit_by_fn(native_fn);
        // Populate the in-memory cache unless an entry appeared in the meantime.
        match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
            k.eq_components(native_fn, this, arg_ref)
        }) {
            RawEntry::Occupied(_) => {}
            RawEntry::Vacant(e) => {
                e.insert(stored_type, task_id);
            }
        };
        task_id
    } else {
        // Step 3: re-check the shard via the entry API and create the task if still absent.
        match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
            k.eq_components(native_fn, this, arg_ref)
        }) {
            RawEntry::Occupied(e) => {
                // The entry exists now even though the earlier lookup missed.
                let task_id = *e.get();
                // Release the entry before doing anything else.
                drop(e);
                self.track_cache_hit_by_fn(native_fn);
                task_id
            }
            RawEntry::Vacant(e) => {
                // Only now take ownership of the argument; lookups above used it by reference.
                let task_type = CachedTaskTypeArc::new(CachedTaskType {
                    native_fn,
                    this,
                    arg: arg.take_box(),
                });
                let task_id = if transient {
                    self.transient_task_id_factory.get()
                } else {
                    self.persisted_task_id_factory.get()
                };
                // The task is initialized in storage before it becomes visible in the cache.
                self.storage
                    .initialize_new_task(task_id, Some(task_type.clone()));
                e.insert(task_type, task_id);
                self.track_cache_miss_by_fn(native_fn);
                if is_root {
                    // Root functions get the maximum base aggregation number.
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: u32::MAX,
                            distance: None,
                        },
                        &mut ctx,
                    );
                } else if native_fn.is_session_dependent && self.should_track_dependencies() {
                    // Session dependent functions get a high, but not maximal, base
                    // aggregation number.
                    const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                            distance: None,
                        },
                        &mut ctx,
                    );
                };

                task_id
            }
        }
    };

    // `ctx` is consumed by connecting the (found or created) task to its parent.
    operation::ConnectChildOperation::run(parent_task, task_id, ctx);

    task_id
}
1584
1585 fn debug_trace_transient_task(
1588 &self,
1589 task_type: &CachedTaskType,
1590 cell_id: Option<CellId>,
1591 ) -> DebugTraceTransientTask {
1592 fn inner_id(
1595 backend: &TurboTasksBackend,
1596 task_id: TaskId,
1597 cell_type_id: Option<ValueTypeId>,
1598 visited_set: &mut FxHashSet<TaskId>,
1599 ) -> DebugTraceTransientTask {
1600 if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1601 if visited_set.contains(&task_id) {
1602 let task_name = task_type.get_name();
1603 DebugTraceTransientTask::Collapsed {
1604 task_name,
1605 cell_type_id,
1606 }
1607 } else {
1608 inner_cached(backend, &task_type, cell_type_id, visited_set)
1609 }
1610 } else {
1611 DebugTraceTransientTask::Uncached { cell_type_id }
1612 }
1613 }
1614 fn inner_cached(
1615 backend: &TurboTasksBackend,
1616 task_type: &CachedTaskType,
1617 cell_type_id: Option<ValueTypeId>,
1618 visited_set: &mut FxHashSet<TaskId>,
1619 ) -> DebugTraceTransientTask {
1620 let task_name = task_type.get_name();
1621
1622 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1623 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1624 return None;
1628 };
1629 if task_id.is_transient() {
1630 Some(Box::new(inner_id(
1631 backend,
1632 task_id,
1633 cause_self_raw_vc.try_get_type_id(),
1634 visited_set,
1635 )))
1636 } else {
1637 None
1638 }
1639 });
1640 let cause_args = task_type
1641 .arg
1642 .get_raw_vcs()
1643 .into_iter()
1644 .filter_map(|raw_vc| {
1645 let Some(task_id) = raw_vc.try_get_task_id() else {
1646 return None;
1648 };
1649 if !task_id.is_transient() {
1650 return None;
1651 }
1652 Some((task_id, raw_vc.try_get_type_id()))
1653 })
1654 .collect::<IndexSet<_>>() .into_iter()
1656 .map(|(task_id, cell_type_id)| {
1657 inner_id(backend, task_id, cell_type_id, visited_set)
1658 })
1659 .collect();
1660
1661 DebugTraceTransientTask::Cached {
1662 task_name,
1663 cell_type_id,
1664 cause_self,
1665 cause_args,
1666 }
1667 }
1668 inner_cached(
1669 self,
1670 task_type,
1671 cell_id.map(|c| c.type_id),
1672 &mut FxHashSet::default(),
1673 )
1674 }
1675
1676 fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1677 if !self.should_track_dependencies() {
1678 panic!("Dependency tracking is disabled so invalidation is not allowed");
1679 }
1680 operation::InvalidateOperation::run(
1681 smallvec![task_id],
1682 #[cfg(feature = "task_dirty_cause")]
1683 TaskDirtyCause::Invalidator,
1684 self.execute_context(turbo_tasks),
1685 );
1686 }
1687
1688 fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1689 if !self.should_track_dependencies() {
1690 panic!("Dependency tracking is disabled so invalidation is not allowed");
1691 }
1692 operation::InvalidateOperation::run(
1693 tasks.iter().copied().collect(),
1694 #[cfg(feature = "task_dirty_cause")]
1695 TaskDirtyCause::Unknown,
1696 self.execute_context(turbo_tasks),
1697 );
1698 }
1699
1700 fn invalidate_tasks_set(
1701 &self,
1702 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1703 turbo_tasks: &TurboTasks<TurboTasksBackend>,
1704 ) {
1705 if !self.should_track_dependencies() {
1706 panic!("Dependency tracking is disabled so invalidation is not allowed");
1707 }
1708 operation::InvalidateOperation::run(
1709 tasks.iter().copied().collect(),
1710 #[cfg(feature = "task_dirty_cause")]
1711 TaskDirtyCause::Unknown,
1712 self.execute_context(turbo_tasks),
1713 );
1714 }
1715
1716 fn invalidate_serialization(
1717 &self,
1718 task_id: TaskId,
1719 turbo_tasks: &TurboTasks<TurboTasksBackend>,
1720 ) {
1721 if task_id.is_transient() {
1722 return;
1723 }
1724 let mut ctx = self.execute_context(turbo_tasks);
1725 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1726 task.invalidate_serialization();
1727 }
1728
1729 fn debug_get_task_description(&self, task_id: TaskId) -> String {
1730 let task = self.storage.access_mut(task_id);
1731 if let Some(value) = task.get_persistent_task_type() {
1732 format!("{task_id:?} {}", value)
1733 } else if let Some(value) = task.get_transient_task_type() {
1734 format!("{task_id:?} {}", value)
1735 } else {
1736 format!("{task_id:?} unknown")
1737 }
1738 }
1739
1740 fn get_task_name(
1741 &self,
1742 task_id: TaskId,
1743 turbo_tasks: &TurboTasks<TurboTasksBackend>,
1744 ) -> String {
1745 let mut ctx = self.execute_context(turbo_tasks);
1746 let task = ctx.task(task_id, TaskDataCategory::Data);
1747 if let Some(value) = task.get_persistent_task_type() {
1748 value.to_string()
1749 } else if let Some(value) = task.get_transient_task_type() {
1750 value.to_string()
1751 } else {
1752 "unknown".to_string()
1753 }
1754 }
1755
1756 fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<CachedTaskTypeArc> {
1757 let task = self.storage.access_mut(task_id);
1758 task.get_persistent_task_type().cloned()
1759 }
1760
1761 fn task_execution_canceled(
1762 &self,
1763 task_id: TaskId,
1764 turbo_tasks: &TurboTasks<TurboTasksBackend>,
1765 ) {
1766 let mut ctx = self.execute_context(turbo_tasks);
1767 let mut task = ctx.task(task_id, TaskDataCategory::All);
1768 if let Some(in_progress) = task.take_in_progress() {
1769 match in_progress {
1770 InProgressState::Scheduled {
1771 done_event,
1772 reason: _,
1773 } => done_event.notify(usize::MAX),
1774 InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1775 done_event.notify(usize::MAX)
1776 }
1777 InProgressState::Canceled => {}
1778 }
1779 }
1780 let in_progress_cells = task.take_in_progress_cells();
1783 if let Some(ref cells) = in_progress_cells {
1784 for state in cells.values() {
1785 state.event.notify(usize::MAX);
1786 }
1787 }
1788
1789 let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
1795 task.update_dirty_state(Some(Dirtyness::SessionDependent))
1796 } else {
1797 None
1798 };
1799
1800 let old = task.set_in_progress(InProgressState::Canceled);
1801 debug_assert!(old.is_none(), "InProgress already exists");
1802 drop(task);
1803
1804 if let Some(data_update) = data_update {
1805 AggregationUpdateQueue::run(data_update, &mut ctx);
1806 }
1807
1808 drop(in_progress_cells);
1809 }
1810
/// Tries to move a scheduled task into the in-progress state and returns what is needed to
/// execute it (the future and its tracing span).
///
/// Returns `None` when the task has no in-progress state, when that state is not
/// `Scheduled` (it is put back unchanged), or when a `Once` task's future has already
/// been taken.
fn try_start_task_execution(
    &self,
    task_id: TaskId,
    priority: TaskPriority,
    turbo_tasks: &TurboTasks<TurboTasksBackend>,
) -> Option<TaskExecutionSpec<'_>> {
    // Assigned inside the scoped block below, consumed after the task guard is released.
    let execution_reason;
    let task_type;
    #[cfg(feature = "task_dirty_cause")]
    let cause;
    {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        task_type = task.get_task_type().to_owned();
        let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
        // Prefetching needs the guard released; it is re-acquired afterwards.
        if let Some(tasks) = task.prefetch() {
            drop(task);
            ctx.prepare_tasks(tasks, "prefetch");
            task = ctx.task(task_id, TaskDataCategory::All);
        }
        let in_progress = task.take_in_progress()?;
        // Only a scheduled task may start; any other state is restored untouched.
        let InProgressState::Scheduled { done_event, reason } = in_progress else {
            let old = task.set_in_progress(in_progress);
            debug_assert!(old.is_none(), "InProgress already exists");
            return None;
        };
        execution_reason = reason;
        #[cfg(feature = "task_dirty_cause")]
        {
            cause = match task.get_dirty() {
                Some(Dirtyness::Dirty { cause, .. }) => Some(cause.clone()),
                _ => None,
            };
        }
        let old = task.set_in_progress(InProgressState::InProgress(Box::new(
            InProgressStateInner {
                stale: false,
                once_task,
                done_event,
                marked_as_completed: false,
                new_children: Default::default(),
            },
        )));
        debug_assert!(old.is_none(), "InProgress already exists");

        // Snapshot current and outdated collectibles first, since the task is mutated while
        // processing them.
        enum Collectible {
            Current(CollectibleRef, i32),
            Outdated(CollectibleRef),
        }
        let collectibles = task
            .iter_collectibles()
            .map(|(&collectible, &value)| Collectible::Current(collectible, value))
            .chain(
                task.iter_outdated_collectibles()
                    .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
            )
            .collect::<Vec<_>>();
        for collectible in collectibles {
            match collectible {
                // Every current collectible becomes outdated for this execution.
                Collectible::Current(collectible, value) => {
                    let _ = task.insert_outdated_collectible(collectible, value);
                }
                // Outdated entries without a current counterpart are removed.
                Collectible::Outdated(collectible) => {
                    if task
                        .collectibles()
                        .is_none_or(|m| m.get(&collectible).is_none())
                    {
                        task.remove_outdated_collectibles(&collectible);
                    }
                }
            }
        }

        if self.should_track_dependencies() {
            // All current dependencies become the outdated sets for this execution.
            let cell_dependencies = task.iter_cell_dependencies().collect();
            task.set_outdated_cell_dependencies(cell_dependencies);

            let outdated_output_dependencies = task.iter_output_dependencies().collect();
            task.set_outdated_output_dependencies(outdated_output_dependencies);
        }
    }

    let (span, future) = match task_type {
        TaskType::Cached(task_type) => {
            let CachedTaskType {
                native_fn,
                this,
                arg,
            } = &*task_type;
            (
                native_fn.span(
                    task_id.persistence(),
                    execution_reason,
                    priority,
                    #[cfg(feature = "task_dirty_cause")]
                    cause.as_ref(),
                ),
                native_fn.execute(*this, &**arg),
            )
        }
        TaskType::Transient(task_type) => {
            let span = tracing::trace_span!("turbo_tasks::root_task");
            let future = match &*task_type {
                TransientTask::Root(f) => f(),
                // A once task's future can be taken a single time; `None` afterwards.
                TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
            };
            (span, future)
        }
    };
    Some(TaskExecutionSpec { future, span })
}
1924
/// Processes the completion of a task execution in several phases: prepare, invalidate
/// output dependents, mark unfinished new children dirty, connect new children, finish,
/// and cleanup.
///
/// Returns `Some(priority)` as soon as one of the prepare / connect / finish phases reports
/// the task as stale, and `None` once all phases have run.
/// NOTE(review): presumably the caller re-executes the task with the returned priority —
/// confirm at the call site.
fn task_execution_completed(
    &self,
    task_id: TaskId,
    result: Result<RawVc, TurboTasksExecutionError>,
    cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
    #[cfg(feature = "verify_determinism")] stateful: bool,
    has_invalidator: bool,
    turbo_tasks: &TurboTasks<TurboTasksBackend>,
) -> Option<TaskPriority> {
    // The span carries more fields when `trace_task_details` is enabled; they are recorded
    // as the phases below progress.
    #[cfg(not(feature = "trace_task_details"))]
    let span = tracing::trace_span!(
        "task execution completed",
        new_children = tracing::field::Empty
    )
    .entered();
    #[cfg(feature = "trace_task_details")]
    let span = tracing::trace_span!(
        "task execution completed",
        task_id = display(task_id),
        result = match result.as_ref() {
            Ok(value) => display(either::Either::Left(value)),
            Err(err) => display(either::Either::Right(err)),
        },
        new_children = tracing::field::Empty,
        immutable = tracing::field::Empty,
        new_output = tracing::field::Empty,
        output_dependents = tracing::field::Empty,
        aggregation_number = tracing::field::Empty,
        stale = tracing::field::Empty,
    )
    .entered();

    // `result` is moved into the prepare phase, so remember whether it was an error.
    let is_error = result.is_err();

    let mut ctx = self.execute_context(turbo_tasks);

    // Phase 1: prepare.
    let TaskExecutionCompletePrepareResult {
        new_children,
        is_now_immutable,
        #[cfg(feature = "verify_determinism")]
        no_output_set,
        new_output,
        #[cfg(feature = "task_dirty_cause")]
        function_id,
        output_dependent_tasks,
        is_recomputation,
        is_session_dependent,
    } = match self.task_execution_completed_prepare(
        &mut ctx,
        #[cfg(feature = "trace_task_details")]
        &span,
        task_id,
        result,
        cell_counters,
        #[cfg(feature = "verify_determinism")]
        stateful,
        has_invalidator,
    ) {
        Ok(r) => r,
        Err(stale_priority) => {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "prepare");
            return Some(stale_priority);
        }
    };

    #[cfg(feature = "trace_task_details")]
    span.record("new_output", new_output.is_some());
    #[cfg(feature = "trace_task_details")]
    span.record("output_dependents", output_dependent_tasks.len());

    // Phase 2: invalidate the tasks that depend on this task's output.
    if !output_dependent_tasks.is_empty() {
        self.task_execution_completed_invalidate_output_dependent(
            &mut ctx,
            task_id,
            #[cfg(feature = "task_dirty_cause")]
            function_id,
            output_dependent_tasks,
        );
    }

    let has_new_children = !new_children.is_empty();
    span.record("new_children", new_children.len());

    // Phase 3: handle unfinished new children.
    if has_new_children {
        self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
    }

    // Phase 4: connect the new children; this may detect staleness.
    if has_new_children
        && let Some(stale_priority) =
            self.task_execution_completed_connect(&mut ctx, task_id, new_children)
    {
        #[cfg(feature = "trace_task_details")]
        span.record("stale", "connect");
        return Some(stale_priority);
    }

    // Phase 5: finish; this may detect staleness as well.
    let (stale_priority, in_progress_cells) = self.task_execution_completed_finish(
        &mut ctx,
        task_id,
        #[cfg(feature = "verify_determinism")]
        no_output_set,
        new_output,
        is_now_immutable,
        is_session_dependent,
    );
    if let Some(stale_priority) = stale_priority {
        #[cfg(feature = "trace_task_details")]
        span.record("stale", "finish");
        return Some(stale_priority);
    }

    // Phase 6: cleanup.
    let removed_data = self.task_execution_completed_cleanup(
        &mut ctx,
        task_id,
        cell_counters,
        is_error,
        is_recomputation,
    );

    // Dropped explicitly, after all phases have run.
    drop(removed_data);
    drop(in_progress_cells);

    None
}
2075
2076 fn task_execution_completed_prepare(
2077 &self,
2078 ctx: &mut impl ExecuteContext<'_>,
2079 #[cfg(feature = "trace_task_details")] span: &Span,
2080 task_id: TaskId,
2081 result: Result<RawVc, TurboTasksExecutionError>,
2082 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2083 #[cfg(feature = "verify_determinism")] stateful: bool,
2084 has_invalidator: bool,
2085 ) -> Result<TaskExecutionCompletePrepareResult, TaskPriority> {
2086 let mut task = ctx.task(task_id, TaskDataCategory::All);
2087 let is_recomputation = task.is_dirty().is_none();
2088 let is_session_dependent = self.should_track_dependencies()
2091 && matches!(task.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent);
2092 let Some(in_progress) = task.get_in_progress_mut() else {
2093 panic!("Task execution completed, but task is not in progress: {task:#?}");
2094 };
2095 if matches!(in_progress, InProgressState::Canceled) {
2096 return Ok(TaskExecutionCompletePrepareResult {
2097 new_children: Default::default(),
2098 is_now_immutable: false,
2099 #[cfg(feature = "verify_determinism")]
2100 no_output_set: false,
2101 #[cfg(feature = "task_dirty_cause")]
2102 function_id: None,
2103 new_output: None,
2104 output_dependent_tasks: Default::default(),
2105 is_recomputation,
2106 is_session_dependent,
2107 });
2108 }
2109 let &mut InProgressState::InProgress(box InProgressStateInner {
2110 stale,
2111 ref mut new_children,
2112 once_task: is_once_task,
2113 ..
2114 }) = in_progress
2115 else {
2116 panic!("Task execution completed, but task is not in progress: {task:#?}");
2117 };
2118
2119 #[cfg(not(feature = "no_fast_stale"))]
2121 if stale && !is_once_task {
2122 let stale_priority = compute_stale_priority(&task);
2123 let Some(InProgressState::InProgress(box InProgressStateInner {
2124 done_event,
2125 mut new_children,
2126 ..
2127 })) = task.take_in_progress()
2128 else {
2129 unreachable!();
2130 };
2131 let old = task.set_in_progress(InProgressState::Scheduled {
2132 done_event,
2133 reason: TaskExecutionReason::Stale,
2134 });
2135 debug_assert!(old.is_none(), "InProgress already exists");
2136 for task in task.iter_children() {
2139 new_children.remove(&task);
2140 }
2141 drop(task);
2142
2143 AggregationUpdateQueue::run(
2146 AggregationUpdateJob::DecreaseActiveCounts {
2147 task_ids: new_children.into_iter().collect(),
2148 },
2149 ctx,
2150 );
2151 return Err(stale_priority);
2152 }
2153
2154 let mut new_children = take(new_children);
2156
2157 #[cfg(feature = "task_dirty_cause")]
2159 let function_id = match task.get_task_type() {
2160 TaskTypeRef::Cached(task_type) => {
2161 Some(turbo_tasks::registry::get_function_id(task_type.native_fn))
2162 }
2163 TaskTypeRef::Transient(_) => None,
2164 };
2165
2166 #[cfg(feature = "verify_determinism")]
2168 if stateful {
2169 task.set_stateful(true);
2170 }
2171
2172 if has_invalidator {
2174 task.set_invalidator(true);
2175 }
2176
2177 if result.is_ok() || is_recomputation {
2187 let old_counters: FxHashMap<_, _> = task
2188 .iter_cell_type_max_index()
2189 .map(|(&k, &v)| (k, v))
2190 .collect();
2191 let mut counters_to_remove = old_counters.clone();
2192
2193 for (&cell_type, &max_index) in cell_counters.iter() {
2194 if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2195 if old_max_index != max_index {
2196 task.insert_cell_type_max_index(cell_type, max_index);
2197 }
2198 } else {
2199 task.insert_cell_type_max_index(cell_type, max_index);
2200 }
2201 }
2202 for (cell_type, _) in counters_to_remove {
2203 task.remove_cell_type_max_index(&cell_type);
2204 }
2205 }
2206
2207 let mut queue = AggregationUpdateQueue::new();
2208
2209 let mut old_edges = Vec::new();
2210
2211 let has_children = !new_children.is_empty();
2212 let is_immutable = task.immutable();
2213 let task_dependencies_for_immutable =
2214 if !is_immutable
2216 && !is_session_dependent
2218 && !task.invalidator()
2220 && task.is_collectibles_dependencies_empty()
2222 {
2223 Some(
2224 task.iter_output_dependencies()
2226 .chain(task.iter_cell_dependencies().map(|dep| dep.cell_ref().task))
2227 .collect::<FxHashSet<_>>(),
2228 )
2229 } else {
2230 None
2231 };
2232
2233 if has_children {
2234 let _aggregation_number =
2236 prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2237
2238 #[cfg(feature = "trace_task_details")]
2239 span.record("aggregation_number", _aggregation_number);
2240
2241 old_edges.extend(
2243 task.iter_children()
2244 .filter(|task| !new_children.remove(task))
2245 .map(OutdatedEdge::Child),
2246 );
2247 } else {
2248 old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2249 }
2250
2251 old_edges.extend(
2252 task.iter_outdated_collectibles()
2253 .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2254 );
2255
2256 if self.should_track_dependencies() {
2257 old_edges.extend(
2264 task.iter_outdated_cell_dependencies()
2265 .map(OutdatedEdge::CellDependency),
2266 );
2267 old_edges.extend(
2268 task.iter_outdated_output_dependencies()
2269 .map(OutdatedEdge::OutputDependency),
2270 );
2271 }
2272
2273 let current_output = task.get_output();
2275 #[cfg(feature = "verify_determinism")]
2276 let no_output_set = current_output.is_none();
2277 let new_output = match result {
2278 Ok(RawVc::TaskOutput(output_task_id)) => {
2279 if let Some(OutputValue::Output(current_task_id)) = current_output
2280 && *current_task_id == output_task_id
2281 {
2282 None
2283 } else {
2284 Some(OutputValue::Output(output_task_id))
2285 }
2286 }
2287 Ok(RawVc::TaskCell(output_task_id, cell)) => {
2288 if let Some(OutputValue::Cell(CellRef {
2289 task: current_task_id,
2290 cell: current_cell,
2291 })) = current_output
2292 && *current_task_id == output_task_id
2293 && *current_cell == cell
2294 {
2295 None
2296 } else {
2297 Some(OutputValue::Cell(CellRef {
2298 task: output_task_id,
2299 cell,
2300 }))
2301 }
2302 }
2303 Ok(RawVc::LocalOutput(..)) => {
2304 panic!("Non-local tasks must not return a local Vc");
2305 }
2306 Err(err) => {
2307 if let Some(OutputValue::Error(old_error)) = current_output
2308 && **old_error == err
2309 {
2310 None
2311 } else {
2312 Some(OutputValue::Error(Arc::new((&err).into())))
2313 }
2314 }
2315 };
2316 let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2317 if new_output.is_some() && ctx.should_track_dependencies() {
2319 output_dependent_tasks = task.iter_output_dependent().collect();
2320 }
2321
2322 drop(task);
2323
2324 let mut is_now_immutable = false;
2326 if let Some(dependencies) = task_dependencies_for_immutable
2327 && dependencies
2328 .iter()
2329 .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2330 {
2331 is_now_immutable = true;
2332 }
2333 #[cfg(feature = "trace_task_details")]
2334 span.record("immutable", is_immutable || is_now_immutable);
2335
2336 if !queue.is_empty() || !old_edges.is_empty() {
2337 #[cfg(any(
2338 feature = "trace_task_completion",
2339 feature = "trace_aggregation_update_stats"
2340 ))]
2341 let _span =
2342 tracing::trace_span!("remove old edges and prepare new children", stats = Empty)
2343 .entered();
2344 #[cfg(feature = "trace_aggregation_update_stats")]
2348 {
2349 let stats = CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2350 _span.record("stats", tracing::field::debug(stats));
2351 }
2352 #[cfg(not(feature = "trace_aggregation_update_stats"))]
2353 CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2354 }
2355
2356 Ok(TaskExecutionCompletePrepareResult {
2357 new_children,
2358 is_now_immutable,
2359 #[cfg(feature = "verify_determinism")]
2360 no_output_set,
2361 #[cfg(feature = "task_dirty_cause")]
2362 function_id,
2363 new_output,
2364 output_dependent_tasks,
2365 is_recomputation,
2366 is_session_dependent,
2367 })
2368 }
2369
2370 fn task_execution_completed_invalidate_output_dependent(
2371 &self,
2372 ctx: &mut impl ExecuteContext<'_>,
2373 task_id: TaskId,
2374 #[cfg(feature = "task_dirty_cause")] function_id: Option<FunctionId>,
2375 output_dependent_tasks: SmallVec<[TaskId; 4]>,
2376 ) {
2377 debug_assert!(!output_dependent_tasks.is_empty());
2378
2379 #[cfg(feature = "task_dirty_cause")]
2380 let cause = match function_id {
2381 Some(function) => TaskDirtyCause::OutputChange { function },
2382 None => TaskDirtyCause::RootOutputChange,
2383 };
2384
2385 if output_dependent_tasks.len() > 1 {
2386 ctx.prepare_tasks(
2387 output_dependent_tasks
2388 .iter()
2389 .map(|&id| (id, TaskDataCategory::All)),
2390 "invalidate output dependents",
2391 );
2392 }
2393
2394 fn process_output_dependents(
2395 ctx: &mut impl ExecuteContext<'_>,
2396 task_id: TaskId,
2397 #[cfg(feature = "task_dirty_cause")] cause: &TaskDirtyCause,
2398 dependent_task_id: TaskId,
2399 queue: &mut AggregationUpdateQueue,
2400 ) {
2401 #[cfg(feature = "trace_task_output_dependencies")]
2402 let span = tracing::trace_span!(
2403 "invalidate output dependency",
2404 task = %task_id,
2405 dependent_task = %dependent_task_id,
2406 result = tracing::field::Empty,
2407 )
2408 .entered();
2409 let mut make_stale = true;
2410 let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2411 let transient_task_type = dependent.get_transient_task_type();
2412 if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
2413 #[cfg(feature = "trace_task_output_dependencies")]
2415 span.record("result", "once task");
2416 return;
2417 }
2418 if dependent.outdated_output_dependencies_contains(&task_id) {
2419 #[cfg(feature = "trace_task_output_dependencies")]
2420 span.record("result", "outdated dependency");
2421 make_stale = false;
2426 } else if !dependent.output_dependencies_contains(&task_id) {
2427 #[cfg(feature = "trace_task_output_dependencies")]
2430 span.record("result", "no backward dependency");
2431 return;
2432 }
2433 make_task_dirty_internal(
2434 dependent,
2435 dependent_task_id,
2436 make_stale,
2437 #[cfg(feature = "task_dirty_cause")]
2438 cause.clone(),
2439 queue,
2440 ctx,
2441 );
2442 #[cfg(feature = "trace_task_output_dependencies")]
2443 span.record("result", "marked dirty");
2444 }
2445
2446 if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
2447 let chunk_size = good_chunk_size(output_dependent_tasks.len());
2448 let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2449 let _ = scope_and_block(chunks.len(), |scope| {
2450 for chunk in chunks {
2451 let child_ctx = ctx.child_context();
2452 #[cfg(feature = "task_dirty_cause")]
2453 let cause = &cause;
2454 scope.spawn(move || {
2455 let mut ctx = child_ctx.create();
2456 let mut queue = AggregationUpdateQueue::new();
2457 for dependent_task_id in chunk {
2458 process_output_dependents(
2459 &mut ctx,
2460 task_id,
2461 #[cfg(feature = "task_dirty_cause")]
2462 cause,
2463 dependent_task_id,
2464 &mut queue,
2465 )
2466 }
2467 queue.execute(&mut ctx);
2468 });
2469 }
2470 });
2471 } else {
2472 let mut queue = AggregationUpdateQueue::new();
2473 for dependent_task_id in output_dependent_tasks {
2474 process_output_dependents(
2475 ctx,
2476 task_id,
2477 #[cfg(feature = "task_dirty_cause")]
2478 &cause,
2479 dependent_task_id,
2480 &mut queue,
2481 );
2482 }
2483 queue.execute(ctx);
2484 }
2485 }
2486
2487 fn task_execution_completed_unfinished_children_dirty(
2488 &self,
2489 ctx: &mut impl ExecuteContext<'_>,
2490 new_children: &FxHashSet<TaskId>,
2491 ) {
2492 debug_assert!(!new_children.is_empty());
2493
2494 let mut queue = AggregationUpdateQueue::new();
2495 ctx.for_each_task_all(
2496 new_children.iter().copied(),
2497 "unfinished children dirty",
2498 |child_task, ctx| {
2499 if !child_task.has_output() {
2500 let child_id = child_task.id();
2501 make_task_dirty_internal(
2502 child_task,
2503 child_id,
2504 false,
2505 #[cfg(feature = "task_dirty_cause")]
2506 TaskDirtyCause::InitialDirty,
2507 &mut queue,
2508 ctx,
2509 );
2510 }
2511 },
2512 );
2513
2514 queue.execute(ctx);
2515 }
2516
2517 fn task_execution_completed_connect(
2518 &self,
2519 ctx: &mut impl ExecuteContext<'_>,
2520 task_id: TaskId,
2521 new_children: FxHashSet<TaskId>,
2522 ) -> Option<TaskPriority> {
2523 debug_assert!(!new_children.is_empty());
2524
2525 let mut task = ctx.task(task_id, TaskDataCategory::All);
2526 let Some(in_progress) = task.get_in_progress() else {
2527 panic!("Task execution completed, but task is not in progress: {task:#?}");
2528 };
2529 if matches!(in_progress, InProgressState::Canceled) {
2530 return None;
2532 }
2533 let InProgressState::InProgress(box InProgressStateInner {
2534 #[cfg(not(feature = "no_fast_stale"))]
2535 stale,
2536 once_task: is_once_task,
2537 ..
2538 }) = in_progress
2539 else {
2540 panic!("Task execution completed, but task is not in progress: {task:#?}");
2541 };
2542
2543 #[cfg(not(feature = "no_fast_stale"))]
2545 if *stale && !is_once_task {
2546 let stale_priority = compute_stale_priority(&task);
2547 let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2548 task.take_in_progress()
2549 else {
2550 unreachable!();
2551 };
2552 let old = task.set_in_progress(InProgressState::Scheduled {
2553 done_event,
2554 reason: TaskExecutionReason::Stale,
2555 });
2556 debug_assert!(old.is_none(), "InProgress already exists");
2557 drop(task);
2558
2559 AggregationUpdateQueue::run(
2562 AggregationUpdateJob::DecreaseActiveCounts {
2563 task_ids: new_children.into_iter().collect(),
2564 },
2565 ctx,
2566 );
2567 return Some(stale_priority);
2568 }
2569
2570 let has_active_count = ctx.should_track_activeness()
2571 && task
2572 .get_activeness()
2573 .is_some_and(|activeness| activeness.active_counter > 0);
2574 connect_children(
2575 ctx,
2576 task_id,
2577 task,
2578 new_children,
2579 has_active_count,
2580 ctx.should_track_activeness(),
2581 );
2582
2583 None
2584 }
2585
2586 #[allow(clippy::type_complexity)]
2587 fn task_execution_completed_finish(
2588 &self,
2589 ctx: &mut impl ExecuteContext<'_>,
2590 task_id: TaskId,
2591 #[cfg(feature = "verify_determinism")] no_output_set: bool,
2592 new_output: Option<OutputValue>,
2593 is_now_immutable: bool,
2594 is_session_dependent: bool,
2595 ) -> (
2596 Option<TaskPriority>,
2597 Option<
2598 auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2599 >,
2600 ) {
2601 let mut task = ctx.task(task_id, TaskDataCategory::All);
2602 let Some(in_progress) = task.take_in_progress() else {
2603 panic!("Task execution completed, but task is not in progress: {task:#?}");
2604 };
2605 if matches!(in_progress, InProgressState::Canceled) {
2606 return (None, None);
2608 }
2609 let InProgressState::InProgress(box InProgressStateInner {
2610 done_event,
2611 once_task: is_once_task,
2612 stale,
2613 marked_as_completed: _,
2614 new_children,
2615 }) = in_progress
2616 else {
2617 panic!("Task execution completed, but task is not in progress: {task:#?}");
2618 };
2619 debug_assert!(new_children.is_empty());
2620
2621 if stale && !is_once_task {
2623 let stale_priority = compute_stale_priority(&task);
2624 let old = task.set_in_progress(InProgressState::Scheduled {
2625 done_event,
2626 reason: TaskExecutionReason::Stale,
2627 });
2628 debug_assert!(old.is_none(), "InProgress already exists");
2629 return (Some(stale_priority), None);
2630 }
2631
2632 let mut old_content = None;
2634 if let Some(value) = new_output {
2635 old_content = task.set_output(value);
2636 }
2637
2638 if is_now_immutable {
2641 task.set_immutable(true);
2642 }
2643
2644 let in_progress_cells = task.take_in_progress_cells();
2646 if let Some(ref cells) = in_progress_cells {
2647 for state in cells.values() {
2648 state.event.notify(usize::MAX);
2649 }
2650 }
2651
2652 let new_dirtyness = if is_session_dependent {
2654 Some(Dirtyness::SessionDependent)
2655 } else {
2656 None
2657 };
2658 #[cfg(feature = "verify_determinism")]
2659 let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
2660 let data_update = task.update_dirty_state(new_dirtyness);
2661
2662 #[cfg(feature = "verify_determinism")]
2666 let stale_priority: Option<TaskPriority> =
2667 ((dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task)
2668 .then(TaskPriority::leaf);
2669 #[cfg(not(feature = "verify_determinism"))]
2670 let stale_priority: Option<TaskPriority> = None;
2671 if stale_priority.is_some() {
2672 let old = task.set_in_progress(InProgressState::Scheduled {
2673 done_event,
2674 reason: TaskExecutionReason::Stale,
2675 });
2676 debug_assert!(old.is_none(), "InProgress already exists");
2677 drop(task);
2678 } else {
2679 drop(task);
2680
2681 done_event.notify(usize::MAX);
2683 }
2684
2685 drop(old_content);
2686
2687 if let Some(data_update) = data_update {
2688 AggregationUpdateQueue::run(data_update, ctx);
2689 }
2690
2691 (stale_priority, in_progress_cells)
2693 }
2694
2695 fn task_execution_completed_cleanup(
2696 &self,
2697 ctx: &mut impl ExecuteContext<'_>,
2698 task_id: TaskId,
2699 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2700 is_error: bool,
2701 is_recomputation: bool,
2702 ) -> Vec<SharedReference> {
2703 let mut task = ctx.task(task_id, TaskDataCategory::All);
2704 let mut removed_cell_data = Vec::new();
2705 if !is_error || is_recomputation {
2711 let to_remove: Vec<_> = task
2717 .iter_cell_data()
2718 .filter_map(|(cell, _)| {
2719 cell_counters
2720 .get(&cell.type_id)
2721 .is_none_or(|start_index| cell.index >= *start_index)
2722 .then_some(*cell)
2723 })
2724 .collect();
2725 removed_cell_data.reserve_exact(to_remove.len());
2726 for cell in to_remove {
2727 if let Some(data) = task.remove_cell_data(&cell) {
2728 removed_cell_data.push(data);
2729 }
2730 }
2731 let to_remove_hash: Vec<_> = task
2733 .iter_cell_data_hash()
2734 .filter_map(|(cell, _)| {
2735 cell_counters
2736 .get(&cell.type_id)
2737 .is_none_or(|start_index| cell.index >= *start_index)
2738 .then_some(*cell)
2739 })
2740 .collect();
2741 for cell in to_remove_hash {
2742 task.remove_cell_data_hash(&cell);
2743 }
2744 }
2745
2746 task.cleanup_after_execution();
2750
2751 drop(task);
2752
2753 removed_cell_data
2755 }
2756
2757 fn log_unrecoverable_persist_error() {
2760 eprintln!(
2761 "Persisting is disabled for this session due to an unrecoverable error. Stopping the \
2762 background persisting process."
2763 );
2764 }
2765
2766 fn run_backend_job<'a>(
2767 &'a self,
2768 job: TurboTasksBackendJob,
2769 turbo_tasks: &'a TurboTasks<TurboTasksBackend>,
2770 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2771 Box::pin(async move {
2772 match job {
2773 TurboTasksBackendJob::Snapshot => {
2774 debug_assert!(self.should_persist());
2775
2776 let mut last_snapshot = self.start_time;
2777 let mut idle_start_listener = self.idle_start_event.listen();
2778 let mut idle_end_listener = self.idle_end_event.listen();
2779 let mut fresh_idle = true;
2782 let mut evicted = false;
2783 let mut is_first = true;
2784 'outer: loop {
2785 const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2786 const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2787 let idle_timeout = *IDLE_TIMEOUT;
2788 let (time, mut reason) = if is_first {
2789 (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
2790 } else {
2791 (SNAPSHOT_INTERVAL, "regular snapshot interval")
2792 };
2793
2794 let until = last_snapshot + time;
2795 if until > Instant::now() {
2796 let mut stop_listener = self.stopping_event.listen();
2797 if self.stopping.load(Ordering::Acquire) {
2798 return;
2799 }
2800 let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2801 Instant::now() + idle_timeout
2802 } else {
2803 far_future()
2804 };
2805 loop {
2806 tokio::select! {
2807 _ = &mut stop_listener => {
2808 return;
2809 },
2810 _ = &mut idle_start_listener => {
2811 idle_time = Instant::now() + idle_timeout;
2812 idle_start_listener = self.idle_start_event.listen()
2813 },
2814 _ = &mut idle_end_listener => {
2815 idle_time = far_future();
2816 idle_end_listener = self.idle_end_event.listen()
2817 },
2818 _ = tokio::time::sleep_until(until) => {
2819 break;
2820 },
2821 _ = tokio::time::sleep_until(idle_time) => {
2822 if turbo_tasks.is_idle() {
2823 reason = "idle timeout";
2824 break;
2825 }
2826 },
2827 }
2828 }
2829 }
2830
2831 let background_span =
2835 tracing::info_span!(parent: None, "background snapshot");
2836 match self.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
2837 Err(err) => {
2838 eprintln!("Persisting failed: {err:?}");
2841 Self::log_unrecoverable_persist_error();
2842 return;
2843 }
2844 Ok((snapshot_start, new_data)) => {
2845 fresh_idle = new_data;
2847 is_first = false;
2848 last_snapshot = snapshot_start;
2849
2850 macro_rules! check_idle_ended {
2854 () => {{
2855 tokio::select! {
2856 biased;
2857 _ = &mut idle_end_listener => {
2858 idle_end_listener = self.idle_end_event.listen();
2859 true
2860 },
2861 _ = std::future::ready(()) => false,
2862 }
2863 }};
2864 }
2865 let mut ran_eviction = false;
2883 if self.should_evict() && (new_data || !evicted) {
2884 if check_idle_ended!() {
2885 continue 'outer;
2888 }
2889 evicted = true;
2890 ran_eviction = true;
2891 self.storage.evict_after_snapshot(background_span.id());
2892 }
2893
2894 let mut ran_compaction = false;
2901 const MAX_IDLE_COMPACTION_PASSES: usize = 10;
2902 for _ in 0..MAX_IDLE_COMPACTION_PASSES {
2903 if check_idle_ended!() {
2904 continue 'outer;
2905 }
2906 let _compact_span = tracing::info_span!(
2910 parent: background_span.id(),
2911 "compact database"
2912 )
2913 .entered();
2914 match self.backing_storage.compact() {
2915 Ok(true) => {
2916 ran_compaction = true;
2917 }
2918 Ok(false) => break,
2919 Err(err) => {
2920 eprintln!("Compaction failed: {err:?}");
2921 if self.backing_storage.has_unrecoverable_write_error()
2922 {
2923 Self::log_unrecoverable_persist_error();
2924 return;
2925 }
2926 break;
2927 }
2928 }
2929 }
2930 if check_idle_ended!() {
2931 continue 'outer;
2932 }
2933 if new_data || ran_compaction || ran_eviction {
2938 turbo_tasks_malloc::TurboMalloc::collect(true);
2939 }
2940 }
2941 }
2942 }
2943 }
2944 }
2945 })
2946 }
2947
2948 fn try_read_own_task_cell(
2949 &self,
2950 task_id: TaskId,
2951 cell: CellId,
2952 turbo_tasks: &TurboTasks<TurboTasksBackend>,
2953 ) -> Result<TypedCellContent> {
2954 let mut ctx = self.execute_context(turbo_tasks);
2955 let task = ctx.task(task_id, TaskDataCategory::Data);
2956 if let Some(content) = task.get_cell_data(&cell).cloned() {
2957 Ok(CellContent(Some(content)).into_typed(cell.type_id))
2958 } else {
2959 Ok(CellContent(None).into_typed(cell.type_id))
2960 }
2961 }
2962
2963 fn read_task_collectibles(
2964 &self,
2965 task_id: TaskId,
2966 collectible_type: TraitTypeId,
2967 reader_id: Option<TaskId>,
2968 turbo_tasks: &TurboTasks<TurboTasksBackend>,
2969 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
2970 let mut ctx = self.execute_context(turbo_tasks);
2971 let mut collectibles = AutoMap::default();
2972 {
2973 let mut task = ctx.task(task_id, TaskDataCategory::All);
2974 if task
2975 .get_persistent_task_type()
2976 .is_some_and(|t| !t.native_fn.is_root)
2977 {
2978 drop(task);
2979 panic!(
2980 "Reading collectibles of non-root task {} (reader: {}). The `root` attribute \
2981 is missing on the task.",
2982 self.debug_get_task_description(task_id),
2983 reader_id.map_or_else(
2984 || "unknown".to_string(),
2985 |r| self.debug_get_task_description(r)
2986 )
2987 );
2988 }
2989 for (collectible, count) in task.iter_aggregated_collectibles() {
2990 if *count > 0 && collectible.collectible_type == collectible_type {
2991 *collectibles
2992 .entry(RawVc::TaskCell(
2993 collectible.cell.task,
2994 collectible.cell.cell,
2995 ))
2996 .or_insert(0) += 1;
2997 }
2998 }
2999 for (&collectible, &count) in task.iter_collectibles() {
3000 if collectible.collectible_type == collectible_type {
3001 *collectibles
3002 .entry(RawVc::TaskCell(
3003 collectible.cell.task,
3004 collectible.cell.cell,
3005 ))
3006 .or_insert(0) += count;
3007 }
3008 }
3009 if let Some(reader_id) = reader_id {
3010 let _ = task.add_collectibles_dependents((collectible_type, reader_id));
3011 }
3012 }
3013 if let Some(reader_id) = reader_id {
3014 let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
3015 let target = CollectiblesRef {
3016 task: task_id,
3017 collectible_type,
3018 };
3019 if !reader.remove_outdated_collectibles_dependencies(&target) {
3020 let _ = reader.add_collectibles_dependencies(target);
3021 }
3022 }
3023 collectibles
3024 }
3025
3026 fn emit_collectible(
3027 &self,
3028 collectible_type: TraitTypeId,
3029 collectible: RawVc,
3030 task_id: TaskId,
3031 turbo_tasks: &TurboTasks<TurboTasksBackend>,
3032 ) {
3033 self.assert_valid_collectible(task_id, collectible);
3034
3035 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3036 panic!("Collectibles need to be resolved");
3037 };
3038 let cell = CellRef {
3039 task: collectible_task,
3040 cell,
3041 };
3042 operation::UpdateCollectibleOperation::run(
3043 task_id,
3044 CollectibleRef {
3045 collectible_type,
3046 cell,
3047 },
3048 1,
3049 self.execute_context(turbo_tasks),
3050 );
3051 }
3052
3053 fn unemit_collectible(
3054 &self,
3055 collectible_type: TraitTypeId,
3056 collectible: RawVc,
3057 count: u32,
3058 task_id: TaskId,
3059 turbo_tasks: &TurboTasks<TurboTasksBackend>,
3060 ) {
3061 self.assert_valid_collectible(task_id, collectible);
3062
3063 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3064 panic!("Collectibles need to be resolved");
3065 };
3066 let cell = CellRef {
3067 task: collectible_task,
3068 cell,
3069 };
3070 operation::UpdateCollectibleOperation::run(
3071 task_id,
3072 CollectibleRef {
3073 collectible_type,
3074 cell,
3075 },
3076 -(i32::try_from(count).unwrap()),
3077 self.execute_context(turbo_tasks),
3078 );
3079 }
3080
3081 fn update_task_cell(
3082 &self,
3083 task_id: TaskId,
3084 cell: CellId,
3085 content: CellContent,
3086 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3087 content_hash: Option<CellHash>,
3088 verification_mode: VerificationMode,
3089 turbo_tasks: &TurboTasks<TurboTasksBackend>,
3090 ) {
3091 operation::UpdateCellOperation::run(
3092 task_id,
3093 cell,
3094 content,
3095 updated_key_hashes,
3096 content_hash,
3097 verification_mode,
3098 self.execute_context(turbo_tasks),
3099 );
3100 }
3101
3102 fn mark_own_task_as_finished(&self, task: TaskId, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
3103 let mut ctx = self.execute_context(turbo_tasks);
3104 let mut task = ctx.task(task, TaskDataCategory::Data);
3105 if let Some(InProgressState::InProgress(box InProgressStateInner {
3106 marked_as_completed,
3107 ..
3108 })) = task.get_in_progress_mut()
3109 {
3110 *marked_as_completed = true;
3111 }
3116 }
3117
3118 fn connect_task(
3119 &self,
3120 task: TaskId,
3121 parent_task: Option<TaskId>,
3122 turbo_tasks: &TurboTasks<TurboTasksBackend>,
3123 ) {
3124 self.assert_not_persistent_calling_transient(parent_task, task, None);
3125 ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
3126 }
3127
3128 fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3129 let task_id = self.transient_task_id_factory.get();
3130 {
3131 let mut task = self.storage.access_mut(task_id);
3132 task.init_transient_task(task_id, task_type, self.should_track_activeness());
3133 }
3134 #[cfg(feature = "verify_aggregation_graph")]
3135 self.root_tasks.lock().insert(task_id);
3136 task_id
3137 }
3138
3139 fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
3140 #[cfg(feature = "verify_aggregation_graph")]
3141 self.root_tasks.lock().remove(&task_id);
3142
3143 let mut ctx = self.execute_context(turbo_tasks);
3144 let mut task = ctx.task(task_id, TaskDataCategory::All);
3145 let is_dirty = task.is_dirty();
3146 let has_dirty_containers = task.has_dirty_containers();
3147 if is_dirty.is_some() || has_dirty_containers {
3148 if let Some(activeness_state) = task.get_activeness_mut() {
3149 activeness_state.unset_root_type();
3151 activeness_state.set_active_until_clean();
3152 };
3153 } else if let Some(activeness_state) = task.take_activeness() {
3154 activeness_state.all_clean_event.notify(usize::MAX);
3157 }
3158 }
3159
3160 #[cfg(feature = "verify_aggregation_graph")]
3161 fn verify_aggregation_graph(&self, turbo_tasks: &TurboTasks<TurboTasksBackend>, idle: bool) {
3162 if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3163 return;
3164 }
3165 use std::{collections::VecDeque, env, io::stdout};
3166
3167 use crate::backend::operation::{get_uppers, is_aggregating_node};
3168
3169 let mut ctx = self.execute_context(turbo_tasks);
3170 let root_tasks = self.root_tasks.lock().clone();
3171
3172 for task_id in root_tasks.into_iter() {
3173 let mut queue = VecDeque::new();
3174 let mut visited = FxHashSet::default();
3175 let mut aggregated_nodes = FxHashSet::default();
3176 let mut collectibles = FxHashMap::default();
3177 let root_task_id = task_id;
3178 visited.insert(task_id);
3179 aggregated_nodes.insert(task_id);
3180 queue.push_back(task_id);
3181 let mut counter = 0;
3182 while let Some(task_id) = queue.pop_front() {
3183 counter += 1;
3184 if counter % 100000 == 0 {
3185 println!(
3186 "queue={}, visited={}, aggregated_nodes={}",
3187 queue.len(),
3188 visited.len(),
3189 aggregated_nodes.len()
3190 );
3191 }
3192 let task = ctx.task(task_id, TaskDataCategory::All);
3193 if idle && !self.is_idle.load(Ordering::Relaxed) {
3194 return;
3195 }
3196
3197 let uppers = get_uppers(&task);
3198 if task_id != root_task_id
3199 && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3200 {
3201 panic!(
3202 "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3203 {:?})",
3204 task_id,
3205 task.get_task_description(),
3206 uppers
3207 );
3208 }
3209
3210 for (collectible, _) in task.iter_aggregated_collectibles() {
3211 collectibles
3212 .entry(*collectible)
3213 .or_insert_with(|| (false, Vec::new()))
3214 .1
3215 .push(task_id);
3216 }
3217
3218 for (&collectible, &value) in task.iter_collectibles() {
3219 if value > 0 {
3220 if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3221 *flag = true
3222 } else {
3223 panic!(
3224 "Task {} has a collectible {:?} that is not in any upper task",
3225 task_id, collectible
3226 );
3227 }
3228 }
3229 }
3230
3231 let is_dirty = task.has_dirty();
3232 let has_dirty_container = task.has_dirty_containers();
3233 let should_be_in_upper = is_dirty || has_dirty_container;
3234
3235 let aggregation_number = get_aggregation_number(&task);
3236 if is_aggregating_node(aggregation_number) {
3237 aggregated_nodes.insert(task_id);
3238 }
3239 for child_id in task.iter_children() {
3246 if visited.insert(child_id) {
3248 queue.push_back(child_id);
3249 }
3250 }
3251 drop(task);
3252
3253 if should_be_in_upper {
3254 for upper_id in uppers {
3255 let upper = ctx.task(upper_id, TaskDataCategory::All);
3256 let in_upper = upper
3257 .get_aggregated_dirty_containers(&task_id)
3258 .is_some_and(|&dirty| dirty > 0);
3259 if !in_upper {
3260 let containers: Vec<_> = upper
3261 .iter_aggregated_dirty_containers()
3262 .map(|(&k, &v)| (k, v))
3263 .collect();
3264 let upper_task_desc = upper.get_task_description();
3265 drop(upper);
3266 panic!(
3267 "Task {} ({}) is dirty, but is not listed in the upper task {} \
3268 ({})\nThese dirty containers are present:\n{:#?}",
3269 task_id,
3270 ctx.task(task_id, TaskDataCategory::Data)
3271 .get_task_description(),
3272 upper_id,
3273 upper_task_desc,
3274 containers,
3275 );
3276 }
3277 }
3278 }
3279 }
3280
3281 for (collectible, (flag, task_ids)) in collectibles {
3282 if !flag {
3283 use std::io::Write;
3284 let mut stdout = stdout().lock();
3285 writeln!(
3286 stdout,
3287 "{:?} that is not emitted in any child task but in these aggregated \
3288 tasks: {:#?}",
3289 collectible,
3290 task_ids
3291 .iter()
3292 .map(|t| format!(
3293 "{t} {}",
3294 ctx.task(*t, TaskDataCategory::Data).get_task_description()
3295 ))
3296 .collect::<Vec<_>>()
3297 )
3298 .unwrap();
3299
3300 let task_id = collectible.cell.task;
3301 let mut queue = {
3302 let task = ctx.task(task_id, TaskDataCategory::All);
3303 get_uppers(&task)
3304 };
3305 let mut visited = FxHashSet::default();
3306 for &upper_id in queue.iter() {
3307 visited.insert(upper_id);
3308 writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3309 }
3310 while let Some(task_id) = queue.pop() {
3311 let task = ctx.task(task_id, TaskDataCategory::All);
3312 let desc = task.get_task_description();
3313 let aggregated_collectible = task
3314 .get_aggregated_collectibles(&collectible)
3315 .copied()
3316 .unwrap_or_default();
3317 let uppers = get_uppers(&task);
3318 drop(task);
3319 writeln!(
3320 stdout,
3321 "upper {task_id} {desc} collectible={aggregated_collectible}"
3322 )
3323 .unwrap();
3324 if task_ids.contains(&task_id) {
3325 writeln!(
3326 stdout,
3327 "Task has an upper connection to an aggregated task that doesn't \
3328 reference it. Upper connection is invalid!"
3329 )
3330 .unwrap();
3331 }
3332 for upper_id in uppers {
3333 writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3334 if !visited.contains(&upper_id) {
3335 queue.push(upper_id);
3336 }
3337 }
3338 }
3339 panic!("See stdout for more details");
3340 }
3341 }
3342 }
3343 }
3344
3345 fn assert_not_persistent_calling_transient(
3346 &self,
3347 parent_id: Option<TaskId>,
3348 child_id: TaskId,
3349 cell_id: Option<CellId>,
3350 ) {
3351 if let Some(parent_id) = parent_id
3352 && !parent_id.is_transient()
3353 && child_id.is_transient()
3354 {
3355 self.panic_persistent_calling_transient(
3356 self.debug_get_task_description(parent_id),
3357 self.debug_get_cached_task_type(child_id).as_deref(),
3358 cell_id,
3359 );
3360 }
3361 }
3362
3363 fn panic_persistent_calling_transient(
3364 &self,
3365 parent: String,
3366 child: Option<&CachedTaskType>,
3367 cell_id: Option<CellId>,
3368 ) -> ! {
3369 let transient_reason = if let Some(child) = child {
3370 Cow::Owned(format!(
3371 " The callee is transient because it depends on:\n{}",
3372 self.debug_trace_transient_task(child, cell_id),
3373 ))
3374 } else {
3375 Cow::Borrowed("")
3376 };
3377 panic!(
3378 "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3379 parent,
3380 child.map_or("unknown", |t| t.get_name()),
3381 transient_reason,
3382 );
3383 }
3384
3385 fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3386 let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3388 let task_info = if let Some(col_task_ty) = collectible
3390 .try_get_task_id()
3391 .map(|t| self.debug_get_task_description(t))
3392 {
3393 Cow::Owned(format!(" (return type of {col_task_ty})"))
3394 } else {
3395 Cow::Borrowed("")
3396 };
3397 panic!("Collectible{task_info} must be a ResolvedVc")
3398 };
3399 if col_task_id.is_transient() && !task_id.is_transient() {
3400 let transient_reason =
3401 if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3402 Cow::Owned(format!(
3403 ". The collectible is transient because it depends on:\n{}",
3404 self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3405 ))
3406 } else {
3407 Cow::Borrowed("")
3408 };
3409 panic!(
3411 "Collectible is transient, transient collectibles cannot be emitted from \
3412 persistent tasks{transient_reason}",
3413 )
3414 }
3415 }
3416}
3417
3418impl Backend for TurboTasksBackend {
3419 fn startup(&self, turbo_tasks: &TurboTasks<Self>) {
3420 self.startup(turbo_tasks);
3421 }
3422
3423 fn stopping(&self, _turbo_tasks: &TurboTasks<Self>) {
3424 self.stopping();
3425 }
3426
3427 fn stop(&self, turbo_tasks: &TurboTasks<Self>) {
3428 self.stop(turbo_tasks);
3429 }
3430
3431 fn idle_start(&self, turbo_tasks: &TurboTasks<Self>) {
3432 self.idle_start(turbo_tasks);
3433 }
3434
3435 fn idle_end(&self, _turbo_tasks: &TurboTasks<Self>) {
3436 self.idle_end();
3437 }
3438
3439 fn get_or_create_task(
3440 &self,
3441 native_fn: &'static NativeFunction,
3442 this: Option<RawVc>,
3443 arg: &mut dyn DynTaskInputsStorage,
3444 parent_task: Option<TaskId>,
3445 persistence: TaskPersistence,
3446 turbo_tasks: &TurboTasks<Self>,
3447 ) -> TaskId {
3448 self.get_or_create_task(native_fn, this, arg, parent_task, persistence, turbo_tasks)
3449 }
3450
3451 fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3452 self.invalidate_task(task_id, turbo_tasks);
3453 }
3454
3455 fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &TurboTasks<Self>) {
3456 self.invalidate_tasks(tasks, turbo_tasks);
3457 }
3458
3459 fn invalidate_tasks_set(
3460 &self,
3461 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3462 turbo_tasks: &TurboTasks<Self>,
3463 ) {
3464 self.invalidate_tasks_set(tasks, turbo_tasks);
3465 }
3466
3467 fn invalidate_serialization(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3468 self.invalidate_serialization(task_id, turbo_tasks);
3469 }
3470
3471 fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &TurboTasks<Self>) {
3472 self.task_execution_canceled(task, turbo_tasks)
3473 }
3474
3475 fn try_start_task_execution(
3476 &self,
3477 task_id: TaskId,
3478 priority: TaskPriority,
3479 turbo_tasks: &TurboTasks<Self>,
3480 ) -> Option<TaskExecutionSpec<'_>> {
3481 self.try_start_task_execution(task_id, priority, turbo_tasks)
3482 }
3483
3484 fn task_execution_completed(
3485 &self,
3486 task_id: TaskId,
3487 result: Result<RawVc, TurboTasksExecutionError>,
3488 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3489 #[cfg(feature = "verify_determinism")] stateful: bool,
3490 has_invalidator: bool,
3491 turbo_tasks: &TurboTasks<Self>,
3492 ) -> Option<TaskPriority> {
3493 self.task_execution_completed(
3494 task_id,
3495 result,
3496 cell_counters,
3497 #[cfg(feature = "verify_determinism")]
3498 stateful,
3499 has_invalidator,
3500 turbo_tasks,
3501 )
3502 }
3503
3504 type BackendJob = TurboTasksBackendJob;
3505
3506 fn run_backend_job<'a>(
3507 &'a self,
3508 job: Self::BackendJob,
3509 turbo_tasks: &'a TurboTasks<Self>,
3510 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3511 self.run_backend_job(job, turbo_tasks)
3512 }
3513
3514 fn try_read_task_output(
3515 &self,
3516 task_id: TaskId,
3517 reader: Option<TaskId>,
3518 options: ReadOutputOptions,
3519 turbo_tasks: &TurboTasks<Self>,
3520 ) -> Result<Result<RawVc, EventListener>> {
3521 self.try_read_task_output(task_id, reader, options, turbo_tasks)
3522 }
3523
3524 fn try_read_task_cell(
3525 &self,
3526 task_id: TaskId,
3527 cell: CellId,
3528 reader: Option<TaskId>,
3529 options: ReadCellOptions,
3530 turbo_tasks: &TurboTasks<Self>,
3531 ) -> Result<Result<TypedCellContent, EventListener>> {
3532 self.try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3533 }
3534
3535 fn try_read_own_task_cell(
3536 &self,
3537 task_id: TaskId,
3538 cell: CellId,
3539 turbo_tasks: &TurboTasks<Self>,
3540 ) -> Result<TypedCellContent> {
3541 self.try_read_own_task_cell(task_id, cell, turbo_tasks)
3542 }
3543
3544 fn read_task_collectibles(
3545 &self,
3546 task_id: TaskId,
3547 collectible_type: TraitTypeId,
3548 reader: Option<TaskId>,
3549 turbo_tasks: &TurboTasks<Self>,
3550 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3551 self.read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3552 }
3553
3554 fn emit_collectible(
3555 &self,
3556 collectible_type: TraitTypeId,
3557 collectible: RawVc,
3558 task_id: TaskId,
3559 turbo_tasks: &TurboTasks<Self>,
3560 ) {
3561 self.emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3562 }
3563
3564 fn unemit_collectible(
3565 &self,
3566 collectible_type: TraitTypeId,
3567 collectible: RawVc,
3568 count: u32,
3569 task_id: TaskId,
3570 turbo_tasks: &TurboTasks<Self>,
3571 ) {
3572 self.unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3573 }
3574
    /// Updates `cell` of `task_id` with `content`, forwarding all arguments — including the
    /// optional `updated_key_hashes` and `content_hash` and the `verification_mode` — unchanged
    /// and in the same order to the same-named method on `self` (presumably the inherent
    /// implementation, which is outside this excerpt).
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
        turbo_tasks: &TurboTasks<Self>,
    ) {
        self.update_task_cell(
            task_id,
            cell,
            content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            turbo_tasks,
        );
    }
3595
    /// Marks `task_id` as finished by forwarding to the same-named method on `self`
    /// (presumably the inherent implementation, which is outside this excerpt).
    fn mark_own_task_as_finished(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
        self.mark_own_task_as_finished(task_id, turbo_tasks);
    }
3599
    /// Connects `task` to the optional `parent_task` by forwarding to the same-named method on
    /// `self` (presumably the inherent implementation, which is outside this excerpt).
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &TurboTasks<Self>,
    ) {
        self.connect_task(task, parent_task, turbo_tasks);
    }
3608
    /// Creates a transient task of the given `task_type` and returns its `TaskId`.
    ///
    /// `_turbo_tasks` is unused: the forwarded `create_transient_task` call only takes
    /// `task_type`.
    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &TurboTasks<Self>,
    ) -> TaskId {
        self.create_transient_task(task_type)
    }
3616
    /// Disposes the root task `task_id` by forwarding to the same-named method on `self`
    /// (presumably the inherent implementation, which is outside this excerpt).
    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
        self.dispose_root_task(task_id, turbo_tasks);
    }
3620
    /// Returns a shared reference to this backend's `task_statistics` field.
    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.task_statistics
    }
3624
    /// Reports the value of the `dependency_tracking` flag stored in `self.options`.
    fn is_tracking_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }
3628
    /// Returns a name for `task` as an owned `String`, forwarding to the same-named method on
    /// `self` (presumably the inherent implementation, which is outside this excerpt).
    fn get_task_name(&self, task: TaskId, turbo_tasks: &TurboTasks<Self>) -> String {
        self.get_task_name(task, turbo_tasks)
    }
3632}
3633
/// A tree of transient tasks that is rendered as an indented, line-per-task trace by
/// `fmt_indented` (and therefore by the `Display` impl).
enum DebugTraceTransientTask {
    /// A named task whose causes are carried along and printed beneath it.
    Cached {
        /// Name printed on this task's line.
        task_name: &'static str,
        /// When set, the line gets a " (read cell of type …)" suffix naming this value type.
        cell_type_id: Option<ValueTypeId>,
        /// Optional cause printed under a "self:" heading.
        cause_self: Option<Box<DebugTraceTransientTask>>,
        /// Causes printed under an "args:" heading; the heading is skipped when this is empty.
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// A named task without any causes attached; its line is suffixed with " (collapsed)".
    Collapsed {
        /// Name printed on this task's line.
        task_name: &'static str,
        /// When set, the line gets a " (read cell of type …)" suffix naming this value type.
        cell_type_id: Option<ValueTypeId>,
    },
    /// A task without a name; printed as "unknown transient task".
    Uncached {
        /// When set, the line gets a " (read cell of type …)" suffix naming this value type.
        cell_type_id: Option<ValueTypeId>,
    },
}
3650
3651impl DebugTraceTransientTask {
3652 fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3653 let indent = " ".repeat(level);
3654 f.write_str(&indent)?;
3655
3656 fn fmt_cell_type_id(
3657 f: &mut fmt::Formatter<'_>,
3658 cell_type_id: Option<ValueTypeId>,
3659 ) -> fmt::Result {
3660 if let Some(ty) = cell_type_id {
3661 write!(
3662 f,
3663 " (read cell of type {})",
3664 get_value_type(ty).ty.global_name
3665 )
3666 } else {
3667 Ok(())
3668 }
3669 }
3670
3671 match self {
3673 Self::Cached {
3674 task_name,
3675 cell_type_id,
3676 ..
3677 }
3678 | Self::Collapsed {
3679 task_name,
3680 cell_type_id,
3681 ..
3682 } => {
3683 f.write_str(task_name)?;
3684 fmt_cell_type_id(f, *cell_type_id)?;
3685 if matches!(self, Self::Collapsed { .. }) {
3686 f.write_str(" (collapsed)")?;
3687 }
3688 }
3689 Self::Uncached { cell_type_id } => {
3690 f.write_str("unknown transient task")?;
3691 fmt_cell_type_id(f, *cell_type_id)?;
3692 }
3693 }
3694 f.write_char('\n')?;
3695
3696 if let Self::Cached {
3698 cause_self,
3699 cause_args,
3700 ..
3701 } = self
3702 {
3703 if let Some(c) = cause_self {
3704 writeln!(f, "{indent} self:")?;
3705 c.fmt_indented(f, level + 1)?;
3706 }
3707 if !cause_args.is_empty() {
3708 writeln!(f, "{indent} args:")?;
3709 for c in cause_args {
3710 c.fmt_indented(f, level + 1)?;
3711 }
3712 }
3713 }
3714 Ok(())
3715 }
3716}
3717
impl fmt::Display for DebugTraceTransientTask {
    /// Renders the whole trace tree by calling `fmt_indented` with indentation level 0.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.fmt_indented(f, 0)
    }
}
3723
/// Returns an `Instant` 30 × 365 days (roughly thirty years) after the current moment.
///
/// The addition uses `Instant + Duration`, so it panics if the platform's clock cannot
/// represent an instant that far ahead.
fn far_future() -> Instant {
    const SECONDS_PER_DAY: u64 = 86400;
    const DAYS_PER_YEAR: u64 = 365;
    const YEARS_AHEAD: u64 = 30;

    let horizon = Duration::from_secs(SECONDS_PER_DAY * DAYS_PER_YEAR * YEARS_AHEAD);
    Instant::now() + horizon
}
3732
3733fn encode_task_data(
3745 task: TaskId,
3746 data: &TaskStorage,
3747 category: SpecificTaskDataCategory,
3748 scratch_buffer: &mut TurboBincodeBuffer,
3749) -> Result<TurboBincodeBuffer> {
3750 scratch_buffer.clear();
3751 let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3752 data.encode(category, &mut encoder)?;
3753
3754 if cfg!(feature = "verify_serialization") {
3755 TaskStorage::new()
3756 .decode(
3757 category,
3758 &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3759 )
3760 .with_context(|| {
3761 format!(
3762 "expected to be able to decode serialized data for '{category:?}' information \
3763 for {task}"
3764 )
3765 })?;
3766 }
3767 Ok(SmallVec::from_slice(scratch_buffer))
3768}