1mod cell_data;
2mod counter_map;
3mod operation;
4mod snapshot_coordinator;
5mod storage;
6pub mod storage_schema;
7
8use std::{
9 borrow::Cow,
10 fmt::{self, Write},
11 future::Future,
12 hash::BuildHasherDefault,
13 mem::take,
14 pin::Pin,
15 sync::{
16 Arc, LazyLock,
17 atomic::{AtomicBool, Ordering},
18 },
19 time::SystemTime,
20};
21
22use anyhow::{Context, Result, bail};
23use auto_hash_map::{AutoMap, AutoSet};
24use indexmap::IndexSet;
25use parking_lot::Mutex;
26use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
27use smallvec::{SmallVec, smallvec};
28use tokio::time::{Duration, Instant};
29use tracing::{Span, trace_span};
30use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
31use turbo_tasks::{
32 CellId, DynTaskInputsStorage, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
33 ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
34 TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasks, TurboTasksCallApi,
35 TurboTasksPanic, ValueTypeId,
36 backend::{
37 Backend, CachedTaskType, CachedTaskTypeArc, CellContent, CellHash, TaskExecutionSpec,
38 TransientTaskType, TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
39 TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
40 VerificationMode,
41 },
42 event::{Event, EventDescription, EventListener},
43 macro_helpers::NativeFunction,
44 message_queue::{TimingEvent, TraceEvent},
45 registry::get_value_type,
46 scope::scope_and_block,
47 task_statistics::TaskStatisticsApi,
48 trace::TraceRawVcs,
49 util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
50};
51#[cfg(feature = "task_dirty_cause")]
52use turbo_tasks::{FunctionId, TaskDirtyCause};
53
54pub use self::{
55 operation::AnyOperation,
56 storage::{EvictionCounts, SpecificTaskDataCategory, TaskDataCategory},
57};
58use crate::{
59 backend::{
60 operation::{
61 AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
62 CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
63 LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType, TaskTypeRef,
64 connect_children, get_aggregation_number, get_uppers, make_task_dirty_internal,
65 prepare_new_children,
66 },
67 snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
68 storage::Storage,
69 storage_schema::{TaskStorage, TaskStorageAccessors},
70 },
71 backing_storage::{SnapshotItem, SnapshotMeta, compute_task_type_hash},
72 data::{
73 ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
74 InProgressState, InProgressStateInner, OutputValue, TransientTask,
75 },
76 error::TaskError,
77 kv_backing_storage::TurboBackingStorage,
78 utils::{
79 dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
80 shard_amount::compute_shard_amount,
81 },
82};
83
84const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10000;
88
89static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
92 std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
93 .ok()
94 .and_then(|v| v.parse::<u64>().ok())
95 .map(Duration::from_millis)
96 .unwrap_or(Duration::from_secs(2))
97});
98
99fn compute_stale_priority(task: &impl TaskGuard) -> TaskPriority {
106 TaskPriority::invalidation(
107 task.get_leaf_distance()
108 .copied()
109 .unwrap_or_default()
110 .distance,
111 )
112 .in_parent(task.is_dirty().unwrap_or(TaskPriority::leaf()))
113}
114
115pub enum StorageMode {
116 ReadOnly,
118 ReadWrite,
121 ReadWriteOnShutdown,
124}
125
126pub struct BackendOptions {
127 pub dependency_tracking: bool,
132
133 pub active_tracking: bool,
139
140 pub storage_mode: Option<StorageMode>,
142
143 pub num_workers: Option<usize>,
146
147 pub small_preallocation: bool,
149
150 pub evict_after_snapshot: bool,
154}
155
156impl Default for BackendOptions {
157 fn default() -> Self {
158 Self {
159 dependency_tracking: true,
160 active_tracking: true,
161 storage_mode: Some(StorageMode::ReadWrite),
162 num_workers: None,
163 small_preallocation: false,
164 evict_after_snapshot: false,
165 }
166 }
167}
168
169pub enum TurboTasksBackendJob {
170 Snapshot,
171}
172
173#[derive(Clone, Copy, Debug, PartialEq, Eq)]
175enum SnapshotReason {
176 Test,
177 Stop,
178 InitialSnapshotTimeout,
179 RegularSnapshotInterval,
180 IdleTimeout,
181}
182
183impl SnapshotReason {
184 fn as_str(self) -> &'static str {
185 match self {
186 SnapshotReason::Test => "test",
187 SnapshotReason::Stop => "stop",
188 SnapshotReason::InitialSnapshotTimeout => "initial snapshot timeout",
189 SnapshotReason::RegularSnapshotInterval => "regular snapshot interval",
190 SnapshotReason::IdleTimeout => "idle timeout",
191 }
192 }
193
194 fn drain_entries(self) -> bool {
198 matches!(self, SnapshotReason::Stop)
199 }
200}
201
202pub struct TurboTasksBackend {
203 options: BackendOptions,
204
205 start_time: Instant,
206
207 persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
208 transient_task_id_factory: IdFactoryWithReuse<TaskId>,
209
210 storage: Storage,
211
212 snapshot_coord: SnapshotCoordinator,
215 snapshot_in_progress: Mutex<()>,
220
221 stopping: AtomicBool,
222 stopping_event: Event,
223 idle_start_event: Event,
224 idle_end_event: Event,
225 #[cfg(feature = "verify_aggregation_graph")]
226 is_idle: AtomicBool,
227
228 task_statistics: TaskStatisticsApi,
229
230 backing_storage: TurboBackingStorage,
231
232 #[cfg(feature = "verify_aggregation_graph")]
233 root_tasks: Mutex<FxHashSet<TaskId>>,
234}
235
236impl TurboTasksBackend {
237 pub fn invalidate_storage(&self, reason_code: &str) -> Result<()> {
243 self.backing_storage.invalidate(reason_code)
244 }
245
246 pub fn new(mut options: BackendOptions, backing_storage: TurboBackingStorage) -> Self {
247 let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
248 if !options.dependency_tracking {
249 options.active_tracking = false;
250 }
251 let small_preallocation = options.small_preallocation;
252 let next_task_id = backing_storage
253 .next_free_task_id()
254 .expect("Failed to get task id");
255 Self {
256 options,
257 start_time: Instant::now(),
258 persisted_task_id_factory: IdFactoryWithReuse::new(
259 next_task_id,
260 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
261 ),
262 transient_task_id_factory: IdFactoryWithReuse::new(
263 TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
264 TaskId::MAX,
265 ),
266 storage: Storage::new(shard_amount, small_preallocation),
267 snapshot_coord: SnapshotCoordinator::new(),
268 snapshot_in_progress: Mutex::new(()),
269 stopping: AtomicBool::new(false),
270 stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
271 idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
272 idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
273 #[cfg(feature = "verify_aggregation_graph")]
274 is_idle: AtomicBool::new(false),
275 task_statistics: TaskStatisticsApi::default(),
276 backing_storage,
277 #[cfg(feature = "verify_aggregation_graph")]
278 root_tasks: Default::default(),
279 }
280 }
281
282 fn execute_context<'a>(
283 &'a self,
284 turbo_tasks: &'a TurboTasks<TurboTasksBackend>,
285 ) -> impl ExecuteContext<'a> {
286 ExecuteContextImpl::new(self, turbo_tasks)
287 }
288
289 fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
290 if self.should_persist() {
291 self.snapshot_coord.suspend_point(suspend);
292 }
293 }
294
295 pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
296 if !self.should_persist() {
297 return OperationGuard::noop();
298 }
299 self.snapshot_coord.begin_operation()
300 }
301
302 fn should_persist(&self) -> bool {
303 matches!(
304 self.options.storage_mode,
305 Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
306 )
307 }
308
309 fn should_evict(&self) -> bool {
310 self.options.evict_after_snapshot && self.should_persist()
311 }
312
313 #[doc(hidden)]
320 pub fn snapshot_and_evict_for_testing(
321 &self,
322 turbo_tasks: &TurboTasks<TurboTasksBackend>,
323 ) -> (bool, EvictionCounts) {
324 assert!(
325 self.should_persist(),
326 "snapshot_and_evict requires persistence"
327 );
328 let snapshot_result = self.snapshot_and_persist(None, SnapshotReason::Test, turbo_tasks);
329 let had_new_data = match snapshot_result {
330 Ok((_, new_data)) => new_data,
331 Err(_) => {
332 return (false, EvictionCounts::default());
336 }
337 };
338 let counts = self.storage.evict_after_snapshot(None);
339 (had_new_data, counts)
340 }
341
342 fn should_restore(&self) -> bool {
343 self.options.storage_mode.is_some()
344 }
345
346 fn should_track_dependencies(&self) -> bool {
347 self.options.dependency_tracking
348 }
349
350 fn should_track_activeness(&self) -> bool {
351 self.options.active_tracking
352 }
353
354 fn track_cache_hit_by_fn(&self, native_fn: &'static NativeFunction) {
355 self.task_statistics
356 .map(|stats| stats.increment_cache_hit(native_fn));
357 }
358
359 fn track_cache_miss_by_fn(&self, native_fn: &'static NativeFunction) {
360 self.task_statistics
361 .map(|stats| stats.increment_cache_miss(native_fn));
362 }
363
364 fn task_error_to_turbo_tasks_execution_error(
369 &self,
370 error: &TaskError,
371 ctx: &mut impl ExecuteContext<'_>,
372 ) -> TurboTasksExecutionError {
373 match error {
374 TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
375 TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
376 message: item.message.clone(),
377 source: item
378 .source
379 .as_ref()
380 .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
381 })),
382 TaskError::LocalTaskContext(local_task_context) => {
383 TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
384 name: local_task_context.name.clone(),
385 source: local_task_context
386 .source
387 .as_ref()
388 .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
389 }))
390 }
391 TaskError::TaskChain(chain) => {
392 let task_id = chain.last().unwrap();
393 let error = {
394 let task = ctx.task(*task_id, TaskDataCategory::Meta);
395 if let Some(OutputValue::Error(error)) = task.get_output() {
396 Some(error.clone())
397 } else {
398 None
399 }
400 };
401 let error = error.map_or_else(
402 || {
403 TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
405 message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
406 "Error no longer available",
407 )),
408 location: None,
409 }))
410 },
411 |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
412 );
413 let mut current_error = error;
414 for &task_id in chain.iter().rev() {
415 current_error =
416 TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
417 task_id,
418 source: Some(current_error),
419 turbo_tasks: ctx.turbo_tasks(),
420 }));
421 }
422 current_error
423 }
424 }
425 }
426}
427
428struct TaskExecutionCompletePrepareResult {
430 pub new_children: FxHashSet<TaskId>,
431 pub is_now_immutable: bool,
432 #[cfg(feature = "verify_determinism")]
433 pub no_output_set: bool,
434 #[cfg(feature = "task_dirty_cause")]
435 pub function_id: Option<FunctionId>,
436 pub new_output: Option<OutputValue>,
437 pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
438 pub is_recomputation: bool,
439 pub is_session_dependent: bool,
440}
441
442impl TurboTasksBackend {
444 fn try_read_task_output(
445 &self,
446 task_id: TaskId,
447 reader: Option<TaskId>,
448 options: ReadOutputOptions,
449 turbo_tasks: &TurboTasks<TurboTasksBackend>,
450 ) -> Result<Result<RawVc, EventListener>> {
451 self.assert_not_persistent_calling_transient(reader, task_id, None);
452
453 let mut ctx = self.execute_context(turbo_tasks);
454 let need_reader_task = if self.should_track_dependencies()
455 && !matches!(options.tracking, ReadTracking::Untracked)
456 && let Some(reader_id) = reader
457 && reader_id != task_id
458 {
459 Some(reader_id)
460 } else {
461 None
462 };
463 let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
464 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
468 (task, Some(reader))
469 } else {
470 (ctx.task(task_id, TaskDataCategory::All), None)
471 };
472
473 fn listen_to_done_event(
474 reader_description: Option<EventDescription>,
475 tracking: ReadTracking,
476 done_event: &Event,
477 ) -> EventListener {
478 done_event.listen_with_note(move || {
479 move || {
480 if let Some(reader_description) = reader_description.as_ref() {
481 format!(
482 "try_read_task_output from {} ({})",
483 reader_description, tracking
484 )
485 } else {
486 format!("try_read_task_output ({})", tracking)
487 }
488 }
489 })
490 }
491
492 fn check_in_progress(
493 task: &impl TaskGuard,
494 reader_description: Option<EventDescription>,
495 tracking: ReadTracking,
496 ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
497 {
498 match task.get_in_progress() {
499 Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
500 listen_to_done_event(reader_description, tracking, done_event),
501 ))),
502 Some(InProgressState::InProgress(box InProgressStateInner {
503 done_event, ..
504 })) => Some(Ok(Err(listen_to_done_event(
505 reader_description,
506 tracking,
507 done_event,
508 )))),
509 Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
510 "{} was canceled",
511 task.get_task_description()
512 ))),
513 None => None,
514 }
515 }
516
517 if matches!(options.consistency, ReadConsistency::Strong) {
518 if task
519 .get_persistent_task_type()
520 .is_some_and(|t| !t.native_fn.is_root)
521 {
522 drop(task);
523 drop(reader_task);
524 panic!(
525 "Strongly consistent read of non-root task {} (reader: {}). The `root` \
526 attribute is missing on the task.",
527 self.debug_get_task_description(task_id),
528 reader.map_or_else(
529 || "unknown".to_string(),
530 |r| self.debug_get_task_description(r)
531 )
532 );
533 }
534
535 let is_dirty = task.is_dirty();
536
537 let has_dirty_containers = task.has_dirty_containers();
539 if has_dirty_containers || is_dirty.is_some() {
540 let activeness = task.get_activeness_mut();
541 let mut task_ids_to_schedule: Vec<_> = Vec::new();
542 let activeness = if let Some(activeness) = activeness {
544 activeness.set_active_until_clean();
548 activeness
549 } else {
550 if ctx.should_track_activeness() {
554 task_ids_to_schedule = task.dirty_containers().collect();
556 task_ids_to_schedule.push(task_id);
557 }
558 let activeness =
559 task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
560 activeness.set_active_until_clean();
561 activeness
562 };
563 let listener = activeness.all_clean_event.listen_with_note(move || {
564 let tt = turbo_tasks.pin();
567 move || {
568 let mut ctx = tt.backend().execute_context(&tt);
569 let mut visited = FxHashSet::default();
570 fn indent(s: &str) -> String {
571 s.split_inclusive('\n')
572 .flat_map(|line: &str| [" ", line].into_iter())
573 .collect::<String>()
574 }
575 fn get_info(
576 ctx: &mut impl ExecuteContext<'_>,
577 task_id: TaskId,
578 parent_and_count: Option<(TaskId, i32)>,
579 visited: &mut FxHashSet<TaskId>,
580 ) -> String {
581 let task = ctx.task(task_id, TaskDataCategory::All);
582 let is_dirty = task.is_dirty();
583 let in_progress =
584 task.get_in_progress()
585 .map_or("not in progress", |p| match p {
586 InProgressState::InProgress(_) => "in progress",
587 InProgressState::Scheduled { .. } => "scheduled",
588 InProgressState::Canceled => "canceled",
589 });
590 let activeness = task.get_activeness().map_or_else(
591 || "not active".to_string(),
592 |activeness| format!("{activeness:?}"),
593 );
594 let aggregation_number = get_aggregation_number(&task);
595 let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
596 {
597 let uppers = get_uppers(&task);
598 !uppers.contains(&parent_task_id)
599 } else {
600 false
601 };
602
603 let has_dirty_containers = task.has_dirty_containers();
605
606 let task_description = task.get_task_description();
607 let is_dirty_label = if let Some(parent_priority) = is_dirty {
608 format!(", dirty({parent_priority})")
609 } else {
610 String::new()
611 };
612 let has_dirty_containers_label = if has_dirty_containers {
613 ", dirty containers"
614 } else {
615 ""
616 };
617 let count = if let Some((_, count)) = parent_and_count {
618 format!(" {count}")
619 } else {
620 String::new()
621 };
622 let mut info = format!(
623 "{task_id} {task_description}{count} (aggr={aggregation_number}, \
624 {in_progress}, \
625 {activeness}{is_dirty_label}{has_dirty_containers_label})",
626 );
627 let children: Vec<_> = task.dirty_containers_with_count().collect();
628 drop(task);
629
630 if missing_upper {
631 info.push_str("\n ERROR: missing upper connection");
632 }
633
634 if has_dirty_containers || !children.is_empty() {
635 writeln!(info, "\n dirty tasks:").unwrap();
636
637 for (child_task_id, count) in children {
638 let task_description = ctx
639 .task(child_task_id, TaskDataCategory::Data)
640 .get_task_description();
641 if visited.insert(child_task_id) {
642 let child_info = get_info(
643 ctx,
644 child_task_id,
645 Some((task_id, count)),
646 visited,
647 );
648 info.push_str(&indent(&child_info));
649 if !info.ends_with('\n') {
650 info.push('\n');
651 }
652 } else {
653 writeln!(
654 info,
655 " {child_task_id} {task_description} {count} \
656 (already visited)"
657 )
658 .unwrap();
659 }
660 }
661 }
662 info
663 }
664 let info = get_info(&mut ctx, task_id, None, &mut visited);
665 format!(
666 "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
667 )
668 }
669 });
670 drop(reader_task);
671 drop(task);
672 if !task_ids_to_schedule.is_empty() {
673 let mut queue = AggregationUpdateQueue::new();
674 queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
675 queue.execute(&mut ctx);
676 }
677
678 return Ok(Err(listener));
679 }
680 }
681
682 let reader_description = reader_task
683 .as_ref()
684 .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
685 if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
686 {
687 return value;
688 }
689
690 if let Some(output) = task.get_output() {
691 let result = match output {
692 OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
693 OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
694 OutputValue::Error(error) => Err(error.clone()),
695 };
696 if let Some(mut reader_task) = reader_task.take()
697 && options.tracking.should_track(result.is_err())
698 && (!task.immutable() || cfg!(feature = "verify_immutable"))
699 {
700 #[cfg(feature = "trace_task_output_dependencies")]
701 let _span = tracing::trace_span!(
702 "add output dependency",
703 task = %task_id,
704 dependent_task = ?reader
705 )
706 .entered();
707 let mut queue = LeafDistanceUpdateQueue::new();
708 let reader = reader.unwrap();
709 if task.add_output_dependent(reader) {
710 let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
712 let reader_leaf_distance =
713 reader_task.get_leaf_distance().copied().unwrap_or_default();
714 if reader_leaf_distance.distance <= leaf_distance.distance {
715 queue.push(
716 reader,
717 leaf_distance.distance,
718 leaf_distance.max_distance_in_buffer,
719 );
720 }
721 }
722
723 drop(task);
724
725 if !reader_task.remove_outdated_output_dependencies(&task_id) {
731 let _ = reader_task.add_output_dependencies(task_id);
732 }
733 drop(reader_task);
734
735 queue.execute(&mut ctx);
736 } else {
737 drop(task);
738 }
739
740 return result.map_err(|error| {
741 self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
742 .with_task_context(task_id, turbo_tasks.pin())
743 .into()
744 });
745 }
746 drop(reader_task);
747
748 let note = EventDescription::new(|| {
749 move || {
750 if let Some(reader) = reader_description.as_ref() {
751 format!("try_read_task_output (recompute) from {reader}",)
752 } else {
753 "try_read_task_output (recompute, untracked)".to_string()
754 }
755 }
756 });
757
758 let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
760 TaskExecutionReason::OutputNotAvailable,
761 EventDescription::new(|| task.get_task_desc_fn()),
762 note,
763 );
764
765 let old = task.set_in_progress(in_progress_state);
768 debug_assert!(old.is_none(), "InProgress already exists");
769 ctx.schedule_task(task, TaskPriority::Recomputation);
770
771 Ok(Err(listener))
772 }
773
774 fn try_read_task_cell(
775 &self,
776 task_id: TaskId,
777 reader: Option<TaskId>,
778 cell: CellId,
779 options: ReadCellOptions,
780 turbo_tasks: &TurboTasks<TurboTasksBackend>,
781 ) -> Result<Result<TypedCellContent, EventListener>> {
782 self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
783
784 fn add_cell_dependency(
785 task_id: TaskId,
786 mut task: impl TaskGuard,
787 reader: Option<TaskId>,
788 reader_task: Option<impl TaskGuard>,
789 cell: CellId,
790 key: Option<u64>,
791 ) {
792 if let Some(mut reader_task) = reader_task
793 && (!task.immutable() || cfg!(feature = "verify_immutable"))
794 {
795 let reader = reader.unwrap();
796 let reverse = CellRef { task: reader, cell };
797 if let Some(k) = key {
798 let _ = task.add_cell_dependents_hashed((reverse, k));
799 } else {
800 let _ = task.add_cell_dependents(reverse);
801 }
802 drop(task);
803
804 let target = CellRef {
810 task: task_id,
811 cell,
812 };
813 if let Some(k) = key {
814 if !reader_task.remove_outdated_cell_dependencies_hashed(&(target, k)) {
815 let _ = reader_task.add_cell_dependencies_hashed((target, k));
816 }
817 } else if !reader_task.remove_outdated_cell_dependencies(&target) {
818 let _ = reader_task.add_cell_dependencies(target);
819 }
820 drop(reader_task);
821 }
822 }
823
824 let ReadCellOptions {
825 tracking,
826 final_read_hint,
827 } = options;
828
829 let mut ctx = self.execute_context(turbo_tasks);
830 let (mut task, reader_task) = if self.should_track_dependencies()
831 && !matches!(tracking, ReadCellTracking::Untracked)
832 && let Some(reader_id) = reader
833 && reader_id != task_id
834 {
835 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
839 (task, Some(reader))
840 } else {
841 (ctx.task(task_id, TaskDataCategory::All), None)
842 };
843
844 let content = if final_read_hint {
845 task.remove_cell_data(&cell)
846 } else {
847 task.get_cell_data(&cell).cloned()
848 };
849 if let Some(content) = content {
850 if tracking.should_track(false) {
851 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
852 }
853 return Ok(Ok(TypedCellContent(
854 cell.type_id,
855 CellContent(Some(content)),
856 )));
857 }
858
859 let in_progress = task.get_in_progress();
860 if matches!(
861 in_progress,
862 Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
863 ) {
864 return Ok(Err(self
865 .listen_to_cell(&mut task, task_id, &reader_task, cell)
866 .0));
867 }
868 let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
869
870 let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
872 let Some(max_id) = max_id else {
873 let task_desc = task.get_task_description();
874 if tracking.should_track(true) {
875 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
876 }
877 bail!(
878 "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
879 );
880 };
881 if cell.index >= max_id {
882 let task_desc = task.get_task_description();
883 if tracking.should_track(true) {
884 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
885 }
886 bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
887 }
888
889 if is_cancelled {
895 bail!("{} was canceled", task.get_task_description());
896 }
897
898 let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
900 drop(reader_task);
901 if !new_listener {
902 return Ok(Err(listener));
903 }
904
905 let _span = tracing::trace_span!(
906 "recomputation",
907 cell_type = get_value_type(cell.type_id).ty.global_name,
908 cell_index = cell.index
909 )
910 .entered();
911
912 let _ = task.add_scheduled(
913 TaskExecutionReason::CellNotAvailable,
914 EventDescription::new(|| task.get_task_desc_fn()),
915 );
916 ctx.schedule_task(task, TaskPriority::Recomputation);
917
918 Ok(Err(listener))
919 }
920
921 fn listen_to_cell(
922 &self,
923 task: &mut impl TaskGuard,
924 task_id: TaskId,
925 reader_task: &Option<impl TaskGuard>,
926 cell: CellId,
927 ) -> (EventListener, bool) {
928 let note = || {
929 let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
930 move || {
931 if let Some(reader_desc) = reader_desc.as_ref() {
932 format!("try_read_task_cell (in progress) from {}", (reader_desc)())
933 } else {
934 "try_read_task_cell (in progress, untracked)".to_string()
935 }
936 }
937 };
938 if let Some(in_progress) = task.get_in_progress_cells(&cell) {
939 let listener = in_progress.event.listen_with_note(note);
941 return (listener, false);
942 }
943 let in_progress = InProgressCellState::new(task_id, cell);
944 let listener = in_progress.event.listen_with_note(note);
945 let old = task.insert_in_progress_cells(cell, in_progress);
946 debug_assert!(old.is_none(), "InProgressCell already exists");
947 (listener, true)
948 }
949
950 fn snapshot_and_persist(
951 &self,
952 parent_span: Option<tracing::Id>,
953 reason: SnapshotReason,
954 turbo_tasks: &TurboTasks<TurboTasksBackend>,
955 ) -> Result<(Instant, bool), anyhow::Error> {
956 let snapshot_span =
957 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason.as_str())
958 .entered();
959 let _snapshot_in_progress = self.snapshot_in_progress.lock();
963 let start = Instant::now();
964 let wall_start = SystemTime::now();
968 debug_assert!(self.should_persist());
969
970 let mut snapshot_phase = {
971 let _span = tracing::info_span!("blocking").entered();
972 self.snapshot_coord.begin_snapshot()
973 };
974 let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
977
978 let suspended_operations = snapshot_phase.take_suspended_operations();
979
980 let snapshot_time = Instant::now();
981 drop(snapshot_phase);
982
983 if !has_modifications {
984 drop(snapshot_guard);
987 return Ok((start, false));
988 }
989
990 #[cfg(feature = "print_cache_item_size")]
991 #[derive(Default)]
992 struct TaskCacheStats {
993 data: usize,
994 #[cfg(feature = "print_cache_item_size_with_compressed")]
995 data_compressed: usize,
996 data_count: usize,
997 meta: usize,
998 #[cfg(feature = "print_cache_item_size_with_compressed")]
999 meta_compressed: usize,
1000 meta_count: usize,
1001 upper_count: usize,
1002 collectibles_count: usize,
1003 aggregated_collectibles_count: usize,
1004 children_count: usize,
1005 followers_count: usize,
1006 collectibles_dependents_count: usize,
1007 aggregated_dirty_containers_count: usize,
1008 output_size: usize,
1009 }
1010 #[cfg(feature = "print_cache_item_size")]
1013 struct FormatSizes {
1014 size: usize,
1015 #[cfg(feature = "print_cache_item_size_with_compressed")]
1016 compressed_size: usize,
1017 }
1018 #[cfg(feature = "print_cache_item_size")]
1019 impl std::fmt::Display for FormatSizes {
1020 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1021 use turbo_tasks::util::FormatBytes;
1022 #[cfg(feature = "print_cache_item_size_with_compressed")]
1023 {
1024 write!(
1025 f,
1026 "{} ({} compressed)",
1027 FormatBytes(self.size),
1028 FormatBytes(self.compressed_size)
1029 )
1030 }
1031 #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1032 {
1033 write!(f, "{}", FormatBytes(self.size))
1034 }
1035 }
1036 }
1037 #[cfg(feature = "print_cache_item_size")]
1038 impl TaskCacheStats {
1039 #[cfg(feature = "print_cache_item_size_with_compressed")]
1040 fn compressed_size(data: &[u8]) -> Result<usize> {
1041 Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1042 data,
1043 &mut Vec::new(),
1044 lzzzz::lz4::ACC_LEVEL_DEFAULT,
1045 )?)
1046 }
1047
1048 fn add_data(&mut self, data: &[u8]) {
1049 self.data += data.len();
1050 #[cfg(feature = "print_cache_item_size_with_compressed")]
1051 {
1052 self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1053 }
1054 self.data_count += 1;
1055 }
1056
1057 fn add_meta(&mut self, data: &[u8]) {
1058 self.meta += data.len();
1059 #[cfg(feature = "print_cache_item_size_with_compressed")]
1060 {
1061 self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1062 }
1063 self.meta_count += 1;
1064 }
1065
1066 fn add_counts(&mut self, storage: &TaskStorage) {
1067 let counts = storage.meta_counts();
1068 self.upper_count += counts.upper;
1069 self.collectibles_count += counts.collectibles;
1070 self.aggregated_collectibles_count += counts.aggregated_collectibles;
1071 self.children_count += counts.children;
1072 self.followers_count += counts.followers;
1073 self.collectibles_dependents_count += counts.collectibles_dependents;
1074 self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
1075 if let Some(output) = storage.get_output() {
1076 use turbo_bincode::turbo_bincode_encode;
1077
1078 self.output_size += turbo_bincode_encode(&output)
1079 .map(|data| data.len())
1080 .unwrap_or(0);
1081 }
1082 }
1083
1084 fn task_name(storage: &TaskStorage) -> String {
1086 storage
1087 .get_persistent_task_type()
1088 .map(|t| t.to_string())
1089 .unwrap_or_else(|| "<unknown>".to_string())
1090 }
1091
1092 fn sort_key(&self) -> usize {
1095 #[cfg(feature = "print_cache_item_size_with_compressed")]
1096 {
1097 self.data_compressed + self.meta_compressed
1098 }
1099 #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1100 {
1101 self.data + self.meta
1102 }
1103 }
1104
1105 fn format_total(&self) -> FormatSizes {
1106 FormatSizes {
1107 size: self.data + self.meta,
1108 #[cfg(feature = "print_cache_item_size_with_compressed")]
1109 compressed_size: self.data_compressed + self.meta_compressed,
1110 }
1111 }
1112
1113 fn format_data(&self) -> FormatSizes {
1114 FormatSizes {
1115 size: self.data,
1116 #[cfg(feature = "print_cache_item_size_with_compressed")]
1117 compressed_size: self.data_compressed,
1118 }
1119 }
1120
1121 fn format_avg_data(&self) -> FormatSizes {
1122 FormatSizes {
1123 size: self.data.checked_div(self.data_count).unwrap_or(0),
1124 #[cfg(feature = "print_cache_item_size_with_compressed")]
1125 compressed_size: self
1126 .data_compressed
1127 .checked_div(self.data_count)
1128 .unwrap_or(0),
1129 }
1130 }
1131
1132 fn format_meta(&self) -> FormatSizes {
1133 FormatSizes {
1134 size: self.meta,
1135 #[cfg(feature = "print_cache_item_size_with_compressed")]
1136 compressed_size: self.meta_compressed,
1137 }
1138 }
1139
1140 fn format_avg_meta(&self) -> FormatSizes {
1141 FormatSizes {
1142 size: self.meta.checked_div(self.meta_count).unwrap_or(0),
1143 #[cfg(feature = "print_cache_item_size_with_compressed")]
1144 compressed_size: self
1145 .meta_compressed
1146 .checked_div(self.meta_count)
1147 .unwrap_or(0),
1148 }
1149 }
1150 }
1151 #[cfg(feature = "print_cache_item_size")]
1152 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1153 Mutex::new(FxHashMap::default());
1154
1155 let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
1162 let encode_category = |task_id: TaskId,
1163 data: &TaskStorage,
1164 category: SpecificTaskDataCategory,
1165 buffer: &mut TurboBincodeBuffer|
1166 -> Option<TurboBincodeBuffer> {
1167 match encode_task_data(task_id, data, category, buffer) {
1168 Ok(encoded) => {
1169 #[cfg(feature = "print_cache_item_size")]
1170 {
1171 let mut stats = task_cache_stats.lock();
1172 let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
1173 match category {
1174 SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1175 SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1176 }
1177 }
1178 Some(encoded)
1179 }
1180 Err(err) => {
1181 panic!(
1182 "Serializing task {} failed ({:?}): {:?}",
1183 self.debug_get_task_description(task_id),
1184 category,
1185 err
1186 );
1187 }
1188 }
1189 };
1190 if task_id.is_transient() {
1191 unreachable!("transient task_ids should never be enqueued to be persisted");
1192 }
1193
1194 let encode_meta = inner.flags.meta_modified();
1195 let encode_data = inner.flags.data_modified();
1196
1197 #[cfg(feature = "print_cache_item_size")]
1198 if encode_data || encode_meta {
1199 task_cache_stats
1200 .lock()
1201 .entry(TaskCacheStats::task_name(inner))
1202 .or_default()
1203 .add_counts(inner);
1204 }
1205
1206 let meta = if encode_meta {
1207 encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1208 } else {
1209 None
1210 };
1211
1212 let data = if encode_data {
1213 encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1214 } else {
1215 None
1216 };
1217 let task_type_hash = if inner.flags.new_task() {
1218 let task_type = inner.get_persistent_task_type().expect(
1219 "It is not possible for a new_task to not have a persistent_task_type. Task \
1220 creation for persistent tasks uses a single ExecutionContextImpl for \
1221 creating the task (which sets new_task) and connect_child (which sets \
1222 persistent_task_type) and take_snapshot waits for all operations to complete \
1223 or suspend before we start snapshotting. So task creation will always set \
1224 the task_type.",
1225 );
1226 Some(compute_task_type_hash(task_type))
1227 } else {
1228 None
1229 };
1230
1231 SnapshotItem {
1232 task_id,
1233 meta,
1234 data,
1235 task_type_hash,
1236 }
1237 };
1238
1239 let task_snapshots =
1240 self.storage
1241 .take_snapshot(snapshot_guard, &process, reason.drain_entries());
1242
1243 drop(snapshot_span);
1244 let snapshot_duration = start.elapsed();
1245 let task_count = task_snapshots.len();
1246
1247 if task_snapshots.is_empty() {
1248 std::hint::cold_path();
1251 return Ok((snapshot_time, false));
1252 }
1253
1254 let persist_start = Instant::now();
1255 let span = tracing::info_span!(
1256 parent: parent_span,
1257 "persist",
1258 reason = reason.as_str(),
1259 data_items= tracing::field::Empty,
1260 meta_items= tracing::field::Empty,
1261 task_cache_items= tracing::field::Empty,
1262 next_task_id= tracing::field::Empty,)
1263 .entered();
1264 {
1265 let SnapshotMeta {
1269 task_cache_items,
1270 data_items,
1271 meta_items,
1272 max_next_task_id,
1273 } = self
1274 .backing_storage
1275 .save_snapshot(suspended_operations, task_snapshots)?;
1276 span.record("data_items", data_items);
1277 span.record("meta_items", meta_items);
1278 span.record("task_cache_items", task_cache_items);
1279 span.record("next_task_id", max_next_task_id);
1280
1281 #[cfg(feature = "print_cache_item_size")]
1282 {
1283 let mut task_cache_stats = task_cache_stats
1284 .into_inner()
1285 .into_iter()
1286 .collect::<Vec<_>>();
1287 if !task_cache_stats.is_empty() {
1288 use turbo_tasks::util::FormatBytes;
1289
1290 use crate::utils::markdown_table::print_markdown_table;
1291
1292 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1293 (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
1294 });
1295
1296 println!(
1297 "Task cache stats: {}",
1298 FormatSizes {
1299 size: task_cache_stats
1300 .iter()
1301 .map(|(_, s)| s.data + s.meta)
1302 .sum::<usize>(),
1303 #[cfg(feature = "print_cache_item_size_with_compressed")]
1304 compressed_size: task_cache_stats
1305 .iter()
1306 .map(|(_, s)| s.data_compressed + s.meta_compressed)
1307 .sum::<usize>()
1308 },
1309 );
1310
1311 print_markdown_table(
1312 [
1313 "Task",
1314 " Total Size",
1315 " Data Size",
1316 " Data Count x Avg",
1317 " Data Count x Avg",
1318 " Meta Size",
1319 " Meta Count x Avg",
1320 " Meta Count x Avg",
1321 " Uppers",
1322 " Coll",
1323 " Agg Coll",
1324 " Children",
1325 " Followers",
1326 " Coll Deps",
1327 " Agg Dirty",
1328 " Output Size",
1329 ],
1330 task_cache_stats.iter(),
1331 |(task_desc, stats)| {
1332 [
1333 task_desc.to_string(),
1334 format!(" {}", stats.format_total()),
1335 format!(" {}", stats.format_data()),
1336 format!(" {} x", stats.data_count),
1337 format!("{}", stats.format_avg_data()),
1338 format!(" {}", stats.format_meta()),
1339 format!(" {} x", stats.meta_count),
1340 format!("{}", stats.format_avg_meta()),
1341 format!(" {}", stats.upper_count),
1342 format!(" {}", stats.collectibles_count),
1343 format!(" {}", stats.aggregated_collectibles_count),
1344 format!(" {}", stats.children_count),
1345 format!(" {}", stats.followers_count),
1346 format!(" {}", stats.collectibles_dependents_count),
1347 format!(" {}", stats.aggregated_dirty_containers_count),
1348 format!(" {}", FormatBytes(stats.output_size)),
1349 ]
1350 },
1351 );
1352 }
1353 }
1354 }
1355
1356 let elapsed = start.elapsed();
1357 let persist_duration = persist_start.elapsed();
1358 if elapsed > Duration::from_secs(10) {
1360 turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1361 "Finished writing to filesystem cache".to_string(),
1362 elapsed,
1363 )));
1364 }
1365
1366 let wall_start_ms = wall_start
1367 .duration_since(SystemTime::UNIX_EPOCH)
1368 .unwrap_or_default()
1369 .as_secs_f64()
1371 * 1000.0;
1372 let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1373 turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1374 "turbopack-persistence",
1375 wall_start_ms,
1376 wall_end_ms,
1377 vec![
1378 ("reason", serde_json::Value::from(reason.as_str())),
1379 (
1380 "snapshot_duration_ms",
1381 serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1382 ),
1383 (
1384 "persist_duration_ms",
1385 serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1386 ),
1387 ("task_count", serde_json::Value::from(task_count)),
1388 ],
1389 )));
1390
1391 Ok((snapshot_time, true))
1392 }
1393
1394 fn startup(&self, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1395 if self.should_restore() {
1396 let uncompleted_operations = self
1400 .backing_storage
1401 .uncompleted_operations()
1402 .expect("Failed to get uncompleted operations");
1403 if !uncompleted_operations.is_empty() {
1404 let mut ctx = self.execute_context(turbo_tasks);
1405 for op in uncompleted_operations {
1406 op.execute(&mut ctx);
1407 }
1408 }
1409 }
1410
1411 if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1414 let _span = trace_span!("persisting background job").entered();
1416 let _span = tracing::info_span!("thread").entered();
1417 turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::Snapshot);
1418 }
1419 }
1420
1421 fn stopping(&self) {
1422 self.stopping.store(true, Ordering::Release);
1423 self.stopping_event.notify(usize::MAX);
1424 }
1425
1426 #[allow(unused_variables)]
1427 fn stop(&self, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1428 #[cfg(feature = "verify_aggregation_graph")]
1429 {
1430 self.is_idle.store(false, Ordering::Release);
1431 self.verify_aggregation_graph(turbo_tasks, false);
1432 }
1433 self.storage.drop_task_cache();
1435 if self.should_persist() {
1436 if let Err(err) =
1440 self.snapshot_and_persist(Span::current().into(), SnapshotReason::Stop, turbo_tasks)
1441 {
1442 eprintln!("Persisting failed during shutdown: {err:?}");
1443 }
1444 }
1445 self.storage.drop_contents();
1446 if let Err(err) = self.backing_storage.shutdown() {
1447 println!("Shutting down failed: {err}");
1448 }
1449 }
1450
1451 #[allow(unused_variables)]
1452 fn idle_start(&self, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1453 self.idle_start_event.notify(usize::MAX);
1454
1455 #[cfg(feature = "verify_aggregation_graph")]
1456 {
1457 use tokio::select;
1458
1459 self.is_idle.store(true, Ordering::Release);
1460 let turbo_tasks = turbo_tasks.pin();
1464 tokio::task::spawn(async move {
1465 let backend = &turbo_tasks.backend();
1466 select! {
1467 _ = tokio::time::sleep(Duration::from_secs(5)) => {
1468 }
1470 _ = backend.idle_end_event.listen() => {
1471 return;
1472 }
1473 }
1474 if !backend.is_idle.load(Ordering::Relaxed) {
1475 return;
1476 }
1477 backend.verify_aggregation_graph(&turbo_tasks, true);
1478 });
1479 }
1480 }
1481
1482 fn idle_end(&self) {
1483 #[cfg(feature = "verify_aggregation_graph")]
1484 self.is_idle.store(false, Ordering::Release);
1485 self.idle_end_event.notify(usize::MAX);
1486 }
1487
1488 fn get_or_create_task(
1489 &self,
1490 native_fn: &'static NativeFunction,
1491 this: Option<RawVc>,
1492 arg: &mut dyn DynTaskInputsStorage,
1493 parent_task: Option<TaskId>,
1494 persistence: TaskPersistence,
1495 turbo_tasks: &TurboTasks<TurboTasksBackend>,
1496 ) -> TaskId {
1497 let transient = matches!(persistence, TaskPersistence::Transient);
1498
1499 if transient
1500 && let Some(parent_task) = parent_task
1501 && !parent_task.is_transient()
1502 {
1503 let task_type = CachedTaskType {
1504 native_fn,
1505 this,
1506 arg: arg.take_box(),
1507 };
1508 self.panic_persistent_calling_transient(
1509 self.debug_get_task_description(parent_task),
1510 Some(&task_type),
1511 None,
1512 );
1513 }
1514
1515 let is_root = native_fn.is_root;
1516
1517 let arg_ref = arg.as_ref();
1519 let hash = CachedTaskType::hash_from_components(
1520 self.storage.task_cache.hasher(),
1521 native_fn,
1522 this,
1523 arg_ref,
1524 );
1525 let shard = get_shard(&self.storage.task_cache, hash);
1529
1530 let mut ctx = self.execute_context(turbo_tasks);
1531 if let Some(task_id) =
1535 raw_get_in_shard(shard, hash, |k| k.eq_components(native_fn, this, arg_ref))
1536 {
1537 self.track_cache_hit_by_fn(native_fn);
1538 operation::ConnectChildOperation::run(parent_task, task_id, ctx);
1539 return task_id;
1540 }
1541
1542 let task_id = if !transient
1547 && let Some((task_id, stored_type)) = ctx.task_by_type(native_fn, this, arg_ref)
1548 {
1549 self.track_cache_hit_by_fn(native_fn);
1550 match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
1553 k.eq_components(native_fn, this, arg_ref)
1554 }) {
1555 RawEntry::Occupied(_) => {}
1556 RawEntry::Vacant(e) => {
1557 e.insert(stored_type, task_id);
1558 }
1559 };
1560 task_id
1561 } else {
1562 match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
1563 k.eq_components(native_fn, this, arg_ref)
1564 }) {
1565 RawEntry::Occupied(e) => {
1566 let task_id = *e.get();
1569 drop(e);
1570 self.track_cache_hit_by_fn(native_fn);
1571 task_id
1572 }
1573 RawEntry::Vacant(e) => {
1574 let task_type = CachedTaskTypeArc::new(CachedTaskType {
1578 native_fn,
1579 this,
1580 arg: arg.take_box(),
1581 });
1582 let task_id = if transient {
1583 self.transient_task_id_factory.get()
1584 } else {
1585 self.persisted_task_id_factory.get()
1586 };
1587 self.storage
1591 .initialize_new_task(task_id, Some(task_type.clone()));
1592 e.insert(task_type, task_id);
1594 self.track_cache_miss_by_fn(native_fn);
1595 if is_root {
1599 AggregationUpdateQueue::run(
1600 AggregationUpdateJob::UpdateAggregationNumber {
1601 task_id,
1602 base_aggregation_number: u32::MAX,
1603 distance: None,
1604 },
1605 &mut ctx,
1606 );
1607 } else if native_fn.is_session_dependent && self.should_track_dependencies() {
1608 const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
1609 AggregationUpdateQueue::run(
1610 AggregationUpdateJob::UpdateAggregationNumber {
1611 task_id,
1612 base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
1613 distance: None,
1614 },
1615 &mut ctx,
1616 );
1617 };
1618
1619 task_id
1620 }
1621 }
1622 };
1623
1624 operation::ConnectChildOperation::run(parent_task, task_id, ctx);
1625
1626 task_id
1627 }
1628
1629 fn debug_trace_transient_task(
1632 &self,
1633 task_type: &CachedTaskType,
1634 cell_id: Option<CellId>,
1635 ) -> DebugTraceTransientTask {
1636 fn inner_id(
1639 backend: &TurboTasksBackend,
1640 task_id: TaskId,
1641 cell_type_id: Option<ValueTypeId>,
1642 visited_set: &mut FxHashSet<TaskId>,
1643 ) -> DebugTraceTransientTask {
1644 if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1645 if visited_set.contains(&task_id) {
1646 let task_name = task_type.get_name();
1647 DebugTraceTransientTask::Collapsed {
1648 task_name,
1649 cell_type_id,
1650 }
1651 } else {
1652 inner_cached(backend, &task_type, cell_type_id, visited_set)
1653 }
1654 } else {
1655 DebugTraceTransientTask::Uncached { cell_type_id }
1656 }
1657 }
1658 fn inner_cached(
1659 backend: &TurboTasksBackend,
1660 task_type: &CachedTaskType,
1661 cell_type_id: Option<ValueTypeId>,
1662 visited_set: &mut FxHashSet<TaskId>,
1663 ) -> DebugTraceTransientTask {
1664 let task_name = task_type.get_name();
1665
1666 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1667 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1668 return None;
1672 };
1673 if task_id.is_transient() {
1674 Some(Box::new(inner_id(
1675 backend,
1676 task_id,
1677 cause_self_raw_vc.try_get_type_id(),
1678 visited_set,
1679 )))
1680 } else {
1681 None
1682 }
1683 });
1684 let cause_args = task_type
1685 .arg
1686 .get_raw_vcs()
1687 .into_iter()
1688 .filter_map(|raw_vc| {
1689 let Some(task_id) = raw_vc.try_get_task_id() else {
1690 return None;
1692 };
1693 if !task_id.is_transient() {
1694 return None;
1695 }
1696 Some((task_id, raw_vc.try_get_type_id()))
1697 })
1698 .collect::<IndexSet<_>>() .into_iter()
1700 .map(|(task_id, cell_type_id)| {
1701 inner_id(backend, task_id, cell_type_id, visited_set)
1702 })
1703 .collect();
1704
1705 DebugTraceTransientTask::Cached {
1706 task_name,
1707 cell_type_id,
1708 cause_self,
1709 cause_args,
1710 }
1711 }
1712 inner_cached(
1713 self,
1714 task_type,
1715 cell_id.map(|c| c.type_id),
1716 &mut FxHashSet::default(),
1717 )
1718 }
1719
1720 fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1721 if !self.should_track_dependencies() {
1722 panic!("Dependency tracking is disabled so invalidation is not allowed");
1723 }
1724 operation::InvalidateOperation::run(
1725 smallvec![task_id],
1726 #[cfg(feature = "task_dirty_cause")]
1727 TaskDirtyCause::Invalidator,
1728 self.execute_context(turbo_tasks),
1729 );
1730 }
1731
1732 fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &TurboTasks<TurboTasksBackend>) {
1733 if !self.should_track_dependencies() {
1734 panic!("Dependency tracking is disabled so invalidation is not allowed");
1735 }
1736 operation::InvalidateOperation::run(
1737 tasks.iter().copied().collect(),
1738 #[cfg(feature = "task_dirty_cause")]
1739 TaskDirtyCause::Unknown,
1740 self.execute_context(turbo_tasks),
1741 );
1742 }
1743
1744 fn invalidate_tasks_set(
1745 &self,
1746 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1747 turbo_tasks: &TurboTasks<TurboTasksBackend>,
1748 ) {
1749 if !self.should_track_dependencies() {
1750 panic!("Dependency tracking is disabled so invalidation is not allowed");
1751 }
1752 operation::InvalidateOperation::run(
1753 tasks.iter().copied().collect(),
1754 #[cfg(feature = "task_dirty_cause")]
1755 TaskDirtyCause::Unknown,
1756 self.execute_context(turbo_tasks),
1757 );
1758 }
1759
1760 fn invalidate_serialization(
1761 &self,
1762 task_id: TaskId,
1763 turbo_tasks: &TurboTasks<TurboTasksBackend>,
1764 ) {
1765 if task_id.is_transient() {
1766 return;
1767 }
1768 let mut ctx = self.execute_context(turbo_tasks);
1769 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1770 task.invalidate_serialization();
1771 }
1772
1773 fn debug_get_task_description(&self, task_id: TaskId) -> String {
1774 let task = self.storage.access_mut(task_id);
1775 if let Some(value) = task.get_persistent_task_type() {
1776 format!("{task_id:?} {}", value)
1777 } else if let Some(value) = task.get_transient_task_type() {
1778 format!("{task_id:?} {}", value)
1779 } else {
1780 format!("{task_id:?} unknown")
1781 }
1782 }
1783
1784 fn get_task_name(
1785 &self,
1786 task_id: TaskId,
1787 turbo_tasks: &TurboTasks<TurboTasksBackend>,
1788 ) -> String {
1789 let mut ctx = self.execute_context(turbo_tasks);
1790 let task = ctx.task(task_id, TaskDataCategory::Data);
1791 if let Some(value) = task.get_persistent_task_type() {
1792 value.to_string()
1793 } else if let Some(value) = task.get_transient_task_type() {
1794 value.to_string()
1795 } else {
1796 "unknown".to_string()
1797 }
1798 }
1799
1800 fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<CachedTaskTypeArc> {
1801 let task = self.storage.access_mut(task_id);
1802 task.get_persistent_task_type().cloned()
1803 }
1804
1805 fn task_execution_canceled(
1806 &self,
1807 task_id: TaskId,
1808 turbo_tasks: &TurboTasks<TurboTasksBackend>,
1809 ) {
1810 let mut ctx = self.execute_context(turbo_tasks);
1811 let mut task = ctx.task(task_id, TaskDataCategory::All);
1812 if let Some(in_progress) = task.take_in_progress() {
1813 match in_progress {
1814 InProgressState::Scheduled {
1815 done_event,
1816 reason: _,
1817 } => done_event.notify(usize::MAX),
1818 InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1819 done_event.notify(usize::MAX)
1820 }
1821 InProgressState::Canceled => {}
1822 }
1823 }
1824 let in_progress_cells = task.take_in_progress_cells();
1827 if let Some(ref cells) = in_progress_cells {
1828 for state in cells.values() {
1829 state.event.notify(usize::MAX);
1830 }
1831 }
1832
1833 let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
1839 task.update_dirty_state(Some(Dirtyness::SessionDependent))
1840 } else {
1841 None
1842 };
1843
1844 let old = task.set_in_progress(InProgressState::Canceled);
1845 debug_assert!(old.is_none(), "InProgress already exists");
1846 drop(task);
1847
1848 if let Some(data_update) = data_update {
1849 AggregationUpdateQueue::run(data_update, &mut ctx);
1850 }
1851
1852 drop(in_progress_cells);
1853 }
1854
1855 fn try_start_task_execution(
1856 &self,
1857 task_id: TaskId,
1858 priority: TaskPriority,
1859 turbo_tasks: &TurboTasks<TurboTasksBackend>,
1860 ) -> Option<TaskExecutionSpec<'_>> {
1861 let execution_reason;
1862 let task_type;
1863 #[cfg(feature = "task_dirty_cause")]
1864 let cause;
1865 {
1866 let mut ctx = self.execute_context(turbo_tasks);
1867 let mut task = ctx.task(task_id, TaskDataCategory::All);
1868 task_type = task.get_task_type().to_owned();
1869 let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
1870 if let Some(tasks) = task.prefetch() {
1871 drop(task);
1872 ctx.prepare_tasks(tasks, "prefetch");
1873 task = ctx.task(task_id, TaskDataCategory::All);
1874 }
1875 let in_progress = task.take_in_progress()?;
1876 let InProgressState::Scheduled { done_event, reason } = in_progress else {
1877 let old = task.set_in_progress(in_progress);
1878 debug_assert!(old.is_none(), "InProgress already exists");
1879 return None;
1880 };
1881 execution_reason = reason;
1882 #[cfg(feature = "task_dirty_cause")]
1883 {
1884 cause = match task.get_dirty() {
1885 Some(Dirtyness::Dirty { cause, .. }) => Some(cause.clone()),
1886 _ => None,
1887 };
1888 }
1889 let old = task.set_in_progress(InProgressState::InProgress(Box::new(
1890 InProgressStateInner {
1891 stale: false,
1892 once_task,
1893 done_event,
1894 marked_as_completed: false,
1895 new_children: Default::default(),
1896 },
1897 )));
1898 debug_assert!(old.is_none(), "InProgress already exists");
1899
1900 enum Collectible {
1902 Current(CollectibleRef, i32),
1903 Outdated(CollectibleRef),
1904 }
1905 let collectibles = task
1906 .iter_collectibles()
1907 .map(|(&collectible, &value)| Collectible::Current(collectible, value))
1908 .chain(
1909 task.iter_outdated_collectibles()
1910 .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
1911 )
1912 .collect::<Vec<_>>();
1913 for collectible in collectibles {
1914 match collectible {
1915 Collectible::Current(collectible, value) => {
1916 let _ = task.insert_outdated_collectible(collectible, value);
1917 }
1918 Collectible::Outdated(collectible) => {
1919 if task
1920 .collectibles()
1921 .is_none_or(|m| m.get(&collectible).is_none())
1922 {
1923 task.remove_outdated_collectibles(&collectible);
1924 }
1925 }
1926 }
1927 }
1928
1929 if self.should_track_dependencies() {
1930 let cell_dependencies = task.iter_cell_dependencies().collect();
1935 task.set_outdated_cell_dependencies(cell_dependencies);
1936 let cell_dependencies_hashed = task.iter_cell_dependencies_hashed().collect();
1937 task.set_outdated_cell_dependencies_hashed(cell_dependencies_hashed);
1938
1939 let outdated_output_dependencies = task.iter_output_dependencies().collect();
1940 task.set_outdated_output_dependencies(outdated_output_dependencies);
1941 }
1942 }
1943
1944 let (span, future) = match task_type {
1945 TaskType::Cached(task_type) => {
1946 let CachedTaskType {
1947 native_fn,
1948 this,
1949 arg,
1950 } = &*task_type;
1951 (
1952 native_fn.span(
1953 task_id.persistence(),
1954 execution_reason,
1955 priority,
1956 #[cfg(feature = "task_dirty_cause")]
1957 cause.as_ref(),
1958 ),
1959 native_fn.execute(*this, &**arg),
1960 )
1961 }
1962 TaskType::Transient(task_type) => {
1963 let span = tracing::trace_span!("turbo_tasks::root_task");
1964 let future = match &*task_type {
1965 TransientTask::Root(f) => f(),
1966 TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
1967 };
1968 (span, future)
1969 }
1970 };
1971 Some(TaskExecutionSpec { future, span })
1972 }
1973
1974 fn task_execution_completed(
1978 &self,
1979 task_id: TaskId,
1980 result: Result<RawVc, TurboTasksExecutionError>,
1981 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1982 #[cfg(feature = "verify_determinism")] stateful: bool,
1983 has_invalidator: bool,
1984 turbo_tasks: &TurboTasks<TurboTasksBackend>,
1985 ) -> Option<TaskPriority> {
1986 #[cfg(not(feature = "trace_task_details"))]
2001 let span = tracing::trace_span!(
2002 "task execution completed",
2003 new_children = tracing::field::Empty
2004 )
2005 .entered();
2006 #[cfg(feature = "trace_task_details")]
2007 let span = tracing::trace_span!(
2008 "task execution completed",
2009 task_id = display(task_id),
2010 result = match result.as_ref() {
2011 Ok(value) => display(either::Either::Left(value)),
2012 Err(err) => display(either::Either::Right(err)),
2013 },
2014 new_children = tracing::field::Empty,
2015 immutable = tracing::field::Empty,
2016 new_output = tracing::field::Empty,
2017 output_dependents = tracing::field::Empty,
2018 aggregation_number = tracing::field::Empty,
2019 stale = tracing::field::Empty,
2020 )
2021 .entered();
2022
2023 let is_error = result.is_err();
2024
2025 let mut ctx = self.execute_context(turbo_tasks);
2026
2027 let TaskExecutionCompletePrepareResult {
2028 new_children,
2029 is_now_immutable,
2030 #[cfg(feature = "verify_determinism")]
2031 no_output_set,
2032 new_output,
2033 #[cfg(feature = "task_dirty_cause")]
2034 function_id,
2035 output_dependent_tasks,
2036 is_recomputation,
2037 is_session_dependent,
2038 } = match self.task_execution_completed_prepare(
2039 &mut ctx,
2040 #[cfg(feature = "trace_task_details")]
2041 &span,
2042 task_id,
2043 result,
2044 cell_counters,
2045 #[cfg(feature = "verify_determinism")]
2046 stateful,
2047 has_invalidator,
2048 ) {
2049 Ok(r) => r,
2050 Err(stale_priority) => {
2051 #[cfg(feature = "trace_task_details")]
2053 span.record("stale", "prepare");
2054 return Some(stale_priority);
2055 }
2056 };
2057
2058 #[cfg(feature = "trace_task_details")]
2059 span.record("new_output", new_output.is_some());
2060 #[cfg(feature = "trace_task_details")]
2061 span.record("output_dependents", output_dependent_tasks.len());
2062
2063 if !output_dependent_tasks.is_empty() {
2068 self.task_execution_completed_invalidate_output_dependent(
2069 &mut ctx,
2070 task_id,
2071 #[cfg(feature = "task_dirty_cause")]
2072 function_id,
2073 output_dependent_tasks,
2074 );
2075 }
2076
2077 let has_new_children = !new_children.is_empty();
2078 span.record("new_children", new_children.len());
2079
2080 if has_new_children {
2081 self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
2082 }
2083
2084 if has_new_children
2085 && let Some(stale_priority) =
2086 self.task_execution_completed_connect(&mut ctx, task_id, new_children)
2087 {
2088 #[cfg(feature = "trace_task_details")]
2090 span.record("stale", "connect");
2091 return Some(stale_priority);
2092 }
2093
2094 let (stale_priority, in_progress_cells) = self.task_execution_completed_finish(
2095 &mut ctx,
2096 task_id,
2097 #[cfg(feature = "verify_determinism")]
2098 no_output_set,
2099 new_output,
2100 is_now_immutable,
2101 is_session_dependent,
2102 );
2103 if let Some(stale_priority) = stale_priority {
2104 #[cfg(feature = "trace_task_details")]
2106 span.record("stale", "finish");
2107 return Some(stale_priority);
2108 }
2109
2110 let removed_data = self.task_execution_completed_cleanup(
2111 &mut ctx,
2112 task_id,
2113 cell_counters,
2114 is_error,
2115 is_recomputation,
2116 );
2117
2118 drop(removed_data);
2120 drop(in_progress_cells);
2121
2122 None
2123 }
2124
2125 fn task_execution_completed_prepare(
2126 &self,
2127 ctx: &mut impl ExecuteContext<'_>,
2128 #[cfg(feature = "trace_task_details")] span: &Span,
2129 task_id: TaskId,
2130 result: Result<RawVc, TurboTasksExecutionError>,
2131 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2132 #[cfg(feature = "verify_determinism")] stateful: bool,
2133 has_invalidator: bool,
2134 ) -> Result<TaskExecutionCompletePrepareResult, TaskPriority> {
2135 let mut task = ctx.task(task_id, TaskDataCategory::All);
2136 let is_recomputation = task.is_dirty().is_none();
2137 let is_session_dependent = self.should_track_dependencies()
2140 && matches!(task.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent);
2141 let Some(in_progress) = task.get_in_progress_mut() else {
2142 panic!("Task execution completed, but task is not in progress: {task:#?}");
2143 };
2144 if matches!(in_progress, InProgressState::Canceled) {
2145 return Ok(TaskExecutionCompletePrepareResult {
2146 new_children: Default::default(),
2147 is_now_immutable: false,
2148 #[cfg(feature = "verify_determinism")]
2149 no_output_set: false,
2150 #[cfg(feature = "task_dirty_cause")]
2151 function_id: None,
2152 new_output: None,
2153 output_dependent_tasks: Default::default(),
2154 is_recomputation,
2155 is_session_dependent,
2156 });
2157 }
2158 let &mut InProgressState::InProgress(box InProgressStateInner {
2159 stale,
2160 ref mut new_children,
2161 once_task: is_once_task,
2162 ..
2163 }) = in_progress
2164 else {
2165 panic!("Task execution completed, but task is not in progress: {task:#?}");
2166 };
2167
2168 #[cfg(not(feature = "no_fast_stale"))]
2170 if stale && !is_once_task {
2171 let stale_priority = compute_stale_priority(&task);
2172 let Some(InProgressState::InProgress(box InProgressStateInner {
2173 done_event,
2174 mut new_children,
2175 ..
2176 })) = task.take_in_progress()
2177 else {
2178 unreachable!();
2179 };
2180 let old = task.set_in_progress(InProgressState::Scheduled {
2181 done_event,
2182 reason: TaskExecutionReason::Stale,
2183 });
2184 debug_assert!(old.is_none(), "InProgress already exists");
2185 for task in task.iter_children() {
2188 new_children.remove(&task);
2189 }
2190 drop(task);
2191
2192 AggregationUpdateQueue::run(
2195 AggregationUpdateJob::DecreaseActiveCounts {
2196 task_ids: new_children.into_iter().collect(),
2197 },
2198 ctx,
2199 );
2200 return Err(stale_priority);
2201 }
2202
2203 let mut new_children = take(new_children);
2205
2206 #[cfg(feature = "task_dirty_cause")]
2208 let function_id = match task.get_task_type() {
2209 TaskTypeRef::Cached(task_type) => {
2210 Some(turbo_tasks::registry::get_function_id(task_type.native_fn))
2211 }
2212 TaskTypeRef::Transient(_) => None,
2213 };
2214
2215 #[cfg(feature = "verify_determinism")]
2217 if stateful {
2218 task.set_stateful(true);
2219 }
2220
2221 if has_invalidator {
2223 task.set_invalidator(true);
2224 }
2225
2226 if result.is_ok() || is_recomputation {
2236 let old_counters: FxHashMap<_, _> = task
2237 .iter_cell_type_max_index()
2238 .map(|(&k, &v)| (k, v))
2239 .collect();
2240 let mut counters_to_remove = old_counters.clone();
2241
2242 for (&cell_type, &max_index) in cell_counters.iter() {
2243 if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
2244 if old_max_index != max_index {
2245 task.insert_cell_type_max_index(cell_type, max_index);
2246 }
2247 } else {
2248 task.insert_cell_type_max_index(cell_type, max_index);
2249 }
2250 }
2251 for (cell_type, _) in counters_to_remove {
2252 task.remove_cell_type_max_index(&cell_type);
2253 }
2254 }
2255
2256 let mut queue = AggregationUpdateQueue::new();
2257
2258 let mut old_edges = Vec::new();
2259
2260 let has_children = !new_children.is_empty();
2261 let is_immutable = task.immutable();
2262 let task_dependencies_for_immutable =
2263 if !is_immutable
2265 && !is_session_dependent
2267 && !task.invalidator()
2269 && task.is_collectibles_dependencies_empty()
2271 {
2272 Some(
2273 task.iter_output_dependencies()
2275 .chain(task.iter_cell_dependencies().map(|r| r.task))
2276 .chain(task.iter_cell_dependencies_hashed().map(|(r, _)| r.task))
2277 .collect::<FxHashSet<_>>(),
2278 )
2279 } else {
2280 None
2281 };
2282
2283 if has_children {
2284 let _aggregation_number =
2286 prepare_new_children(task_id, &mut task, &new_children, &mut queue);
2287
2288 #[cfg(feature = "trace_task_details")]
2289 span.record("aggregation_number", _aggregation_number);
2290
2291 old_edges.extend(
2293 task.iter_children()
2294 .filter(|task| !new_children.remove(task))
2295 .map(OutdatedEdge::Child),
2296 );
2297 } else {
2298 old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
2299 }
2300
2301 old_edges.extend(
2302 task.iter_outdated_collectibles()
2303 .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
2304 );
2305
2306 if self.should_track_dependencies() {
2307 old_edges.extend(
2314 task.iter_outdated_cell_dependencies()
2315 .map(OutdatedEdge::CellDependency),
2316 );
2317 old_edges.extend(
2318 task.iter_outdated_cell_dependencies_hashed()
2319 .map(|(r, k)| OutdatedEdge::HashedCellDependency(r, k)),
2320 );
2321 old_edges.extend(
2322 task.iter_outdated_output_dependencies()
2323 .map(OutdatedEdge::OutputDependency),
2324 );
2325 }
2326
2327 let current_output = task.get_output();
2329 #[cfg(feature = "verify_determinism")]
2330 let no_output_set = current_output.is_none();
2331 let new_output = match result {
2332 Ok(RawVc::TaskOutput(output_task_id)) => {
2333 if let Some(OutputValue::Output(current_task_id)) = current_output
2334 && *current_task_id == output_task_id
2335 {
2336 None
2337 } else {
2338 Some(OutputValue::Output(output_task_id))
2339 }
2340 }
2341 Ok(RawVc::TaskCell(output_task_id, cell)) => {
2342 if let Some(OutputValue::Cell(CellRef {
2343 task: current_task_id,
2344 cell: current_cell,
2345 })) = current_output
2346 && *current_task_id == output_task_id
2347 && *current_cell == cell
2348 {
2349 None
2350 } else {
2351 Some(OutputValue::Cell(CellRef {
2352 task: output_task_id,
2353 cell,
2354 }))
2355 }
2356 }
2357 Ok(RawVc::LocalOutput(..)) => {
2358 panic!("Non-local tasks must not return a local Vc");
2359 }
2360 Err(err) => {
2361 if let Some(OutputValue::Error(old_error)) = current_output
2362 && **old_error == err
2363 {
2364 None
2365 } else {
2366 Some(OutputValue::Error(Arc::new((&err).into())))
2367 }
2368 }
2369 };
2370 let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2371 if new_output.is_some() && ctx.should_track_dependencies() {
2373 output_dependent_tasks = task.iter_output_dependent().collect();
2374 }
2375
2376 drop(task);
2377
2378 let mut is_now_immutable = false;
2380 if let Some(dependencies) = task_dependencies_for_immutable
2381 && dependencies
2382 .iter()
2383 .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
2384 {
2385 is_now_immutable = true;
2386 }
2387 #[cfg(feature = "trace_task_details")]
2388 span.record("immutable", is_immutable || is_now_immutable);
2389
2390 if !queue.is_empty() || !old_edges.is_empty() {
2391 #[cfg(any(
2392 feature = "trace_task_completion",
2393 feature = "trace_aggregation_update_stats"
2394 ))]
2395 let _span =
2396 tracing::trace_span!("remove old edges and prepare new children", stats = Empty)
2397 .entered();
2398 #[cfg(feature = "trace_aggregation_update_stats")]
2402 {
2403 let stats = CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2404 _span.record("stats", tracing::field::debug(stats));
2405 }
2406 #[cfg(not(feature = "trace_aggregation_update_stats"))]
2407 CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
2408 }
2409
2410 Ok(TaskExecutionCompletePrepareResult {
2411 new_children,
2412 is_now_immutable,
2413 #[cfg(feature = "verify_determinism")]
2414 no_output_set,
2415 #[cfg(feature = "task_dirty_cause")]
2416 function_id,
2417 new_output,
2418 output_dependent_tasks,
2419 is_recomputation,
2420 is_session_dependent,
2421 })
2422 }
2423
2424 fn task_execution_completed_invalidate_output_dependent(
2425 &self,
2426 ctx: &mut impl ExecuteContext<'_>,
2427 task_id: TaskId,
2428 #[cfg(feature = "task_dirty_cause")] function_id: Option<FunctionId>,
2429 output_dependent_tasks: SmallVec<[TaskId; 4]>,
2430 ) {
2431 debug_assert!(!output_dependent_tasks.is_empty());
2432
2433 #[cfg(feature = "task_dirty_cause")]
2434 let cause = match function_id {
2435 Some(function) => TaskDirtyCause::OutputChange { function },
2436 None => TaskDirtyCause::RootOutputChange,
2437 };
2438
2439 if output_dependent_tasks.len() > 1 {
2440 ctx.prepare_tasks(
2441 output_dependent_tasks
2442 .iter()
2443 .map(|&id| (id, TaskDataCategory::All)),
2444 "invalidate output dependents",
2445 );
2446 }
2447
2448 fn process_output_dependents(
2449 ctx: &mut impl ExecuteContext<'_>,
2450 task_id: TaskId,
2451 #[cfg(feature = "task_dirty_cause")] cause: &TaskDirtyCause,
2452 dependent_task_id: TaskId,
2453 queue: &mut AggregationUpdateQueue,
2454 ) {
2455 #[cfg(feature = "trace_task_output_dependencies")]
2456 let span = tracing::trace_span!(
2457 "invalidate output dependency",
2458 task = %task_id,
2459 dependent_task = %dependent_task_id,
2460 result = tracing::field::Empty,
2461 )
2462 .entered();
2463 let mut make_stale = true;
2464 let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2465 let transient_task_type = dependent.get_transient_task_type();
2466 if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
2467 #[cfg(feature = "trace_task_output_dependencies")]
2469 span.record("result", "once task");
2470 return;
2471 }
2472 if dependent.outdated_output_dependencies_contains(&task_id) {
2473 #[cfg(feature = "trace_task_output_dependencies")]
2474 span.record("result", "outdated dependency");
2475 make_stale = false;
2480 } else if !dependent.output_dependencies_contains(&task_id) {
2481 #[cfg(feature = "trace_task_output_dependencies")]
2484 span.record("result", "no backward dependency");
2485 return;
2486 }
2487 make_task_dirty_internal(
2488 dependent,
2489 dependent_task_id,
2490 make_stale,
2491 #[cfg(feature = "task_dirty_cause")]
2492 cause.clone(),
2493 queue,
2494 ctx,
2495 );
2496 #[cfg(feature = "trace_task_output_dependencies")]
2497 span.record("result", "marked dirty");
2498 }
2499
2500 if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
2501 let chunk_size = good_chunk_size(output_dependent_tasks.len());
2502 let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
2503 let _ = scope_and_block(chunks.len(), |scope| {
2504 for chunk in chunks {
2505 let child_ctx = ctx.child_context();
2506 #[cfg(feature = "task_dirty_cause")]
2507 let cause = &cause;
2508 scope.spawn(move || {
2509 let mut ctx = child_ctx.create();
2510 let mut queue = AggregationUpdateQueue::new();
2511 for dependent_task_id in chunk {
2512 process_output_dependents(
2513 &mut ctx,
2514 task_id,
2515 #[cfg(feature = "task_dirty_cause")]
2516 cause,
2517 dependent_task_id,
2518 &mut queue,
2519 )
2520 }
2521 queue.execute(&mut ctx);
2522 });
2523 }
2524 });
2525 } else {
2526 let mut queue = AggregationUpdateQueue::new();
2527 for dependent_task_id in output_dependent_tasks {
2528 process_output_dependents(
2529 ctx,
2530 task_id,
2531 #[cfg(feature = "task_dirty_cause")]
2532 &cause,
2533 dependent_task_id,
2534 &mut queue,
2535 );
2536 }
2537 queue.execute(ctx);
2538 }
2539 }
2540
2541 fn task_execution_completed_unfinished_children_dirty(
2542 &self,
2543 ctx: &mut impl ExecuteContext<'_>,
2544 new_children: &FxHashSet<TaskId>,
2545 ) {
2546 debug_assert!(!new_children.is_empty());
2547
2548 let mut queue = AggregationUpdateQueue::new();
2549 ctx.for_each_task_all(
2550 new_children.iter().copied(),
2551 "unfinished children dirty",
2552 |child_task, ctx| {
2553 if !child_task.has_output() {
2554 let child_id = child_task.id();
2555 make_task_dirty_internal(
2556 child_task,
2557 child_id,
2558 false,
2559 #[cfg(feature = "task_dirty_cause")]
2560 TaskDirtyCause::InitialDirty,
2561 &mut queue,
2562 ctx,
2563 );
2564 }
2565 },
2566 );
2567
2568 queue.execute(ctx);
2569 }
2570
2571 fn task_execution_completed_connect(
2572 &self,
2573 ctx: &mut impl ExecuteContext<'_>,
2574 task_id: TaskId,
2575 new_children: FxHashSet<TaskId>,
2576 ) -> Option<TaskPriority> {
2577 debug_assert!(!new_children.is_empty());
2578
2579 let mut task = ctx.task(task_id, TaskDataCategory::All);
2580 let Some(in_progress) = task.get_in_progress() else {
2581 panic!("Task execution completed, but task is not in progress: {task:#?}");
2582 };
2583 if matches!(in_progress, InProgressState::Canceled) {
2584 return None;
2586 }
2587 let InProgressState::InProgress(box InProgressStateInner {
2588 #[cfg(not(feature = "no_fast_stale"))]
2589 stale,
2590 once_task: is_once_task,
2591 ..
2592 }) = in_progress
2593 else {
2594 panic!("Task execution completed, but task is not in progress: {task:#?}");
2595 };
2596
2597 #[cfg(not(feature = "no_fast_stale"))]
2599 if *stale && !is_once_task {
2600 let stale_priority = compute_stale_priority(&task);
2601 let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
2602 task.take_in_progress()
2603 else {
2604 unreachable!();
2605 };
2606 let old = task.set_in_progress(InProgressState::Scheduled {
2607 done_event,
2608 reason: TaskExecutionReason::Stale,
2609 });
2610 debug_assert!(old.is_none(), "InProgress already exists");
2611 drop(task);
2612
2613 AggregationUpdateQueue::run(
2616 AggregationUpdateJob::DecreaseActiveCounts {
2617 task_ids: new_children.into_iter().collect(),
2618 },
2619 ctx,
2620 );
2621 return Some(stale_priority);
2622 }
2623
2624 let has_active_count = ctx.should_track_activeness()
2625 && task
2626 .get_activeness()
2627 .is_some_and(|activeness| activeness.active_counter > 0);
2628 connect_children(
2629 ctx,
2630 task_id,
2631 task,
2632 new_children,
2633 has_active_count,
2634 ctx.should_track_activeness(),
2635 );
2636
2637 None
2638 }
2639
2640 #[allow(clippy::type_complexity)]
2641 fn task_execution_completed_finish(
2642 &self,
2643 ctx: &mut impl ExecuteContext<'_>,
2644 task_id: TaskId,
2645 #[cfg(feature = "verify_determinism")] no_output_set: bool,
2646 new_output: Option<OutputValue>,
2647 is_now_immutable: bool,
2648 is_session_dependent: bool,
2649 ) -> (
2650 Option<TaskPriority>,
2651 Option<
2652 auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
2653 >,
2654 ) {
2655 let mut task = ctx.task(task_id, TaskDataCategory::All);
2656 let Some(in_progress) = task.take_in_progress() else {
2657 panic!("Task execution completed, but task is not in progress: {task:#?}");
2658 };
2659 if matches!(in_progress, InProgressState::Canceled) {
2660 return (None, None);
2662 }
2663 let InProgressState::InProgress(box InProgressStateInner {
2664 done_event,
2665 once_task: is_once_task,
2666 stale,
2667 marked_as_completed: _,
2668 new_children,
2669 }) = in_progress
2670 else {
2671 panic!("Task execution completed, but task is not in progress: {task:#?}");
2672 };
2673 debug_assert!(new_children.is_empty());
2674
2675 if stale && !is_once_task {
2677 let stale_priority = compute_stale_priority(&task);
2678 let old = task.set_in_progress(InProgressState::Scheduled {
2679 done_event,
2680 reason: TaskExecutionReason::Stale,
2681 });
2682 debug_assert!(old.is_none(), "InProgress already exists");
2683 return (Some(stale_priority), None);
2684 }
2685
2686 let mut old_content = None;
2688 if let Some(value) = new_output {
2689 old_content = task.set_output(value);
2690 }
2691
2692 if is_now_immutable {
2695 task.set_immutable(true);
2696 }
2697
2698 let in_progress_cells = task.take_in_progress_cells();
2700 if let Some(ref cells) = in_progress_cells {
2701 for state in cells.values() {
2702 state.event.notify(usize::MAX);
2703 }
2704 }
2705
2706 let new_dirtyness = if is_session_dependent {
2708 Some(Dirtyness::SessionDependent)
2709 } else {
2710 None
2711 };
2712 #[cfg(feature = "verify_determinism")]
2713 let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
2714 let data_update = task.update_dirty_state(new_dirtyness);
2715
2716 #[cfg(feature = "verify_determinism")]
2720 let stale_priority: Option<TaskPriority> =
2721 ((dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task)
2722 .then(TaskPriority::leaf);
2723 #[cfg(not(feature = "verify_determinism"))]
2724 let stale_priority: Option<TaskPriority> = None;
2725 if stale_priority.is_some() {
2726 let old = task.set_in_progress(InProgressState::Scheduled {
2727 done_event,
2728 reason: TaskExecutionReason::Stale,
2729 });
2730 debug_assert!(old.is_none(), "InProgress already exists");
2731 drop(task);
2732 } else {
2733 drop(task);
2734
2735 done_event.notify(usize::MAX);
2737 }
2738
2739 drop(old_content);
2740
2741 if let Some(data_update) = data_update {
2742 AggregationUpdateQueue::run(data_update, ctx);
2743 }
2744
2745 (stale_priority, in_progress_cells)
2747 }
2748
2749 fn task_execution_completed_cleanup(
2750 &self,
2751 ctx: &mut impl ExecuteContext<'_>,
2752 task_id: TaskId,
2753 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2754 is_error: bool,
2755 is_recomputation: bool,
2756 ) -> Vec<SharedReference> {
2757 let mut task = ctx.task(task_id, TaskDataCategory::All);
2758 let mut removed_cell_data = Vec::new();
2759 if !is_error || is_recomputation {
2765 let to_remove: Vec<_> = task
2771 .iter_cell_data()
2772 .filter_map(|(cell, _)| {
2773 cell_counters
2774 .get(&cell.type_id)
2775 .is_none_or(|start_index| cell.index >= *start_index)
2776 .then_some(*cell)
2777 })
2778 .collect();
2779 removed_cell_data.reserve_exact(to_remove.len());
2780 for cell in to_remove {
2781 if let Some(data) = task.remove_cell_data(&cell) {
2782 removed_cell_data.push(data);
2783 }
2784 }
2785 let to_remove_hash: Vec<_> = task
2787 .iter_cell_data_hash()
2788 .filter_map(|(cell, _)| {
2789 cell_counters
2790 .get(&cell.type_id)
2791 .is_none_or(|start_index| cell.index >= *start_index)
2792 .then_some(*cell)
2793 })
2794 .collect();
2795 for cell in to_remove_hash {
2796 task.remove_cell_data_hash(&cell);
2797 }
2798 }
2799
2800 task.cleanup_after_execution();
2804
2805 drop(task);
2806
2807 removed_cell_data
2809 }
2810
2811 fn log_unrecoverable_persist_error() {
2814 eprintln!(
2815 "Persisting is disabled for this session due to an unrecoverable error. Stopping the \
2816 background persisting process."
2817 );
2818 }
2819
2820 fn run_backend_job<'a>(
2821 &'a self,
2822 job: TurboTasksBackendJob,
2823 turbo_tasks: &'a TurboTasks<TurboTasksBackend>,
2824 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2825 Box::pin(async move {
2826 match job {
2827 TurboTasksBackendJob::Snapshot => {
2828 debug_assert!(self.should_persist());
2829
2830 let mut last_snapshot = self.start_time;
2831 let mut idle_start_listener = self.idle_start_event.listen();
2832 let mut idle_end_listener = self.idle_end_event.listen();
2833 let mut fresh_idle = true;
2836 let mut evicted = false;
2837 let mut is_first = true;
2838 'outer: loop {
2839 const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2840 const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2841 let idle_timeout = *IDLE_TIMEOUT;
2842 let (time, mut reason) = if is_first {
2843 (FIRST_SNAPSHOT_WAIT, SnapshotReason::InitialSnapshotTimeout)
2844 } else {
2845 (SNAPSHOT_INTERVAL, SnapshotReason::RegularSnapshotInterval)
2846 };
2847
2848 let until = last_snapshot + time;
2849 if until > Instant::now() {
2850 let mut stop_listener = self.stopping_event.listen();
2851 if self.stopping.load(Ordering::Acquire) {
2852 return;
2853 }
2854 let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
2855 Instant::now() + idle_timeout
2856 } else {
2857 far_future()
2858 };
2859 loop {
2860 tokio::select! {
2861 _ = &mut stop_listener => {
2862 return;
2863 },
2864 _ = &mut idle_start_listener => {
2865 idle_time = Instant::now() + idle_timeout;
2866 idle_start_listener = self.idle_start_event.listen()
2867 },
2868 _ = &mut idle_end_listener => {
2869 idle_time = far_future();
2870 idle_end_listener = self.idle_end_event.listen()
2871 },
2872 _ = tokio::time::sleep_until(until) => {
2873 break;
2874 },
2875 _ = tokio::time::sleep_until(idle_time) => {
2876 if turbo_tasks.is_idle() {
2877 reason = SnapshotReason::IdleTimeout;
2878 break;
2879 }
2880 },
2881 }
2882 }
2883 }
2884
2885 let background_span =
2889 tracing::info_span!(parent: None, "background snapshot");
2890 match self.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
2891 Err(err) => {
2892 eprintln!("Persisting failed: {err:?}");
2895 Self::log_unrecoverable_persist_error();
2896 return;
2897 }
2898 Ok((snapshot_start, new_data)) => {
2899 fresh_idle = new_data;
2901 is_first = false;
2902 last_snapshot = snapshot_start;
2903
2904 macro_rules! check_idle_ended {
2908 () => {{
2909 tokio::select! {
2910 biased;
2911 _ = &mut idle_end_listener => {
2912 idle_end_listener = self.idle_end_event.listen();
2913 true
2914 },
2915 _ = std::future::ready(()) => false,
2916 }
2917 }};
2918 }
2919 let mut ran_eviction = false;
2937 if self.should_evict() && (new_data || !evicted) {
2938 if check_idle_ended!() {
2939 continue 'outer;
2942 }
2943 evicted = true;
2944 ran_eviction = true;
2945 self.storage.evict_after_snapshot(background_span.id());
2946 }
2947
2948 let mut ran_compaction = false;
2955 const MAX_IDLE_COMPACTION_PASSES: usize = 10;
2956 for _ in 0..MAX_IDLE_COMPACTION_PASSES {
2957 if check_idle_ended!() {
2958 continue 'outer;
2959 }
2960 let _compact_span = tracing::info_span!(
2964 parent: background_span.id(),
2965 "compact database"
2966 )
2967 .entered();
2968 match self.backing_storage.compact() {
2969 Ok(true) => {
2970 ran_compaction = true;
2971 }
2972 Ok(false) => break,
2973 Err(err) => {
2974 eprintln!("Compaction failed: {err:?}");
2975 if self.backing_storage.has_unrecoverable_write_error()
2976 {
2977 Self::log_unrecoverable_persist_error();
2978 return;
2979 }
2980 break;
2981 }
2982 }
2983 }
2984 if check_idle_ended!() {
2985 continue 'outer;
2986 }
2987 if new_data || ran_compaction || ran_eviction {
2992 turbo_tasks_malloc::TurboMalloc::collect(true);
2993 }
2994 }
2995 }
2996 }
2997 }
2998 }
2999 })
3000 }
3001
3002 fn try_read_own_task_cell(
3003 &self,
3004 task_id: TaskId,
3005 cell: CellId,
3006 turbo_tasks: &TurboTasks<TurboTasksBackend>,
3007 ) -> Result<TypedCellContent> {
3008 let mut ctx = self.execute_context(turbo_tasks);
3009 let task = ctx.task(task_id, TaskDataCategory::Data);
3010 if let Some(content) = task.get_cell_data(&cell).cloned() {
3011 Ok(CellContent(Some(content)).into_typed(cell.type_id))
3012 } else {
3013 Ok(CellContent(None).into_typed(cell.type_id))
3014 }
3015 }
3016
3017 fn read_task_collectibles(
3018 &self,
3019 task_id: TaskId,
3020 collectible_type: TraitTypeId,
3021 reader_id: Option<TaskId>,
3022 turbo_tasks: &TurboTasks<TurboTasksBackend>,
3023 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3024 let mut ctx = self.execute_context(turbo_tasks);
3025 let mut collectibles = AutoMap::default();
3026 {
3027 let mut task = ctx.task(task_id, TaskDataCategory::All);
3028 if task
3029 .get_persistent_task_type()
3030 .is_some_and(|t| !t.native_fn.is_root)
3031 {
3032 drop(task);
3033 panic!(
3034 "Reading collectibles of non-root task {} (reader: {}). The `root` attribute \
3035 is missing on the task.",
3036 self.debug_get_task_description(task_id),
3037 reader_id.map_or_else(
3038 || "unknown".to_string(),
3039 |r| self.debug_get_task_description(r)
3040 )
3041 );
3042 }
3043 for (collectible, count) in task.iter_aggregated_collectibles() {
3044 if *count > 0 && collectible.collectible_type == collectible_type {
3045 *collectibles
3046 .entry(RawVc::TaskCell(
3047 collectible.cell.task,
3048 collectible.cell.cell,
3049 ))
3050 .or_insert(0) += 1;
3051 }
3052 }
3053 for (&collectible, &count) in task.iter_collectibles() {
3054 if collectible.collectible_type == collectible_type {
3055 *collectibles
3056 .entry(RawVc::TaskCell(
3057 collectible.cell.task,
3058 collectible.cell.cell,
3059 ))
3060 .or_insert(0) += count;
3061 }
3062 }
3063 if let Some(reader_id) = reader_id {
3064 let _ = task.add_collectibles_dependents((collectible_type, reader_id));
3065 }
3066 }
3067 if let Some(reader_id) = reader_id {
3068 let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
3069 let target = CollectiblesRef {
3070 task: task_id,
3071 collectible_type,
3072 };
3073 if !reader.remove_outdated_collectibles_dependencies(&target) {
3074 let _ = reader.add_collectibles_dependencies(target);
3075 }
3076 }
3077 collectibles
3078 }
3079
3080 fn emit_collectible(
3081 &self,
3082 collectible_type: TraitTypeId,
3083 collectible: RawVc,
3084 task_id: TaskId,
3085 turbo_tasks: &TurboTasks<TurboTasksBackend>,
3086 ) {
3087 self.assert_valid_collectible(task_id, collectible);
3088
3089 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3090 panic!("Collectibles need to be resolved");
3091 };
3092 let cell = CellRef {
3093 task: collectible_task,
3094 cell,
3095 };
3096 operation::UpdateCollectibleOperation::run(
3097 task_id,
3098 CollectibleRef {
3099 collectible_type,
3100 cell,
3101 },
3102 1,
3103 self.execute_context(turbo_tasks),
3104 );
3105 }
3106
3107 fn unemit_collectible(
3108 &self,
3109 collectible_type: TraitTypeId,
3110 collectible: RawVc,
3111 count: u32,
3112 task_id: TaskId,
3113 turbo_tasks: &TurboTasks<TurboTasksBackend>,
3114 ) {
3115 self.assert_valid_collectible(task_id, collectible);
3116
3117 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3118 panic!("Collectibles need to be resolved");
3119 };
3120 let cell = CellRef {
3121 task: collectible_task,
3122 cell,
3123 };
3124 operation::UpdateCollectibleOperation::run(
3125 task_id,
3126 CollectibleRef {
3127 collectible_type,
3128 cell,
3129 },
3130 -(i32::try_from(count).unwrap()),
3131 self.execute_context(turbo_tasks),
3132 );
3133 }
3134
3135 fn update_task_cell(
3136 &self,
3137 task_id: TaskId,
3138 cell: CellId,
3139 content: CellContent,
3140 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3141 content_hash: Option<CellHash>,
3142 verification_mode: VerificationMode,
3143 turbo_tasks: &TurboTasks<TurboTasksBackend>,
3144 ) {
3145 operation::UpdateCellOperation::run(
3146 task_id,
3147 cell,
3148 content,
3149 updated_key_hashes,
3150 content_hash,
3151 verification_mode,
3152 self.execute_context(turbo_tasks),
3153 );
3154 }
3155
3156 fn mark_own_task_as_finished(&self, task: TaskId, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
3157 let mut ctx = self.execute_context(turbo_tasks);
3158 let mut task = ctx.task(task, TaskDataCategory::Data);
3159 if let Some(InProgressState::InProgress(box InProgressStateInner {
3160 marked_as_completed,
3161 ..
3162 })) = task.get_in_progress_mut()
3163 {
3164 *marked_as_completed = true;
3165 }
3170 }
3171
3172 fn connect_task(
3173 &self,
3174 task: TaskId,
3175 parent_task: Option<TaskId>,
3176 turbo_tasks: &TurboTasks<TurboTasksBackend>,
3177 ) {
3178 self.assert_not_persistent_calling_transient(parent_task, task, None);
3179 ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
3180 }
3181
3182 fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3183 let task_id = self.transient_task_id_factory.get();
3184 {
3185 let mut task = self.storage.access_mut(task_id);
3186 task.init_transient_task(task_id, task_type, self.should_track_activeness());
3187 }
3188 #[cfg(feature = "verify_aggregation_graph")]
3189 self.root_tasks.lock().insert(task_id);
3190 task_id
3191 }
3192
3193 fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<TurboTasksBackend>) {
3194 #[cfg(feature = "verify_aggregation_graph")]
3195 self.root_tasks.lock().remove(&task_id);
3196
3197 let mut ctx = self.execute_context(turbo_tasks);
3198 let mut task = ctx.task(task_id, TaskDataCategory::All);
3199 let is_dirty = task.is_dirty();
3200 let has_dirty_containers = task.has_dirty_containers();
3201 if is_dirty.is_some() || has_dirty_containers {
3202 if let Some(activeness_state) = task.get_activeness_mut() {
3203 activeness_state.unset_root_type();
3205 activeness_state.set_active_until_clean();
3206 };
3207 } else if let Some(activeness_state) = task.take_activeness() {
3208 activeness_state.all_clean_event.notify(usize::MAX);
3211 }
3212 }
3213
3214 #[cfg(feature = "verify_aggregation_graph")]
3215 fn verify_aggregation_graph(&self, turbo_tasks: &TurboTasks<TurboTasksBackend>, idle: bool) {
3216 if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3217 return;
3218 }
3219 use std::{collections::VecDeque, env, io::stdout};
3220
3221 use crate::backend::operation::{get_uppers, is_aggregating_node};
3222
3223 let mut ctx = self.execute_context(turbo_tasks);
3224 let root_tasks = self.root_tasks.lock().clone();
3225
3226 for task_id in root_tasks.into_iter() {
3227 let mut queue = VecDeque::new();
3228 let mut visited = FxHashSet::default();
3229 let mut aggregated_nodes = FxHashSet::default();
3230 let mut collectibles = FxHashMap::default();
3231 let root_task_id = task_id;
3232 visited.insert(task_id);
3233 aggregated_nodes.insert(task_id);
3234 queue.push_back(task_id);
3235 let mut counter = 0;
3236 while let Some(task_id) = queue.pop_front() {
3237 counter += 1;
3238 if counter % 100000 == 0 {
3239 println!(
3240 "queue={}, visited={}, aggregated_nodes={}",
3241 queue.len(),
3242 visited.len(),
3243 aggregated_nodes.len()
3244 );
3245 }
3246 let task = ctx.task(task_id, TaskDataCategory::All);
3247 if idle && !self.is_idle.load(Ordering::Relaxed) {
3248 return;
3249 }
3250
3251 let uppers = get_uppers(&task);
3252 if task_id != root_task_id
3253 && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3254 {
3255 panic!(
3256 "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3257 {:?})",
3258 task_id,
3259 task.get_task_description(),
3260 uppers
3261 );
3262 }
3263
3264 for (collectible, _) in task.iter_aggregated_collectibles() {
3265 collectibles
3266 .entry(*collectible)
3267 .or_insert_with(|| (false, Vec::new()))
3268 .1
3269 .push(task_id);
3270 }
3271
3272 for (&collectible, &value) in task.iter_collectibles() {
3273 if value > 0 {
3274 if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3275 *flag = true
3276 } else {
3277 panic!(
3278 "Task {} has a collectible {:?} that is not in any upper task",
3279 task_id, collectible
3280 );
3281 }
3282 }
3283 }
3284
3285 let is_dirty = task.has_dirty();
3286 let has_dirty_container = task.has_dirty_containers();
3287 let should_be_in_upper = is_dirty || has_dirty_container;
3288
3289 let aggregation_number = get_aggregation_number(&task);
3290 if is_aggregating_node(aggregation_number) {
3291 aggregated_nodes.insert(task_id);
3292 }
3293 for child_id in task.iter_children() {
3300 if visited.insert(child_id) {
3302 queue.push_back(child_id);
3303 }
3304 }
3305 drop(task);
3306
3307 if should_be_in_upper {
3308 for upper_id in uppers {
3309 let upper = ctx.task(upper_id, TaskDataCategory::All);
3310 let in_upper = upper
3311 .get_aggregated_dirty_containers(&task_id)
3312 .is_some_and(|&dirty| dirty > 0);
3313 if !in_upper {
3314 let containers: Vec<_> = upper
3315 .iter_aggregated_dirty_containers()
3316 .map(|(&k, &v)| (k, v))
3317 .collect();
3318 let upper_task_desc = upper.get_task_description();
3319 drop(upper);
3320 panic!(
3321 "Task {} ({}) is dirty, but is not listed in the upper task {} \
3322 ({})\nThese dirty containers are present:\n{:#?}",
3323 task_id,
3324 ctx.task(task_id, TaskDataCategory::Data)
3325 .get_task_description(),
3326 upper_id,
3327 upper_task_desc,
3328 containers,
3329 );
3330 }
3331 }
3332 }
3333 }
3334
3335 for (collectible, (flag, task_ids)) in collectibles {
3336 if !flag {
3337 use std::io::Write;
3338 let mut stdout = stdout().lock();
3339 writeln!(
3340 stdout,
3341 "{:?} that is not emitted in any child task but in these aggregated \
3342 tasks: {:#?}",
3343 collectible,
3344 task_ids
3345 .iter()
3346 .map(|t| format!(
3347 "{t} {}",
3348 ctx.task(*t, TaskDataCategory::Data).get_task_description()
3349 ))
3350 .collect::<Vec<_>>()
3351 )
3352 .unwrap();
3353
3354 let task_id = collectible.cell.task;
3355 let mut queue = {
3356 let task = ctx.task(task_id, TaskDataCategory::All);
3357 get_uppers(&task)
3358 };
3359 let mut visited = FxHashSet::default();
3360 for &upper_id in queue.iter() {
3361 visited.insert(upper_id);
3362 writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3363 }
3364 while let Some(task_id) = queue.pop() {
3365 let task = ctx.task(task_id, TaskDataCategory::All);
3366 let desc = task.get_task_description();
3367 let aggregated_collectible = task
3368 .get_aggregated_collectibles(&collectible)
3369 .copied()
3370 .unwrap_or_default();
3371 let uppers = get_uppers(&task);
3372 drop(task);
3373 writeln!(
3374 stdout,
3375 "upper {task_id} {desc} collectible={aggregated_collectible}"
3376 )
3377 .unwrap();
3378 if task_ids.contains(&task_id) {
3379 writeln!(
3380 stdout,
3381 "Task has an upper connection to an aggregated task that doesn't \
3382 reference it. Upper connection is invalid!"
3383 )
3384 .unwrap();
3385 }
3386 for upper_id in uppers {
3387 writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3388 if !visited.contains(&upper_id) {
3389 queue.push(upper_id);
3390 }
3391 }
3392 }
3393 panic!("See stdout for more details");
3394 }
3395 }
3396 }
3397 }
3398
3399 fn assert_not_persistent_calling_transient(
3400 &self,
3401 parent_id: Option<TaskId>,
3402 child_id: TaskId,
3403 cell_id: Option<CellId>,
3404 ) {
3405 if let Some(parent_id) = parent_id
3406 && !parent_id.is_transient()
3407 && child_id.is_transient()
3408 {
3409 self.panic_persistent_calling_transient(
3410 self.debug_get_task_description(parent_id),
3411 self.debug_get_cached_task_type(child_id).as_deref(),
3412 cell_id,
3413 );
3414 }
3415 }
3416
3417 fn panic_persistent_calling_transient(
3418 &self,
3419 parent: String,
3420 child: Option<&CachedTaskType>,
3421 cell_id: Option<CellId>,
3422 ) -> ! {
3423 let transient_reason = if let Some(child) = child {
3424 Cow::Owned(format!(
3425 " The callee is transient because it depends on:\n{}",
3426 self.debug_trace_transient_task(child, cell_id),
3427 ))
3428 } else {
3429 Cow::Borrowed("")
3430 };
3431 panic!(
3432 "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3433 parent,
3434 child.map_or("unknown", |t| t.get_name()),
3435 transient_reason,
3436 );
3437 }
3438
3439 fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3440 let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3442 let task_info = if let Some(col_task_ty) = collectible
3444 .try_get_task_id()
3445 .map(|t| self.debug_get_task_description(t))
3446 {
3447 Cow::Owned(format!(" (return type of {col_task_ty})"))
3448 } else {
3449 Cow::Borrowed("")
3450 };
3451 panic!("Collectible{task_info} must be a ResolvedVc")
3452 };
3453 if col_task_id.is_transient() && !task_id.is_transient() {
3454 let transient_reason =
3455 if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3456 Cow::Owned(format!(
3457 ". The collectible is transient because it depends on:\n{}",
3458 self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3459 ))
3460 } else {
3461 Cow::Borrowed("")
3462 };
3463 panic!(
3465 "Collectible is transient, transient collectibles cannot be emitted from \
3466 persistent tasks{transient_reason}",
3467 )
3468 }
3469 }
3470}
3471
3472impl Backend for TurboTasksBackend {
3473 fn startup(&self, turbo_tasks: &TurboTasks<Self>) {
3474 self.startup(turbo_tasks);
3475 }
3476
3477 fn stopping(&self, _turbo_tasks: &TurboTasks<Self>) {
3478 self.stopping();
3479 }
3480
3481 fn stop(&self, turbo_tasks: &TurboTasks<Self>) {
3482 self.stop(turbo_tasks);
3483 }
3484
3485 fn idle_start(&self, turbo_tasks: &TurboTasks<Self>) {
3486 self.idle_start(turbo_tasks);
3487 }
3488
3489 fn idle_end(&self, _turbo_tasks: &TurboTasks<Self>) {
3490 self.idle_end();
3491 }
3492
3493 fn get_or_create_task(
3494 &self,
3495 native_fn: &'static NativeFunction,
3496 this: Option<RawVc>,
3497 arg: &mut dyn DynTaskInputsStorage,
3498 parent_task: Option<TaskId>,
3499 persistence: TaskPersistence,
3500 turbo_tasks: &TurboTasks<Self>,
3501 ) -> TaskId {
3502 self.get_or_create_task(native_fn, this, arg, parent_task, persistence, turbo_tasks)
3503 }
3504
3505 fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3506 self.invalidate_task(task_id, turbo_tasks);
3507 }
3508
3509 fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &TurboTasks<Self>) {
3510 self.invalidate_tasks(tasks, turbo_tasks);
3511 }
3512
3513 fn invalidate_tasks_set(
3514 &self,
3515 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3516 turbo_tasks: &TurboTasks<Self>,
3517 ) {
3518 self.invalidate_tasks_set(tasks, turbo_tasks);
3519 }
3520
3521 fn invalidate_serialization(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3522 self.invalidate_serialization(task_id, turbo_tasks);
3523 }
3524
3525 fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &TurboTasks<Self>) {
3526 self.task_execution_canceled(task, turbo_tasks)
3527 }
3528
3529 fn try_start_task_execution(
3530 &self,
3531 task_id: TaskId,
3532 priority: TaskPriority,
3533 turbo_tasks: &TurboTasks<Self>,
3534 ) -> Option<TaskExecutionSpec<'_>> {
3535 self.try_start_task_execution(task_id, priority, turbo_tasks)
3536 }
3537
3538 fn task_execution_completed(
3539 &self,
3540 task_id: TaskId,
3541 result: Result<RawVc, TurboTasksExecutionError>,
3542 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3543 #[cfg(feature = "verify_determinism")] stateful: bool,
3544 has_invalidator: bool,
3545 turbo_tasks: &TurboTasks<Self>,
3546 ) -> Option<TaskPriority> {
3547 self.task_execution_completed(
3548 task_id,
3549 result,
3550 cell_counters,
3551 #[cfg(feature = "verify_determinism")]
3552 stateful,
3553 has_invalidator,
3554 turbo_tasks,
3555 )
3556 }
3557
3558 type BackendJob = TurboTasksBackendJob;
3559
3560 fn run_backend_job<'a>(
3561 &'a self,
3562 job: Self::BackendJob,
3563 turbo_tasks: &'a TurboTasks<Self>,
3564 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3565 self.run_backend_job(job, turbo_tasks)
3566 }
3567
3568 fn try_read_task_output(
3569 &self,
3570 task_id: TaskId,
3571 reader: Option<TaskId>,
3572 options: ReadOutputOptions,
3573 turbo_tasks: &TurboTasks<Self>,
3574 ) -> Result<Result<RawVc, EventListener>> {
3575 self.try_read_task_output(task_id, reader, options, turbo_tasks)
3576 }
3577
3578 fn try_read_task_cell(
3579 &self,
3580 task_id: TaskId,
3581 cell: CellId,
3582 reader: Option<TaskId>,
3583 options: ReadCellOptions,
3584 turbo_tasks: &TurboTasks<Self>,
3585 ) -> Result<Result<TypedCellContent, EventListener>> {
3586 self.try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3587 }
3588
3589 fn try_read_own_task_cell(
3590 &self,
3591 task_id: TaskId,
3592 cell: CellId,
3593 turbo_tasks: &TurboTasks<Self>,
3594 ) -> Result<TypedCellContent> {
3595 self.try_read_own_task_cell(task_id, cell, turbo_tasks)
3596 }
3597
3598 fn read_task_collectibles(
3599 &self,
3600 task_id: TaskId,
3601 collectible_type: TraitTypeId,
3602 reader: Option<TaskId>,
3603 turbo_tasks: &TurboTasks<Self>,
3604 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3605 self.read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3606 }
3607
3608 fn emit_collectible(
3609 &self,
3610 collectible_type: TraitTypeId,
3611 collectible: RawVc,
3612 task_id: TaskId,
3613 turbo_tasks: &TurboTasks<Self>,
3614 ) {
3615 self.emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3616 }
3617
3618 fn unemit_collectible(
3619 &self,
3620 collectible_type: TraitTypeId,
3621 collectible: RawVc,
3622 count: u32,
3623 task_id: TaskId,
3624 turbo_tasks: &TurboTasks<Self>,
3625 ) {
3626 self.unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3627 }
3628
3629 fn update_task_cell(
3630 &self,
3631 task_id: TaskId,
3632 cell: CellId,
3633 content: CellContent,
3634 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3635 content_hash: Option<CellHash>,
3636 verification_mode: VerificationMode,
3637 turbo_tasks: &TurboTasks<Self>,
3638 ) {
3639 self.update_task_cell(
3640 task_id,
3641 cell,
3642 content,
3643 updated_key_hashes,
3644 content_hash,
3645 verification_mode,
3646 turbo_tasks,
3647 );
3648 }
3649
3650 fn mark_own_task_as_finished(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3651 self.mark_own_task_as_finished(task_id, turbo_tasks);
3652 }
3653
3654 fn connect_task(
3655 &self,
3656 task: TaskId,
3657 parent_task: Option<TaskId>,
3658 turbo_tasks: &TurboTasks<Self>,
3659 ) {
3660 self.connect_task(task, parent_task, turbo_tasks);
3661 }
3662
3663 fn create_transient_task(
3664 &self,
3665 task_type: TransientTaskType,
3666 _turbo_tasks: &TurboTasks<Self>,
3667 ) -> TaskId {
3668 self.create_transient_task(task_type)
3669 }
3670
3671 fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3672 self.dispose_root_task(task_id, turbo_tasks);
3673 }
3674
3675 fn task_statistics(&self) -> &TaskStatisticsApi {
3676 &self.task_statistics
3677 }
3678
3679 fn is_tracking_dependencies(&self) -> bool {
3680 self.options.dependency_tracking
3681 }
3682
3683 fn get_task_name(&self, task: TaskId, turbo_tasks: &TurboTasks<Self>) -> String {
3684 self.get_task_name(task, turbo_tasks)
3685 }
3686}
3687
3688enum DebugTraceTransientTask {
3689 Cached {
3690 task_name: &'static str,
3691 cell_type_id: Option<ValueTypeId>,
3692 cause_self: Option<Box<DebugTraceTransientTask>>,
3693 cause_args: Vec<DebugTraceTransientTask>,
3694 },
3695 Collapsed {
3697 task_name: &'static str,
3698 cell_type_id: Option<ValueTypeId>,
3699 },
3700 Uncached {
3701 cell_type_id: Option<ValueTypeId>,
3702 },
3703}
3704
3705impl DebugTraceTransientTask {
3706 fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3707 let indent = " ".repeat(level);
3708 f.write_str(&indent)?;
3709
3710 fn fmt_cell_type_id(
3711 f: &mut fmt::Formatter<'_>,
3712 cell_type_id: Option<ValueTypeId>,
3713 ) -> fmt::Result {
3714 if let Some(ty) = cell_type_id {
3715 write!(
3716 f,
3717 " (read cell of type {})",
3718 get_value_type(ty).ty.global_name
3719 )
3720 } else {
3721 Ok(())
3722 }
3723 }
3724
3725 match self {
3727 Self::Cached {
3728 task_name,
3729 cell_type_id,
3730 ..
3731 }
3732 | Self::Collapsed {
3733 task_name,
3734 cell_type_id,
3735 ..
3736 } => {
3737 f.write_str(task_name)?;
3738 fmt_cell_type_id(f, *cell_type_id)?;
3739 if matches!(self, Self::Collapsed { .. }) {
3740 f.write_str(" (collapsed)")?;
3741 }
3742 }
3743 Self::Uncached { cell_type_id } => {
3744 f.write_str("unknown transient task")?;
3745 fmt_cell_type_id(f, *cell_type_id)?;
3746 }
3747 }
3748 f.write_char('\n')?;
3749
3750 if let Self::Cached {
3752 cause_self,
3753 cause_args,
3754 ..
3755 } = self
3756 {
3757 if let Some(c) = cause_self {
3758 writeln!(f, "{indent} self:")?;
3759 c.fmt_indented(f, level + 1)?;
3760 }
3761 if !cause_args.is_empty() {
3762 writeln!(f, "{indent} args:")?;
3763 for c in cause_args {
3764 c.fmt_indented(f, level + 1)?;
3765 }
3766 }
3767 }
3768 Ok(())
3769 }
3770}
3771
3772impl fmt::Display for DebugTraceTransientTask {
3773 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3774 self.fmt_indented(f, 0)
3775 }
3776}
3777
3778fn far_future() -> Instant {
3780 Instant::now() + Duration::from_secs(86400 * 365 * 30)
3785}
3786
3787fn encode_task_data(
3799 task: TaskId,
3800 data: &TaskStorage,
3801 category: SpecificTaskDataCategory,
3802 scratch_buffer: &mut TurboBincodeBuffer,
3803) -> Result<TurboBincodeBuffer> {
3804 scratch_buffer.clear();
3805 let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3806 data.encode(category, &mut encoder)?;
3807
3808 if cfg!(feature = "verify_serialization") {
3809 TaskStorage::new()
3810 .decode(
3811 category,
3812 &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3813 )
3814 .with_context(|| {
3815 format!(
3816 "expected to be able to decode serialized data for '{category:?}' information \
3817 for {task}"
3818 )
3819 })?;
3820 }
3821 Ok(SmallVec::from_slice(scratch_buffer))
3822}