1mod cell_data;
2mod counter_map;
3mod operation;
4mod snapshot_coordinator;
5mod storage;
6pub mod storage_schema;
7
8use std::{
9 borrow::Cow,
10 fmt::{self, Write},
11 future::Future,
12 hash::BuildHasherDefault,
13 mem::take,
14 pin::Pin,
15 sync::{
16 Arc, LazyLock,
17 atomic::{AtomicBool, Ordering},
18 },
19 time::SystemTime,
20};
21
22use anyhow::{Context, Result, bail};
23use auto_hash_map::{AutoMap, AutoSet};
24use indexmap::IndexSet;
25use parking_lot::Mutex;
26use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
27use smallvec::{SmallVec, smallvec};
28use tokio::time::{Duration, Instant};
29use tracing::{Span, trace_span};
30use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
31use turbo_tasks::{
32 CellId, DynTaskInputsStorage, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
33 ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
34 TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasks, TurboTasksCallApi,
35 TurboTasksPanic, ValueTypeId,
36 backend::{
37 Backend, CachedTaskType, CachedTaskTypeArc, CellContent, CellHash, TaskExecutionSpec,
38 TransientTaskType, TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
39 TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
40 VerificationMode,
41 },
42 event::{Event, EventDescription, EventListener},
43 macro_helpers::NativeFunction,
44 message_queue::{TimingEvent, TraceEvent},
45 registry::get_value_type,
46 scope::scope_and_block,
47 task_statistics::TaskStatisticsApi,
48 trace::TraceRawVcs,
49 util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
50};
51#[cfg(feature = "task_dirty_cause")]
52use turbo_tasks::{FunctionId, TaskDirtyCause};
53
54pub use self::{
55 operation::AnyOperation,
56 storage::{EvictionCounts, SpecificTaskDataCategory, TaskDataCategory},
57};
58use crate::{
59 backend::{
60 operation::{
61 AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
62 CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
63 LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType, TaskTypeRef,
64 connect_children, get_aggregation_number, get_uppers, make_task_dirty_internal,
65 prepare_new_children,
66 },
67 snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
68 storage::Storage,
69 storage_schema::{TaskStorage, TaskStorageAccessors},
70 },
71 backing_storage::{BackingStorage, SnapshotItem, SnapshotMeta, compute_task_type_hash},
72 data::{
73 ActivenessState, CellDependency, CellRef, CollectibleRef, CollectiblesRef, Dirtyness,
74 InProgressCellState, InProgressState, InProgressStateInner, OutputValue, TransientTask,
75 },
76 error::TaskError,
77 utils::{
78 dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
79 shard_amount::compute_shard_amount,
80 },
81};
82
/// Size threshold (10,000 entries) related to marking dependent tasks dirty.
///
/// NOTE(review): judging by the name, sets of dependent tasks at least this large are handled
/// in parallel. The consumer is outside this part of the file, so confirm the exact comparison
/// there.
const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10_000;
87
/// Idle timeout, resolved lazily on first access.
///
/// Read from the `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS` environment variable (interpreted
/// as a number of milliseconds). Falls back to 2 seconds when the variable is unset or is not a
/// valid `u64`.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let configured_millis = std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    match configured_millis {
        Some(millis) => Duration::from_millis(millis),
        None => Duration::from_secs(2),
    }
});
97
98fn compute_stale_priority(task: &impl TaskGuard) -> TaskPriority {
105 TaskPriority::invalidation(
106 task.get_leaf_distance()
107 .copied()
108 .unwrap_or_default()
109 .distance,
110 )
111 .in_parent(task.is_dirty().unwrap_or(TaskPriority::leaf()))
112}
113
/// How the backend interacts with its backing storage.
///
/// Within this file the mode is consulted in two places: restoring is enabled whenever a mode
/// is configured at all, while persisting (snapshots) is enabled only for the two `ReadWrite*`
/// variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageMode {
    /// Data may be restored from the backing storage, but nothing is persisted.
    ReadOnly,
    /// Data may be restored from the backing storage and changes are persisted.
    ReadWrite,
    /// Data may be restored from the backing storage and changes are persisted.
    ///
    /// NOTE(review): presumably persistence happens only on shutdown rather than periodically;
    /// the code that distinguishes this from `ReadWrite` is outside this view — confirm.
    ReadWriteOnShutdown,
}
124
125pub struct BackendOptions {
126 pub dependency_tracking: bool,
131
132 pub active_tracking: bool,
138
139 pub storage_mode: Option<StorageMode>,
141
142 pub num_workers: Option<usize>,
145
146 pub small_preallocation: bool,
148
149 pub evict_after_snapshot: bool,
153}
154
155impl Default for BackendOptions {
156 fn default() -> Self {
157 Self {
158 dependency_tracking: true,
159 active_tracking: true,
160 storage_mode: Some(StorageMode::ReadWrite),
161 num_workers: None,
162 small_preallocation: false,
163 evict_after_snapshot: false,
164 }
165 }
166}
167
/// Jobs handled by the backend itself.
pub enum TurboTasksBackendJob {
    /// A snapshot job.
    ///
    /// NOTE(review): the code that schedules and runs this job is outside this view.
    Snapshot,
}
171
172pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
173
174struct TurboTasksBackendInner<B: BackingStorage> {
175 options: BackendOptions,
176
177 start_time: Instant,
178
179 persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
180 transient_task_id_factory: IdFactoryWithReuse<TaskId>,
181
182 storage: Storage,
183
184 snapshot_coord: SnapshotCoordinator,
187 snapshot_in_progress: Mutex<()>,
192
193 stopping: AtomicBool,
194 stopping_event: Event,
195 idle_start_event: Event,
196 idle_end_event: Event,
197 #[cfg(feature = "verify_aggregation_graph")]
198 is_idle: AtomicBool,
199
200 task_statistics: TaskStatisticsApi,
201
202 backing_storage: B,
203
204 #[cfg(feature = "verify_aggregation_graph")]
205 root_tasks: Mutex<FxHashSet<TaskId>>,
206}
207
208impl<B: BackingStorage> TurboTasksBackend<B> {
209 pub fn new(options: BackendOptions, backing_storage: B) -> Self {
210 Self(Arc::new(TurboTasksBackendInner::new(
211 options,
212 backing_storage,
213 )))
214 }
215
216 pub fn backing_storage(&self) -> &B {
217 &self.0.backing_storage
218 }
219
220 pub fn snapshot_and_evict_for_testing(
227 &self,
228 turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
229 ) -> (bool, EvictionCounts) {
230 self.0.snapshot_and_evict_for_testing(turbo_tasks)
231 }
232}
233
234impl<B: BackingStorage> TurboTasksBackendInner<B> {
235 pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
236 let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
237 if !options.dependency_tracking {
238 options.active_tracking = false;
239 }
240 let small_preallocation = options.small_preallocation;
241 let next_task_id = backing_storage
242 .next_free_task_id()
243 .expect("Failed to get task id");
244 Self {
245 options,
246 start_time: Instant::now(),
247 persisted_task_id_factory: IdFactoryWithReuse::new(
248 next_task_id,
249 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
250 ),
251 transient_task_id_factory: IdFactoryWithReuse::new(
252 TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
253 TaskId::MAX,
254 ),
255 storage: Storage::new(shard_amount, small_preallocation),
256 snapshot_coord: SnapshotCoordinator::new(),
257 snapshot_in_progress: Mutex::new(()),
258 stopping: AtomicBool::new(false),
259 stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
260 idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
261 idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
262 #[cfg(feature = "verify_aggregation_graph")]
263 is_idle: AtomicBool::new(false),
264 task_statistics: TaskStatisticsApi::default(),
265 backing_storage,
266 #[cfg(feature = "verify_aggregation_graph")]
267 root_tasks: Default::default(),
268 }
269 }
270
271 fn execute_context<'a>(
272 &'a self,
273 turbo_tasks: &'a TurboTasks<TurboTasksBackend<B>>,
274 ) -> impl ExecuteContext<'a> {
275 ExecuteContextImpl::new(self, turbo_tasks)
276 }
277
278 fn suspending_requested(&self) -> bool {
279 self.should_persist() && self.snapshot_coord.snapshot_pending()
280 }
281
282 fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
283 if self.should_persist() {
284 self.snapshot_coord.suspend_point(suspend);
285 }
286 }
287
288 pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
289 if !self.should_persist() {
290 return OperationGuard::noop();
291 }
292 self.snapshot_coord.begin_operation()
293 }
294
295 fn should_persist(&self) -> bool {
296 matches!(
297 self.options.storage_mode,
298 Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
299 )
300 }
301
302 fn should_evict(&self) -> bool {
303 self.options.evict_after_snapshot && self.should_persist()
304 }
305
306 #[doc(hidden)]
313 pub fn snapshot_and_evict_for_testing(
314 &self,
315 turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
316 ) -> (bool, EvictionCounts) {
317 assert!(
318 self.should_persist(),
319 "snapshot_and_evict requires persistence"
320 );
321 let snapshot_result = self.snapshot_and_persist(None, "test", turbo_tasks);
322 let had_new_data = match snapshot_result {
323 Ok((_, new_data)) => new_data,
324 Err(_) => {
325 return (false, EvictionCounts::default());
329 }
330 };
331 let counts = self.storage.evict_after_snapshot(None);
332 (had_new_data, counts)
333 }
334
335 fn should_restore(&self) -> bool {
336 self.options.storage_mode.is_some()
337 }
338
339 fn should_track_dependencies(&self) -> bool {
340 self.options.dependency_tracking
341 }
342
343 fn should_track_activeness(&self) -> bool {
344 self.options.active_tracking
345 }
346
347 fn track_cache_hit_by_fn(&self, native_fn: &'static NativeFunction) {
348 self.task_statistics
349 .map(|stats| stats.increment_cache_hit(native_fn));
350 }
351
352 fn track_cache_miss_by_fn(&self, native_fn: &'static NativeFunction) {
353 self.task_statistics
354 .map(|stats| stats.increment_cache_miss(native_fn));
355 }
356
357 fn task_error_to_turbo_tasks_execution_error(
362 &self,
363 error: &TaskError,
364 ctx: &mut impl ExecuteContext<'_>,
365 ) -> TurboTasksExecutionError {
366 match error {
367 TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
368 TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
369 message: item.message.clone(),
370 source: item
371 .source
372 .as_ref()
373 .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
374 })),
375 TaskError::LocalTaskContext(local_task_context) => {
376 TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
377 name: local_task_context.name.clone(),
378 source: local_task_context
379 .source
380 .as_ref()
381 .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
382 }))
383 }
384 TaskError::TaskChain(chain) => {
385 let task_id = chain.last().unwrap();
386 let error = {
387 let task = ctx.task(*task_id, TaskDataCategory::Meta);
388 if let Some(OutputValue::Error(error)) = task.get_output() {
389 Some(error.clone())
390 } else {
391 None
392 }
393 };
394 let error = error.map_or_else(
395 || {
396 TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
398 message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
399 "Error no longer available",
400 )),
401 location: None,
402 }))
403 },
404 |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
405 );
406 let mut current_error = error;
407 for &task_id in chain.iter().rev() {
408 current_error =
409 TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
410 task_id,
411 source: Some(current_error),
412 turbo_tasks: ctx.turbo_tasks(),
413 }));
414 }
415 current_error
416 }
417 }
418 }
419}
420
421struct TaskExecutionCompletePrepareResult {
423 pub new_children: FxHashSet<TaskId>,
424 pub is_now_immutable: bool,
425 #[cfg(feature = "verify_determinism")]
426 pub no_output_set: bool,
427 #[cfg(feature = "task_dirty_cause")]
428 pub function_id: Option<FunctionId>,
429 pub new_output: Option<OutputValue>,
430 pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
431 pub is_recomputation: bool,
432 pub is_session_dependent: bool,
433}
434
435impl<B: BackingStorage> TurboTasksBackendInner<B> {
437 fn try_read_task_output(
438 self: &Arc<Self>,
439 task_id: TaskId,
440 reader: Option<TaskId>,
441 options: ReadOutputOptions,
442 turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
443 ) -> Result<Result<RawVc, EventListener>> {
444 self.assert_not_persistent_calling_transient(reader, task_id, None);
445
446 let mut ctx = self.execute_context(turbo_tasks);
447 let need_reader_task = if self.should_track_dependencies()
448 && !matches!(options.tracking, ReadTracking::Untracked)
449 && reader.is_some_and(|reader_id| reader_id != task_id)
450 && let Some(reader_id) = reader
451 && reader_id != task_id
452 {
453 Some(reader_id)
454 } else {
455 None
456 };
457 let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
458 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
462 (task, Some(reader))
463 } else {
464 (ctx.task(task_id, TaskDataCategory::All), None)
465 };
466
467 fn listen_to_done_event(
468 reader_description: Option<EventDescription>,
469 tracking: ReadTracking,
470 done_event: &Event,
471 ) -> EventListener {
472 done_event.listen_with_note(move || {
473 move || {
474 if let Some(reader_description) = reader_description.as_ref() {
475 format!(
476 "try_read_task_output from {} ({})",
477 reader_description, tracking
478 )
479 } else {
480 format!("try_read_task_output ({})", tracking)
481 }
482 }
483 })
484 }
485
486 fn check_in_progress(
487 task: &impl TaskGuard,
488 reader_description: Option<EventDescription>,
489 tracking: ReadTracking,
490 ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
491 {
492 match task.get_in_progress() {
493 Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
494 listen_to_done_event(reader_description, tracking, done_event),
495 ))),
496 Some(InProgressState::InProgress(box InProgressStateInner {
497 done_event, ..
498 })) => Some(Ok(Err(listen_to_done_event(
499 reader_description,
500 tracking,
501 done_event,
502 )))),
503 Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
504 "{} was canceled",
505 task.get_task_description()
506 ))),
507 None => None,
508 }
509 }
510
511 if matches!(options.consistency, ReadConsistency::Strong) {
512 if task
513 .get_persistent_task_type()
514 .is_some_and(|t| !t.native_fn.is_root)
515 {
516 drop(task);
517 drop(reader_task);
518 panic!(
519 "Strongly consistent read of non-root task {} (reader: {}). The `root` \
520 attribute is missing on the task.",
521 self.debug_get_task_description(task_id),
522 reader.map_or_else(
523 || "unknown".to_string(),
524 |r| self.debug_get_task_description(r)
525 )
526 );
527 }
528
529 let is_dirty = task.is_dirty();
530
531 let has_dirty_containers = task.has_dirty_containers();
533 if has_dirty_containers || is_dirty.is_some() {
534 let activeness = task.get_activeness_mut();
535 let mut task_ids_to_schedule: Vec<_> = Vec::new();
536 let activeness = if let Some(activeness) = activeness {
538 activeness.set_active_until_clean();
542 activeness
543 } else {
544 if ctx.should_track_activeness() {
548 task_ids_to_schedule = task.dirty_containers().collect();
550 task_ids_to_schedule.push(task_id);
551 }
552 let activeness =
553 task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
554 activeness.set_active_until_clean();
555 activeness
556 };
557 let listener = activeness.all_clean_event.listen_with_note(move || {
558 let this = self.clone();
559 let tt = turbo_tasks.pin();
560 move || {
561 let mut ctx = this.execute_context(&tt);
562 let mut visited = FxHashSet::default();
563 fn indent(s: &str) -> String {
564 s.split_inclusive('\n')
565 .flat_map(|line: &str| [" ", line].into_iter())
566 .collect::<String>()
567 }
568 fn get_info(
569 ctx: &mut impl ExecuteContext<'_>,
570 task_id: TaskId,
571 parent_and_count: Option<(TaskId, i32)>,
572 visited: &mut FxHashSet<TaskId>,
573 ) -> String {
574 let task = ctx.task(task_id, TaskDataCategory::All);
575 let is_dirty = task.is_dirty();
576 let in_progress =
577 task.get_in_progress()
578 .map_or("not in progress", |p| match p {
579 InProgressState::InProgress(_) => "in progress",
580 InProgressState::Scheduled { .. } => "scheduled",
581 InProgressState::Canceled => "canceled",
582 });
583 let activeness = task.get_activeness().map_or_else(
584 || "not active".to_string(),
585 |activeness| format!("{activeness:?}"),
586 );
587 let aggregation_number = get_aggregation_number(&task);
588 let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
589 {
590 let uppers = get_uppers(&task);
591 !uppers.contains(&parent_task_id)
592 } else {
593 false
594 };
595
596 let has_dirty_containers = task.has_dirty_containers();
598
599 let task_description = task.get_task_description();
600 let is_dirty_label = if let Some(parent_priority) = is_dirty {
601 format!(", dirty({parent_priority})")
602 } else {
603 String::new()
604 };
605 let has_dirty_containers_label = if has_dirty_containers {
606 ", dirty containers"
607 } else {
608 ""
609 };
610 let count = if let Some((_, count)) = parent_and_count {
611 format!(" {count}")
612 } else {
613 String::new()
614 };
615 let mut info = format!(
616 "{task_id} {task_description}{count} (aggr={aggregation_number}, \
617 {in_progress}, \
618 {activeness}{is_dirty_label}{has_dirty_containers_label})",
619 );
620 let children: Vec<_> = task.dirty_containers_with_count().collect();
621 drop(task);
622
623 if missing_upper {
624 info.push_str("\n ERROR: missing upper connection");
625 }
626
627 if has_dirty_containers || !children.is_empty() {
628 writeln!(info, "\n dirty tasks:").unwrap();
629
630 for (child_task_id, count) in children {
631 let task_description = ctx
632 .task(child_task_id, TaskDataCategory::Data)
633 .get_task_description();
634 if visited.insert(child_task_id) {
635 let child_info = get_info(
636 ctx,
637 child_task_id,
638 Some((task_id, count)),
639 visited,
640 );
641 info.push_str(&indent(&child_info));
642 if !info.ends_with('\n') {
643 info.push('\n');
644 }
645 } else {
646 writeln!(
647 info,
648 " {child_task_id} {task_description} {count} \
649 (already visited)"
650 )
651 .unwrap();
652 }
653 }
654 }
655 info
656 }
657 let info = get_info(&mut ctx, task_id, None, &mut visited);
658 format!(
659 "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
660 )
661 }
662 });
663 drop(reader_task);
664 drop(task);
665 if !task_ids_to_schedule.is_empty() {
666 let mut queue = AggregationUpdateQueue::new();
667 queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
668 queue.execute(&mut ctx);
669 }
670
671 return Ok(Err(listener));
672 }
673 }
674
675 let reader_description = reader_task
676 .as_ref()
677 .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
678 if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
679 {
680 return value;
681 }
682
683 if let Some(output) = task.get_output() {
684 let result = match output {
685 OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
686 OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
687 OutputValue::Error(error) => Err(error.clone()),
688 };
689 if let Some(mut reader_task) = reader_task.take()
690 && options.tracking.should_track(result.is_err())
691 && (!task.immutable() || cfg!(feature = "verify_immutable"))
692 {
693 #[cfg(feature = "trace_task_output_dependencies")]
694 let _span = tracing::trace_span!(
695 "add output dependency",
696 task = %task_id,
697 dependent_task = ?reader
698 )
699 .entered();
700 let mut queue = LeafDistanceUpdateQueue::new();
701 let reader = reader.unwrap();
702 if task.add_output_dependent(reader) {
703 let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
705 let reader_leaf_distance =
706 reader_task.get_leaf_distance().copied().unwrap_or_default();
707 if reader_leaf_distance.distance <= leaf_distance.distance {
708 queue.push(
709 reader,
710 leaf_distance.distance,
711 leaf_distance.max_distance_in_buffer,
712 );
713 }
714 }
715
716 drop(task);
717
718 if !reader_task.remove_outdated_output_dependencies(&task_id) {
724 let _ = reader_task.add_output_dependencies(task_id);
725 }
726 drop(reader_task);
727
728 queue.execute(&mut ctx);
729 } else {
730 drop(task);
731 }
732
733 return result.map_err(|error| {
734 self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
735 .with_task_context(task_id, turbo_tasks.pin())
736 .into()
737 });
738 }
739 drop(reader_task);
740
741 let note = EventDescription::new(|| {
742 move || {
743 if let Some(reader) = reader_description.as_ref() {
744 format!("try_read_task_output (recompute) from {reader}",)
745 } else {
746 "try_read_task_output (recompute, untracked)".to_string()
747 }
748 }
749 });
750
751 let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
753 TaskExecutionReason::OutputNotAvailable,
754 EventDescription::new(|| task.get_task_desc_fn()),
755 note,
756 );
757
758 let old = task.set_in_progress(in_progress_state);
761 debug_assert!(old.is_none(), "InProgress already exists");
762 ctx.schedule_task(task, TaskPriority::Recomputation);
763
764 Ok(Err(listener))
765 }
766
767 fn try_read_task_cell(
768 &self,
769 task_id: TaskId,
770 reader: Option<TaskId>,
771 cell: CellId,
772 options: ReadCellOptions,
773 turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
774 ) -> Result<Result<TypedCellContent, EventListener>> {
775 self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
776
777 fn add_cell_dependency(
778 task_id: TaskId,
779 mut task: impl TaskGuard,
780 reader: Option<TaskId>,
781 reader_task: Option<impl TaskGuard>,
782 cell: CellId,
783 key: Option<u64>,
784 ) {
785 if let Some(mut reader_task) = reader_task
786 && (!task.immutable() || cfg!(feature = "verify_immutable"))
787 {
788 let reader = reader.unwrap();
789 let _ = task
790 .add_cell_dependents(CellDependency::new(CellRef { task: reader, cell }, key));
791 drop(task);
792
793 let target = CellRef {
799 task: task_id,
800 cell,
801 };
802 let dep = CellDependency::new(target, key);
803 if !reader_task.remove_outdated_cell_dependencies(&dep) {
804 let _ = reader_task.add_cell_dependencies(dep);
805 }
806 drop(reader_task);
807 }
808 }
809
810 let ReadCellOptions {
811 tracking,
812 final_read_hint,
813 } = options;
814
815 let mut ctx = self.execute_context(turbo_tasks);
816 let (mut task, reader_task) = if self.should_track_dependencies()
817 && !matches!(tracking, ReadCellTracking::Untracked)
818 && let Some(reader_id) = reader
819 && reader_id != task_id
820 {
821 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
825 (task, Some(reader))
826 } else {
827 (ctx.task(task_id, TaskDataCategory::All), None)
828 };
829
830 let content = if final_read_hint {
831 task.remove_cell_data(&cell)
832 } else {
833 task.get_cell_data(&cell).cloned()
834 };
835 if let Some(content) = content {
836 if tracking.should_track(false) {
837 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
838 }
839 return Ok(Ok(TypedCellContent(
840 cell.type_id,
841 CellContent(Some(content)),
842 )));
843 }
844
845 let in_progress = task.get_in_progress();
846 if matches!(
847 in_progress,
848 Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
849 ) {
850 return Ok(Err(self
851 .listen_to_cell(&mut task, task_id, &reader_task, cell)
852 .0));
853 }
854 let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
855
856 let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
858 let Some(max_id) = max_id else {
859 let task_desc = task.get_task_description();
860 if tracking.should_track(true) {
861 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
862 }
863 bail!(
864 "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
865 );
866 };
867 if cell.index >= max_id {
868 let task_desc = task.get_task_description();
869 if tracking.should_track(true) {
870 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
871 }
872 bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
873 }
874
875 if is_cancelled {
881 bail!("{} was canceled", task.get_task_description());
882 }
883
884 let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
886 drop(reader_task);
887 if !new_listener {
888 return Ok(Err(listener));
889 }
890
891 let _span = tracing::trace_span!(
892 "recomputation",
893 cell_type = get_value_type(cell.type_id).ty.global_name,
894 cell_index = cell.index
895 )
896 .entered();
897
898 let _ = task.add_scheduled(
899 TaskExecutionReason::CellNotAvailable,
900 EventDescription::new(|| task.get_task_desc_fn()),
901 );
902 ctx.schedule_task(task, TaskPriority::Recomputation);
903
904 Ok(Err(listener))
905 }
906
907 fn listen_to_cell(
908 &self,
909 task: &mut impl TaskGuard,
910 task_id: TaskId,
911 reader_task: &Option<impl TaskGuard>,
912 cell: CellId,
913 ) -> (EventListener, bool) {
914 let note = || {
915 let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
916 move || {
917 if let Some(reader_desc) = reader_desc.as_ref() {
918 format!("try_read_task_cell (in progress) from {}", (reader_desc)())
919 } else {
920 "try_read_task_cell (in progress, untracked)".to_string()
921 }
922 }
923 };
924 if let Some(in_progress) = task.get_in_progress_cells(&cell) {
925 let listener = in_progress.event.listen_with_note(note);
927 return (listener, false);
928 }
929 let in_progress = InProgressCellState::new(task_id, cell);
930 let listener = in_progress.event.listen_with_note(note);
931 let old = task.insert_in_progress_cells(cell, in_progress);
932 debug_assert!(old.is_none(), "InProgressCell already exists");
933 (listener, true)
934 }
935
936 fn snapshot_and_persist(
937 &self,
938 parent_span: Option<tracing::Id>,
939 reason: &str,
940 turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
941 ) -> Result<(Instant, bool), anyhow::Error> {
942 let snapshot_span =
943 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
944 .entered();
945 let _snapshot_in_progress = self.snapshot_in_progress.lock();
949 let start = Instant::now();
950 let wall_start = SystemTime::now();
954 debug_assert!(self.should_persist());
955
956 let mut snapshot_phase = {
957 let _span = tracing::info_span!("blocking").entered();
958 self.snapshot_coord.begin_snapshot()
959 };
960 let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
963
964 let suspended_operations = snapshot_phase.take_suspended_operations();
965
966 let snapshot_time = Instant::now();
967 drop(snapshot_phase);
968
969 if !has_modifications {
970 drop(snapshot_guard);
973 return Ok((start, false));
974 }
975
/// Accumulated serialization statistics for one group of tasks (debug feature
/// `print_cache_item_size`).
#[cfg(feature = "print_cache_item_size")]
#[derive(Default)]
struct TaskCacheStats {
    /// Total encoded size of the data category, in bytes.
    data: usize,
    /// Total LZ4-compressed size of the data category, in bytes.
    #[cfg(feature = "print_cache_item_size_with_compressed")]
    data_compressed: usize,
    /// Number of data items added.
    data_count: usize,
    /// Total encoded size of the meta category, in bytes.
    meta: usize,
    /// Total LZ4-compressed size of the meta category, in bytes.
    #[cfg(feature = "print_cache_item_size_with_compressed")]
    meta_compressed: usize,
    /// Number of meta items added.
    meta_count: usize,
    // The counters below are summed from the per-task meta counts.
    upper_count: usize,
    collectibles_count: usize,
    aggregated_collectibles_count: usize,
    children_count: usize,
    followers_count: usize,
    collectibles_dependents_count: usize,
    aggregated_dirty_containers_count: usize,
    /// Total encoded size of task outputs, in bytes.
    output_size: usize,
}
/// Display helper for a byte size, optionally paired with its compressed size.
#[cfg(feature = "print_cache_item_size")]
struct FormatSizes {
    /// Uncompressed size in bytes.
    size: usize,
    /// Compressed size in bytes.
    #[cfg(feature = "print_cache_item_size_with_compressed")]
    compressed_size: usize,
}
#[cfg(feature = "print_cache_item_size")]
impl std::fmt::Display for FormatSizes {
    /// Writes the size as formatted bytes, followed by the compressed size in parentheses when
    /// the `print_cache_item_size_with_compressed` feature is enabled.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use turbo_tasks::util::FormatBytes;
        let size = FormatBytes(self.size);
        #[cfg(feature = "print_cache_item_size_with_compressed")]
        {
            let compressed = FormatBytes(self.compressed_size);
            write!(f, "{} ({} compressed)", size, compressed)
        }
        #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
        {
            write!(f, "{}", size)
        }
    }
}
#[cfg(feature = "print_cache_item_size")]
impl TaskCacheStats {
    /// Returns the LZ4-compressed size of `data`.
    #[cfg(feature = "print_cache_item_size_with_compressed")]
    fn compressed_size(data: &[u8]) -> Result<usize> {
        let mut compressor = lzzzz::lz4::Compressor::new()?;
        let mut scratch = Vec::new();
        let written =
            compressor.next_to_vec(data, &mut scratch, lzzzz::lz4::ACC_LEVEL_DEFAULT)?;
        Ok(written)
    }

    /// Accounts for one encoded data item. A failed compression counts as 0 compressed bytes.
    fn add_data(&mut self, data: &[u8]) {
        self.data_count += 1;
        self.data += data.len();
        #[cfg(feature = "print_cache_item_size_with_compressed")]
        {
            self.data_compressed += Self::compressed_size(data).unwrap_or(0);
        }
    }

    /// Accounts for one encoded meta item. A failed compression counts as 0 compressed bytes.
    fn add_meta(&mut self, data: &[u8]) {
        self.meta_count += 1;
        self.meta += data.len();
        #[cfg(feature = "print_cache_item_size_with_compressed")]
        {
            self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
        }
    }

    /// Adds the meta counts of `storage` and the encoded size of its output (0 when encoding
    /// the output fails).
    fn add_counts(&mut self, storage: &TaskStorage) {
        let counts = storage.meta_counts();
        self.upper_count += counts.upper;
        self.collectibles_count += counts.collectibles;
        self.aggregated_collectibles_count += counts.aggregated_collectibles;
        self.children_count += counts.children;
        self.followers_count += counts.followers;
        self.collectibles_dependents_count += counts.collectibles_dependents;
        self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
        if let Some(output) = storage.get_output() {
            use turbo_bincode::turbo_bincode_encode;

            let encoded_len = turbo_bincode_encode(&output).map_or(0, |data| data.len());
            self.output_size += encoded_len;
        }
    }

    /// Name under which a task's statistics are grouped: its persistent task type rendered as a
    /// string, or `<unknown>` when the task has none.
    fn task_name(storage: &TaskStorage) -> String {
        storage
            .get_persistent_task_type()
            .map_or_else(|| "<unknown>".to_string(), |t| t.to_string())
    }

    /// Key for ordering entries by total size: compressed sizes when available, raw sizes
    /// otherwise.
    fn sort_key(&self) -> usize {
        #[cfg(feature = "print_cache_item_size_with_compressed")]
        {
            self.data_compressed + self.meta_compressed
        }
        #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
        {
            self.data + self.meta
        }
    }

    /// Combined data + meta size.
    fn format_total(&self) -> FormatSizes {
        FormatSizes {
            size: self.data + self.meta,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: self.data_compressed + self.meta_compressed,
        }
    }

    /// Total data size.
    fn format_data(&self) -> FormatSizes {
        FormatSizes {
            size: self.data,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: self.data_compressed,
        }
    }

    /// Average data size per data item (0 when no data items were added).
    fn format_avg_data(&self) -> FormatSizes {
        let items = self.data_count;
        FormatSizes {
            size: self.data.checked_div(items).unwrap_or(0),
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: self.data_compressed.checked_div(items).unwrap_or(0),
        }
    }

    /// Total meta size.
    fn format_meta(&self) -> FormatSizes {
        FormatSizes {
            size: self.meta,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: self.meta_compressed,
        }
    }

    /// Average meta size per meta item (0 when no meta items were added).
    fn format_avg_meta(&self) -> FormatSizes {
        let items = self.meta_count;
        FormatSizes {
            size: self.meta.checked_div(items).unwrap_or(0),
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: self.meta_compressed.checked_div(items).unwrap_or(0),
        }
    }
}
1137 #[cfg(feature = "print_cache_item_size")]
1138 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1139 Mutex::new(FxHashMap::default());
1140
1141 let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
1148 let encode_category = |task_id: TaskId,
1149 data: &TaskStorage,
1150 category: SpecificTaskDataCategory,
1151 buffer: &mut TurboBincodeBuffer|
1152 -> Option<TurboBincodeBuffer> {
1153 match encode_task_data(task_id, data, category, buffer) {
1154 Ok(encoded) => {
1155 #[cfg(feature = "print_cache_item_size")]
1156 {
1157 let mut stats = task_cache_stats.lock();
1158 let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
1159 match category {
1160 SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1161 SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1162 }
1163 }
1164 Some(encoded)
1165 }
1166 Err(err) => {
1167 panic!(
1168 "Serializing task {} failed ({:?}): {:?}",
1169 self.debug_get_task_description(task_id),
1170 category,
1171 err
1172 );
1173 }
1174 }
1175 };
1176 if task_id.is_transient() {
1177 unreachable!("transient task_ids should never be enqueued to be persisted");
1178 }
1179
1180 let encode_meta = inner.flags.meta_modified();
1181 let encode_data = inner.flags.data_modified();
1182
1183 #[cfg(feature = "print_cache_item_size")]
1184 if encode_data || encode_meta {
1185 task_cache_stats
1186 .lock()
1187 .entry(TaskCacheStats::task_name(inner))
1188 .or_default()
1189 .add_counts(inner);
1190 }
1191
1192 let meta = if encode_meta {
1193 encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1194 } else {
1195 None
1196 };
1197
1198 let data = if encode_data {
1199 encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1200 } else {
1201 None
1202 };
1203 let task_type_hash = if inner.flags.new_task() {
1204 let task_type = inner.get_persistent_task_type().expect(
1205 "It is not possible for a new_task to not have a persistent_task_type. Task \
1206 creation for persistent tasks uses a single ExecutionContextImpl for \
1207 creating the task (which sets new_task) and connect_child (which sets \
1208 persistent_task_type) and take_snapshot waits for all operations to complete \
1209 or suspend before we start snapshotting. So task creation will always set \
1210 the task_type.",
1211 );
1212 Some(compute_task_type_hash(task_type))
1213 } else {
1214 None
1215 };
1216
1217 SnapshotItem {
1218 task_id,
1219 meta,
1220 data,
1221 task_type_hash,
1222 }
1223 };
1224
1225 let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);
1226
1227 drop(snapshot_span);
1228 let snapshot_duration = start.elapsed();
1229 let task_count = task_snapshots.len();
1230
1231 if task_snapshots.is_empty() {
1232 std::hint::cold_path();
1235 return Ok((snapshot_time, false));
1236 }
1237
1238 let persist_start = Instant::now();
1239 let span = tracing::info_span!(
1240 parent: parent_span,
1241 "persist",
1242 reason = reason,
1243 data_items= tracing::field::Empty,
1244 meta_items= tracing::field::Empty,
1245 task_cache_items= tracing::field::Empty,
1246 next_task_id= tracing::field::Empty,)
1247 .entered();
1248 {
1249 let SnapshotMeta {
1253 task_cache_items,
1254 data_items,
1255 meta_items,
1256 max_next_task_id,
1257 } = self
1258 .backing_storage
1259 .save_snapshot(suspended_operations, task_snapshots)?;
1260 span.record("data_items", data_items);
1261 span.record("meta_items", meta_items);
1262 span.record("task_cache_items", task_cache_items);
1263 span.record("next_task_id", max_next_task_id);
1264
1265 #[cfg(feature = "print_cache_item_size")]
1266 {
1267 let mut task_cache_stats = task_cache_stats
1268 .into_inner()
1269 .into_iter()
1270 .collect::<Vec<_>>();
1271 if !task_cache_stats.is_empty() {
1272 use turbo_tasks::util::FormatBytes;
1273
1274 use crate::utils::markdown_table::print_markdown_table;
1275
1276 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1277 (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
1278 });
1279
1280 println!(
1281 "Task cache stats: {}",
1282 FormatSizes {
1283 size: task_cache_stats
1284 .iter()
1285 .map(|(_, s)| s.data + s.meta)
1286 .sum::<usize>(),
1287 #[cfg(feature = "print_cache_item_size_with_compressed")]
1288 compressed_size: task_cache_stats
1289 .iter()
1290 .map(|(_, s)| s.data_compressed + s.meta_compressed)
1291 .sum::<usize>()
1292 },
1293 );
1294
1295 print_markdown_table(
1296 [
1297 "Task",
1298 " Total Size",
1299 " Data Size",
1300 " Data Count x Avg",
1301 " Data Count x Avg",
1302 " Meta Size",
1303 " Meta Count x Avg",
1304 " Meta Count x Avg",
1305 " Uppers",
1306 " Coll",
1307 " Agg Coll",
1308 " Children",
1309 " Followers",
1310 " Coll Deps",
1311 " Agg Dirty",
1312 " Output Size",
1313 ],
1314 task_cache_stats.iter(),
1315 |(task_desc, stats)| {
1316 [
1317 task_desc.to_string(),
1318 format!(" {}", stats.format_total()),
1319 format!(" {}", stats.format_data()),
1320 format!(" {} x", stats.data_count),
1321 format!("{}", stats.format_avg_data()),
1322 format!(" {}", stats.format_meta()),
1323 format!(" {} x", stats.meta_count),
1324 format!("{}", stats.format_avg_meta()),
1325 format!(" {}", stats.upper_count),
1326 format!(" {}", stats.collectibles_count),
1327 format!(" {}", stats.aggregated_collectibles_count),
1328 format!(" {}", stats.children_count),
1329 format!(" {}", stats.followers_count),
1330 format!(" {}", stats.collectibles_dependents_count),
1331 format!(" {}", stats.aggregated_dirty_containers_count),
1332 format!(" {}", FormatBytes(stats.output_size)),
1333 ]
1334 },
1335 );
1336 }
1337 }
1338 }
1339
1340 let elapsed = start.elapsed();
1341 let persist_duration = persist_start.elapsed();
1342 if elapsed > Duration::from_secs(10) {
1344 turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1345 "Finished writing to filesystem cache".to_string(),
1346 elapsed,
1347 )));
1348 }
1349
1350 let wall_start_ms = wall_start
1351 .duration_since(SystemTime::UNIX_EPOCH)
1352 .unwrap_or_default()
1353 .as_secs_f64()
1355 * 1000.0;
1356 let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1357 turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1358 "turbopack-persistence",
1359 wall_start_ms,
1360 wall_end_ms,
1361 vec![
1362 ("reason", serde_json::Value::from(reason)),
1363 (
1364 "snapshot_duration_ms",
1365 serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1366 ),
1367 (
1368 "persist_duration_ms",
1369 serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1370 ),
1371 ("task_count", serde_json::Value::from(task_count)),
1372 ],
1373 )));
1374
1375 Ok((snapshot_time, true))
1376 }
1377
1378 fn startup(&self, turbo_tasks: &TurboTasks<TurboTasksBackend<B>>) {
1379 if self.should_restore() {
1380 let uncompleted_operations = self
1384 .backing_storage
1385 .uncompleted_operations()
1386 .expect("Failed to get uncompleted operations");
1387 if !uncompleted_operations.is_empty() {
1388 let mut ctx = self.execute_context(turbo_tasks);
1389 for op in uncompleted_operations {
1390 op.execute(&mut ctx);
1391 }
1392 }
1393 }
1394
1395 if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1398 let _span = trace_span!("persisting background job").entered();
1400 let _span = tracing::info_span!("thread").entered();
1401 turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::Snapshot);
1402 }
1403 }
1404
1405 fn stopping(&self) {
1406 self.stopping.store(true, Ordering::Release);
1407 self.stopping_event.notify(usize::MAX);
1408 }
1409
1410 #[allow(unused_variables)]
1411 fn stop(&self, turbo_tasks: &TurboTasks<TurboTasksBackend<B>>) {
1412 #[cfg(feature = "verify_aggregation_graph")]
1413 {
1414 self.is_idle.store(false, Ordering::Release);
1415 self.verify_aggregation_graph(turbo_tasks, false);
1416 }
1417 if self.should_persist()
1418 && let Err(err) = self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks)
1419 {
1420 eprintln!("Persisting failed during shutdown: {err:?}");
1421 }
1422 self.storage.drop_contents();
1423 if let Err(err) = self.backing_storage.shutdown() {
1424 println!("Shutting down failed: {err}");
1425 }
1426 }
1427
1428 #[allow(unused_variables)]
1429 fn idle_start(self: &Arc<Self>, turbo_tasks: &TurboTasks<TurboTasksBackend<B>>) {
1430 self.idle_start_event.notify(usize::MAX);
1431
1432 #[cfg(feature = "verify_aggregation_graph")]
1433 {
1434 use tokio::select;
1435
1436 self.is_idle.store(true, Ordering::Release);
1437 let this = self.clone();
1438 let turbo_tasks = turbo_tasks.pin();
1439 tokio::task::spawn(async move {
1440 select! {
1441 _ = tokio::time::sleep(Duration::from_secs(5)) => {
1442 }
1444 _ = this.idle_end_event.listen() => {
1445 return;
1446 }
1447 }
1448 if !this.is_idle.load(Ordering::Relaxed) {
1449 return;
1450 }
1451 this.verify_aggregation_graph(&*turbo_tasks, true);
1452 });
1453 }
1454 }
1455
1456 fn idle_end(&self) {
1457 #[cfg(feature = "verify_aggregation_graph")]
1458 self.is_idle.store(false, Ordering::Release);
1459 self.idle_end_event.notify(usize::MAX);
1460 }
1461
/// Returns the task id for the call `(native_fn, this, arg)`, creating the task when it does
/// not exist yet, and connects it as a child of `parent_task`.
///
/// Lookup order, as implemented below:
/// 1. the `task_cache` shard selected by the hash of the call components,
/// 2. for non-transient calls, `ctx.task_by_type(..)`; a hit is inserted into the shard,
/// 3. otherwise a fresh id is taken from the transient or persisted id factory and the task
///    is initialized and inserted into the shard.
///
/// Cache hits and misses are tracked per function via `track_cache_hit_by_fn` /
/// `track_cache_miss_by_fn`.
///
/// A transient call issued from a non-transient `parent_task` is handed to
/// `panic_persistent_calling_transient`.
fn get_or_create_task(
    &self,
    native_fn: &'static NativeFunction,
    this: Option<RawVc>,
    arg: &mut dyn DynTaskInputsStorage,
    parent_task: Option<TaskId>,
    persistence: TaskPersistence,
    turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
) -> TaskId {
    let transient = matches!(persistence, TaskPersistence::Transient);

    // A persistent parent must not call a transient task.
    // NOTE(review): `panic_persistent_calling_transient` presumably never returns; the code
    // below keeps using `arg` although `take_box()` was called on it here — confirm.
    if transient
        && let Some(parent_task) = parent_task
        && !parent_task.is_transient()
    {
        let task_type = CachedTaskType {
            native_fn,
            this,
            arg: arg.take_box(),
        };
        self.panic_persistent_calling_transient(
            self.debug_get_task_description(parent_task),
            Some(&task_type),
            None,
        );
    }

    let is_root = native_fn.is_root;

    // Hash the call from its components; the owned `CachedTaskType` is only built further
    // down, when a new task actually has to be created.
    let arg_ref = arg.as_ref();
    let hash = CachedTaskType::hash_from_components(
        self.storage.task_cache.hasher(),
        native_fn,
        this,
        arg_ref,
    );
    let shard = get_shard(&self.storage.task_cache, hash);

    let mut ctx = self.execute_context(turbo_tasks);
    // Fast path: the task is already present in the cache shard.
    if let Some(task_id) =
        raw_get_in_shard(shard, hash, |k| k.eq_components(native_fn, this, arg_ref))
    {
        self.track_cache_hit_by_fn(native_fn);
        operation::ConnectChildOperation::run(parent_task, task_id, ctx);
        return task_id;
    }

    let task_id = if !transient
        && let Some((task_id, stored_type)) = ctx.task_by_type(native_fn, this, arg_ref)
    {
        // `task_by_type` knows the task already; only the cache entry is missing. Counted
        // as a cache hit.
        self.track_cache_hit_by_fn(native_fn);
        match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
            k.eq_components(native_fn, this, arg_ref)
        }) {
            // An entry showed up since the lookup above missed; nothing to insert.
            RawEntry::Occupied(_) => {}
            RawEntry::Vacant(e) => {
                e.insert(stored_type, task_id);
            }
        };
        task_id
    } else {
        match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| {
            k.eq_components(native_fn, this, arg_ref)
        }) {
            RawEntry::Occupied(e) => {
                // An entry showed up since the lookup above missed; reuse its task id.
                let task_id = *e.get();
                drop(e);
                self.track_cache_hit_by_fn(native_fn);
                task_id
            }
            RawEntry::Vacant(e) => {
                // Genuinely new task: only now take ownership of the arguments.
                let task_type = CachedTaskTypeArc::new(CachedTaskType {
                    native_fn,
                    this,
                    arg: arg.take_box(),
                });
                let task_id = if transient {
                    self.transient_task_id_factory.get()
                } else {
                    self.persisted_task_id_factory.get()
                };
                // The task is initialized in storage before its cache entry is inserted.
                self.storage
                    .initialize_new_task(task_id, Some(task_type.clone()));
                e.insert(task_type, task_id);
                self.track_cache_miss_by_fn(native_fn);
                if is_root {
                    // Root functions get the maximum base aggregation number.
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: u32::MAX,
                            distance: None,
                        },
                        &mut ctx,
                    );
                } else if native_fn.is_session_dependent && self.should_track_dependencies() {
                    // Session dependent functions get a high (`u32::MAX >> 2`) base
                    // aggregation number, but only while dependencies are tracked.
                    const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                            distance: None,
                        },
                        &mut ctx,
                    );
                };

                task_id
            }
        }
    };

    // `ctx` is consumed by connecting the child.
    operation::ConnectChildOperation::run(parent_task, task_id, ctx);

    task_id
}
1602
1603 fn debug_trace_transient_task(
1606 &self,
1607 task_type: &CachedTaskType,
1608 cell_id: Option<CellId>,
1609 ) -> DebugTraceTransientTask {
1610 fn inner_id(
1613 backend: &TurboTasksBackendInner<impl BackingStorage>,
1614 task_id: TaskId,
1615 cell_type_id: Option<ValueTypeId>,
1616 visited_set: &mut FxHashSet<TaskId>,
1617 ) -> DebugTraceTransientTask {
1618 if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1619 if visited_set.contains(&task_id) {
1620 let task_name = task_type.get_name();
1621 DebugTraceTransientTask::Collapsed {
1622 task_name,
1623 cell_type_id,
1624 }
1625 } else {
1626 inner_cached(backend, &task_type, cell_type_id, visited_set)
1627 }
1628 } else {
1629 DebugTraceTransientTask::Uncached { cell_type_id }
1630 }
1631 }
1632 fn inner_cached(
1633 backend: &TurboTasksBackendInner<impl BackingStorage>,
1634 task_type: &CachedTaskType,
1635 cell_type_id: Option<ValueTypeId>,
1636 visited_set: &mut FxHashSet<TaskId>,
1637 ) -> DebugTraceTransientTask {
1638 let task_name = task_type.get_name();
1639
1640 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1641 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1642 return None;
1646 };
1647 if task_id.is_transient() {
1648 Some(Box::new(inner_id(
1649 backend,
1650 task_id,
1651 cause_self_raw_vc.try_get_type_id(),
1652 visited_set,
1653 )))
1654 } else {
1655 None
1656 }
1657 });
1658 let cause_args = task_type
1659 .arg
1660 .get_raw_vcs()
1661 .into_iter()
1662 .filter_map(|raw_vc| {
1663 let Some(task_id) = raw_vc.try_get_task_id() else {
1664 return None;
1666 };
1667 if !task_id.is_transient() {
1668 return None;
1669 }
1670 Some((task_id, raw_vc.try_get_type_id()))
1671 })
1672 .collect::<IndexSet<_>>() .into_iter()
1674 .map(|(task_id, cell_type_id)| {
1675 inner_id(backend, task_id, cell_type_id, visited_set)
1676 })
1677 .collect();
1678
1679 DebugTraceTransientTask::Cached {
1680 task_name,
1681 cell_type_id,
1682 cause_self,
1683 cause_args,
1684 }
1685 }
1686 inner_cached(
1687 self,
1688 task_type,
1689 cell_id.map(|c| c.type_id),
1690 &mut FxHashSet::default(),
1691 )
1692 }
1693
1694 fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<TurboTasksBackend<B>>) {
1695 if !self.should_track_dependencies() {
1696 panic!("Dependency tracking is disabled so invalidation is not allowed");
1697 }
1698 operation::InvalidateOperation::run(
1699 smallvec![task_id],
1700 #[cfg(feature = "task_dirty_cause")]
1701 TaskDirtyCause::Invalidator,
1702 self.execute_context(turbo_tasks),
1703 );
1704 }
1705
1706 fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &TurboTasks<TurboTasksBackend<B>>) {
1707 if !self.should_track_dependencies() {
1708 panic!("Dependency tracking is disabled so invalidation is not allowed");
1709 }
1710 operation::InvalidateOperation::run(
1711 tasks.iter().copied().collect(),
1712 #[cfg(feature = "task_dirty_cause")]
1713 TaskDirtyCause::Unknown,
1714 self.execute_context(turbo_tasks),
1715 );
1716 }
1717
1718 fn invalidate_tasks_set(
1719 &self,
1720 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1721 turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
1722 ) {
1723 if !self.should_track_dependencies() {
1724 panic!("Dependency tracking is disabled so invalidation is not allowed");
1725 }
1726 operation::InvalidateOperation::run(
1727 tasks.iter().copied().collect(),
1728 #[cfg(feature = "task_dirty_cause")]
1729 TaskDirtyCause::Unknown,
1730 self.execute_context(turbo_tasks),
1731 );
1732 }
1733
1734 fn invalidate_serialization(
1735 &self,
1736 task_id: TaskId,
1737 turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
1738 ) {
1739 if task_id.is_transient() {
1740 return;
1741 }
1742 let mut ctx = self.execute_context(turbo_tasks);
1743 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1744 task.invalidate_serialization();
1745 }
1746
1747 fn debug_get_task_description(&self, task_id: TaskId) -> String {
1748 let task = self.storage.access_mut(task_id);
1749 if let Some(value) = task.get_persistent_task_type() {
1750 format!("{task_id:?} {}", value)
1751 } else if let Some(value) = task.get_transient_task_type() {
1752 format!("{task_id:?} {}", value)
1753 } else {
1754 format!("{task_id:?} unknown")
1755 }
1756 }
1757
1758 fn get_task_name(
1759 &self,
1760 task_id: TaskId,
1761 turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
1762 ) -> String {
1763 let mut ctx = self.execute_context(turbo_tasks);
1764 let task = ctx.task(task_id, TaskDataCategory::Data);
1765 if let Some(value) = task.get_persistent_task_type() {
1766 value.to_string()
1767 } else if let Some(value) = task.get_transient_task_type() {
1768 value.to_string()
1769 } else {
1770 "unknown".to_string()
1771 }
1772 }
1773
1774 fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<CachedTaskTypeArc> {
1775 let task = self.storage.access_mut(task_id);
1776 task.get_persistent_task_type().cloned()
1777 }
1778
1779 fn task_execution_canceled(
1780 &self,
1781 task_id: TaskId,
1782 turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
1783 ) {
1784 let mut ctx = self.execute_context(turbo_tasks);
1785 let mut task = ctx.task(task_id, TaskDataCategory::All);
1786 if let Some(in_progress) = task.take_in_progress() {
1787 match in_progress {
1788 InProgressState::Scheduled {
1789 done_event,
1790 reason: _,
1791 } => done_event.notify(usize::MAX),
1792 InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1793 done_event.notify(usize::MAX)
1794 }
1795 InProgressState::Canceled => {}
1796 }
1797 }
1798 let in_progress_cells = task.take_in_progress_cells();
1801 if let Some(ref cells) = in_progress_cells {
1802 for state in cells.values() {
1803 state.event.notify(usize::MAX);
1804 }
1805 }
1806
1807 let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
1813 task.update_dirty_state(Some(Dirtyness::SessionDependent))
1814 } else {
1815 None
1816 };
1817
1818 let old = task.set_in_progress(InProgressState::Canceled);
1819 debug_assert!(old.is_none(), "InProgress already exists");
1820 drop(task);
1821
1822 if let Some(data_update) = data_update {
1823 AggregationUpdateQueue::run(data_update, &mut ctx);
1824 }
1825
1826 drop(in_progress_cells);
1827 }
1828
/// Tries to move a task from `Scheduled` to `InProgress` and builds what is needed to run it.
///
/// Returns `None` when
/// * the task has no in-progress state,
/// * the in-progress state is not `Scheduled` (the state is put back unchanged), or
/// * the task is a `TransientTask::Once` whose future has already been taken.
///
/// Otherwise returns a [`TaskExecutionSpec`] holding the tracing span and the future to
/// execute. Before that, the current collectibles and (when dependencies are tracked) the
/// current cell/output dependencies are recorded as outdated.
fn try_start_task_execution(
    &self,
    task_id: TaskId,
    priority: TaskPriority,
    turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
) -> Option<TaskExecutionSpec<'_>> {
    // Assigned inside the block below, which scopes `ctx` and the task guard.
    let execution_reason;
    let task_type;
    #[cfg(feature = "task_dirty_cause")]
    let cause;
    {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        task_type = task.get_task_type().to_owned();
        let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
        // When the task reports tasks to prefetch, the guard is released while they are
        // prepared and re-acquired afterwards.
        if let Some(tasks) = task.prefetch() {
            drop(task);
            ctx.prepare_tasks(tasks, "prefetch");
            task = ctx.task(task_id, TaskDataCategory::All);
        }
        // No in-progress state at all: nothing to start.
        let in_progress = task.take_in_progress()?;
        let InProgressState::Scheduled { done_event, reason } = in_progress else {
            // Not scheduled: restore the state that was taken and bail out.
            let old = task.set_in_progress(in_progress);
            debug_assert!(old.is_none(), "InProgress already exists");
            return None;
        };
        execution_reason = reason;
        #[cfg(feature = "task_dirty_cause")]
        {
            cause = match task.get_dirty() {
                Some(Dirtyness::Dirty { cause, .. }) => Some(cause.clone()),
                _ => None,
            };
        }
        let old = task.set_in_progress(InProgressState::InProgress(Box::new(
            InProgressStateInner {
                stale: false,
                once_task,
                done_event,
                marked_as_completed: false,
                new_children: Default::default(),
            },
        )));
        debug_assert!(old.is_none(), "InProgress already exists");

        // Snapshot current and outdated collectibles first, so the task can be mutated
        // while walking over them.
        enum Collectible {
            Current(CollectibleRef, i32),
            Outdated(CollectibleRef),
        }
        let collectibles = task
            .iter_collectibles()
            .map(|(&collectible, &value)| Collectible::Current(collectible, value))
            .chain(
                task.iter_outdated_collectibles()
                    .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
            )
            .collect::<Vec<_>>();
        for collectible in collectibles {
            match collectible {
                // Every current collectible is recorded as outdated with its value.
                Collectible::Current(collectible, value) => {
                    let _ = task.insert_outdated_collectible(collectible, value);
                }
                // An outdated entry without a matching current collectible is removed.
                Collectible::Outdated(collectible) => {
                    if task
                        .collectibles()
                        .is_none_or(|m| m.get(&collectible).is_none())
                    {
                        task.remove_outdated_collectibles(&collectible);
                    }
                }
            }
        }

        if self.should_track_dependencies() {
            // The current dependencies become the outdated ones for this execution.
            let cell_dependencies = task.iter_cell_dependencies().collect();
            task.set_outdated_cell_dependencies(cell_dependencies);

            let outdated_output_dependencies = task.iter_output_dependencies().collect();
            task.set_outdated_output_dependencies(outdated_output_dependencies);
        }
    }

    let (span, future) = match task_type {
        TaskType::Cached(task_type) => {
            let CachedTaskType {
                native_fn,
                this,
                arg,
            } = &*task_type;
            (
                native_fn.span(
                    task_id.persistence(),
                    execution_reason,
                    priority,
                    #[cfg(feature = "task_dirty_cause")]
                    cause.as_ref(),
                ),
                native_fn.execute(*this, &**arg),
            )
        }
        TaskType::Transient(task_type) => {
            let span = tracing::trace_span!("turbo_tasks::root_task");
            let future = match &*task_type {
                // Root tasks create a new future on every execution.
                TransientTask::Root(f) => f(),
                // The future of a once task is moved out of the mutex; `?` returns `None`
                // when it was taken before.
                TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
            };
            (span, future)
        }
    };
    Some(TaskExecutionSpec { future, span })
}
1942
/// Processes the completion of a task execution.
///
/// The work is split into phases that each run against the same execute context:
/// 1. `task_execution_completed_prepare`,
/// 2. invalidation of output dependent tasks (only when there are any),
/// 3. `task_execution_completed_unfinished_children_dirty` and
///    `task_execution_completed_connect` (only when there are new children),
/// 4. `task_execution_completed_finish`,
/// 5. `task_execution_completed_cleanup`.
///
/// Returns `Some(priority)` when the prepare, connect or finish phase reports the task as
/// stale, and `None` otherwise.
fn task_execution_completed(
    &self,
    task_id: TaskId,
    result: Result<RawVc, TurboTasksExecutionError>,
    cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
    #[cfg(feature = "verify_determinism")] stateful: bool,
    has_invalidator: bool,
    turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
) -> Option<TaskPriority> {
    // The span carries more fields when `trace_task_details` is enabled; the fields are
    // declared empty and recorded as the phases progress.
    #[cfg(not(feature = "trace_task_details"))]
    let span = tracing::trace_span!(
        "task execution completed",
        new_children = tracing::field::Empty
    )
    .entered();
    #[cfg(feature = "trace_task_details")]
    let span = tracing::trace_span!(
        "task execution completed",
        task_id = display(task_id),
        result = match result.as_ref() {
            Ok(value) => display(either::Either::Left(value)),
            Err(err) => display(either::Either::Right(err)),
        },
        new_children = tracing::field::Empty,
        immutable = tracing::field::Empty,
        new_output = tracing::field::Empty,
        output_dependents = tracing::field::Empty,
        aggregation_number = tracing::field::Empty,
        stale = tracing::field::Empty,
    )
    .entered();

    // Captured up front because `result` is moved into the prepare phase.
    let is_error = result.is_err();

    let mut ctx = self.execute_context(turbo_tasks);

    let TaskExecutionCompletePrepareResult {
        new_children,
        is_now_immutable,
        #[cfg(feature = "verify_determinism")]
        no_output_set,
        new_output,
        #[cfg(feature = "task_dirty_cause")]
        function_id,
        output_dependent_tasks,
        is_recomputation,
        is_session_dependent,
    } = match self.task_execution_completed_prepare(
        &mut ctx,
        #[cfg(feature = "trace_task_details")]
        &span,
        task_id,
        result,
        cell_counters,
        #[cfg(feature = "verify_determinism")]
        stateful,
        has_invalidator,
    ) {
        Ok(r) => r,
        // The prepare phase found the task to be stale.
        Err(stale_priority) => {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "prepare");
            return Some(stale_priority);
        }
    };

    #[cfg(feature = "trace_task_details")]
    span.record("new_output", new_output.is_some());
    #[cfg(feature = "trace_task_details")]
    span.record("output_dependents", output_dependent_tasks.len());

    if !output_dependent_tasks.is_empty() {
        self.task_execution_completed_invalidate_output_dependent(
            &mut ctx,
            task_id,
            #[cfg(feature = "task_dirty_cause")]
            function_id,
            output_dependent_tasks,
        );
    }

    let has_new_children = !new_children.is_empty();
    span.record("new_children", new_children.len());

    if has_new_children {
        self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
    }

    if has_new_children
        && let Some(stale_priority) =
            self.task_execution_completed_connect(&mut ctx, task_id, new_children)
    {
        #[cfg(feature = "trace_task_details")]
        span.record("stale", "connect");
        return Some(stale_priority);
    }

    let (stale_priority, in_progress_cells) = self.task_execution_completed_finish(
        &mut ctx,
        task_id,
        #[cfg(feature = "verify_determinism")]
        no_output_set,
        new_output,
        is_now_immutable,
        is_session_dependent,
    );
    if let Some(stale_priority) = stale_priority {
        #[cfg(feature = "trace_task_details")]
        span.record("stale", "finish");
        return Some(stale_priority);
    }

    let removed_data = self.task_execution_completed_cleanup(
        &mut ctx,
        task_id,
        cell_counters,
        is_error,
        is_recomputation,
    );

    // Both values are kept alive until all phases are done and are dropped explicitly here.
    drop(removed_data);
    drop(in_progress_cells);

    None
}
2093
/// First phase of completing a task execution.
///
/// Inspects the in-progress state of the task, synchronizes the per-type cell counters,
/// collects the edges that became outdated (children, collectibles, dependencies), determines
/// whether the output changed and which tasks depend on it, and finally runs
/// `CleanupOldEdgesOperation` when there is anything to clean up or queue.
///
/// Returns
/// * `Ok(..)` with the data needed by the later phases; for a canceled task this is an empty
///   result,
/// * `Err(priority)` when the execution was marked stale (and the task is not a once task,
///   unless the `no_fast_stale` feature is enabled); the task is put back into `Scheduled`
///   with `TaskExecutionReason::Stale` in that case.
///
/// # Panics
///
/// Panics when the task has no in-progress state or is still `Scheduled`, and when `result`
/// is a `RawVc::LocalOutput`.
fn task_execution_completed_prepare(
    &self,
    ctx: &mut impl ExecuteContext<'_>,
    #[cfg(feature = "trace_task_details")] span: &Span,
    task_id: TaskId,
    result: Result<RawVc, TurboTasksExecutionError>,
    cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
    #[cfg(feature = "verify_determinism")] stateful: bool,
    has_invalidator: bool,
) -> Result<TaskExecutionCompletePrepareResult, TaskPriority> {
    let mut task = ctx.task(task_id, TaskDataCategory::All);
    // The task counts as recomputed when it carries no dirty state at completion time.
    let is_recomputation = task.is_dirty().is_none();
    let is_session_dependent = self.should_track_dependencies()
        && matches!(task.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent);
    let Some(in_progress) = task.get_in_progress_mut() else {
        panic!("Task execution completed, but task is not in progress: {task:#?}");
    };
    // A canceled task produces an empty result: no children, no output change.
    if matches!(in_progress, InProgressState::Canceled) {
        return Ok(TaskExecutionCompletePrepareResult {
            new_children: Default::default(),
            is_now_immutable: false,
            #[cfg(feature = "verify_determinism")]
            no_output_set: false,
            #[cfg(feature = "task_dirty_cause")]
            function_id: None,
            new_output: None,
            output_dependent_tasks: Default::default(),
            is_recomputation,
            is_session_dependent,
        });
    }
    let &mut InProgressState::InProgress(box InProgressStateInner {
        stale,
        ref mut new_children,
        once_task: is_once_task,
        ..
    }) = in_progress
    else {
        panic!("Task execution completed, but task is not in progress: {task:#?}");
    };

    // Fast path for stale executions: discard the result and schedule the task again.
    #[cfg(not(feature = "no_fast_stale"))]
    if stale && !is_once_task {
        let stale_priority = compute_stale_priority(&task);
        let Some(InProgressState::InProgress(box InProgressStateInner {
            done_event,
            mut new_children,
            ..
        })) = task.take_in_progress()
        else {
            unreachable!();
        };
        let old = task.set_in_progress(InProgressState::Scheduled {
            done_event,
            reason: TaskExecutionReason::Stale,
        });
        debug_assert!(old.is_none(), "InProgress already exists");
        // Children that are already connected to the task are dropped from the set, so only
        // the children that were never connected remain.
        for task in task.iter_children() {
            new_children.remove(&task);
        }
        drop(task);

        // NOTE(review): presumably the active counts of these children were increased when
        // they were created during the execution — confirm against the child connect logic.
        AggregationUpdateQueue::run(
            AggregationUpdateJob::DecreaseActiveCounts {
                task_ids: new_children.into_iter().collect(),
            },
            ctx,
        );
        return Err(stale_priority);
    }

    // Move the new children out of the in-progress state, leaving an empty set behind.
    let mut new_children = take(new_children);

    #[cfg(feature = "task_dirty_cause")]
    let function_id = match task.get_task_type() {
        TaskTypeRef::Cached(task_type) => {
            Some(turbo_tasks::registry::get_function_id(task_type.native_fn))
        }
        TaskTypeRef::Transient(_) => None,
    };

    #[cfg(feature = "verify_determinism")]
    if stateful {
        task.set_stateful(true);
    }

    if has_invalidator {
        task.set_invalidator(true);
    }

    // Synchronize the stored per-type max cell index with `cell_counters`. This is skipped
    // for a failed execution of a task that was dirty.
    if result.is_ok() || is_recomputation {
        let old_counters: FxHashMap<_, _> = task
            .iter_cell_type_max_index()
            .map(|(&k, &v)| (k, v))
            .collect();
        // Entries still left in this map after the loop are types without cells now.
        let mut counters_to_remove = old_counters.clone();

        for (&cell_type, &max_index) in cell_counters.iter() {
            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                // Only write when the value actually changed.
                if old_max_index != max_index {
                    task.insert_cell_type_max_index(cell_type, max_index);
                }
            } else {
                task.insert_cell_type_max_index(cell_type, max_index);
            }
        }
        for (cell_type, _) in counters_to_remove {
            task.remove_cell_type_max_index(&cell_type);
        }
    }

    let mut queue = AggregationUpdateQueue::new();

    let mut old_edges = Vec::new();

    let has_children = !new_children.is_empty();
    let is_immutable = task.immutable();
    // Collect the tasks this task depends on, but only when the task is a candidate for
    // becoming immutable: not immutable yet, not session dependent, without invalidator and
    // without collectibles dependencies. The check itself happens after the guard is dropped.
    let task_dependencies_for_immutable =
        if !is_immutable
            && !is_session_dependent
            && !task.invalidator()
            && task.is_collectibles_dependencies_empty()
        {
            Some(
                task.iter_output_dependencies()
                    .chain(task.iter_cell_dependencies().map(|dep| dep.cell_ref().task))
                    .collect::<FxHashSet<_>>(),
            )
        } else {
            None
        };

    if has_children {
        let _aggregation_number =
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);

        #[cfg(feature = "trace_task_details")]
        span.record("aggregation_number", _aggregation_number);

        // Existing children that are part of `new_children` stay connected and are removed
        // from `new_children`; all other existing children become outdated edges.
        old_edges.extend(
            task.iter_children()
                .filter(|task| !new_children.remove(task))
                .map(OutdatedEdge::Child),
        );
    } else {
        // No children were reported, so every existing child is outdated.
        old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
    }

    old_edges.extend(
        task.iter_outdated_collectibles()
            .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
    );

    if self.should_track_dependencies() {
        old_edges.extend(
            task.iter_outdated_cell_dependencies()
                .map(OutdatedEdge::CellDependency),
        );
        old_edges.extend(
            task.iter_outdated_output_dependencies()
                .map(OutdatedEdge::OutputDependency),
        );
    }

    // Determine the new output value; `None` means the output is unchanged.
    let current_output = task.get_output();
    #[cfg(feature = "verify_determinism")]
    let no_output_set = current_output.is_none();
    let new_output = match result {
        Ok(RawVc::TaskOutput(output_task_id)) => {
            if let Some(OutputValue::Output(current_task_id)) = current_output
                && *current_task_id == output_task_id
            {
                None
            } else {
                Some(OutputValue::Output(output_task_id))
            }
        }
        Ok(RawVc::TaskCell(output_task_id, cell)) => {
            if let Some(OutputValue::Cell(CellRef {
                task: current_task_id,
                cell: current_cell,
            })) = current_output
                && *current_task_id == output_task_id
                && *current_cell == cell
            {
                None
            } else {
                Some(OutputValue::Cell(CellRef {
                    task: output_task_id,
                    cell,
                }))
            }
        }
        Ok(RawVc::LocalOutput(..)) => {
            panic!("Non-local tasks must not return a local Vc");
        }
        Err(err) => {
            // An error equal to the stored one does not count as a change.
            if let Some(OutputValue::Error(old_error)) = current_output
                && **old_error == err
            {
                None
            } else {
                Some(OutputValue::Error(Arc::new((&err).into())))
            }
        }
    };
    // Dependents are only collected when the output changed and dependencies are tracked.
    let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
    if new_output.is_some() && ctx.should_track_dependencies() {
        output_dependent_tasks = task.iter_output_dependent().collect();
    }

    // Release the guard before other tasks are accessed through `ctx`.
    drop(task);

    // The task becomes immutable when every collected dependency is immutable. An empty
    // dependency set satisfies `all` as well.
    let mut is_now_immutable = false;
    if let Some(dependencies) = task_dependencies_for_immutable
        && dependencies
            .iter()
            .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
    {
        is_now_immutable = true;
    }
    #[cfg(feature = "trace_task_details")]
    span.record("immutable", is_immutable || is_now_immutable);

    if !queue.is_empty() || !old_edges.is_empty() {
        #[cfg(any(
            feature = "trace_task_completion",
            feature = "trace_aggregation_update_stats"
        ))]
        let _span =
            tracing::trace_span!("remove old edges and prepare new children", stats = Empty)
                .entered();
        // With `trace_aggregation_update_stats` the stats returned by the operation are
        // recorded on the span; otherwise the return value is not used.
        #[cfg(feature = "trace_aggregation_update_stats")]
        {
            let stats = CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
            _span.record("stats", tracing::field::debug(stats));
        }
        #[cfg(not(feature = "trace_aggregation_update_stats"))]
        CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
    }

    Ok(TaskExecutionCompletePrepareResult {
        new_children,
        is_now_immutable,
        #[cfg(feature = "verify_determinism")]
        no_output_set,
        #[cfg(feature = "task_dirty_cause")]
        function_id,
        new_output,
        output_dependent_tasks,
        is_recomputation,
        is_session_dependent,
    })
}
2387
/// Marks the tasks in `output_dependent_tasks` as dirty after the output of `task_id` changed.
///
/// `output_dependent_tasks` must not be empty (checked with a `debug_assert!`).
///
/// With the `task_dirty_cause` feature, `function_id` selects the recorded cause:
/// `TaskDirtyCause::OutputChange` when it is `Some`, `TaskDirtyCause::RootOutputChange`
/// otherwise.
///
/// Batches larger than `DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD` are split into chunks
/// that are processed by closures spawned via `scope_and_block`; smaller batches are processed
/// inline on `ctx`.
fn task_execution_completed_invalidate_output_dependent(
    &self,
    ctx: &mut impl ExecuteContext<'_>,
    task_id: TaskId,
    #[cfg(feature = "task_dirty_cause")] function_id: Option<FunctionId>,
    output_dependent_tasks: SmallVec<[TaskId; 4]>,
) {
    debug_assert!(!output_dependent_tasks.is_empty());

    #[cfg(feature = "task_dirty_cause")]
    let cause = match function_id {
        Some(function) => TaskDirtyCause::OutputChange { function },
        None => TaskDirtyCause::RootOutputChange,
    };

    // Only batches of more than one dependent are announced through `prepare_tasks`.
    // NOTE(review): presumably this loads the tasks up front -- confirm against
    // `ExecuteContext::prepare_tasks`.
    if output_dependent_tasks.len() > 1 {
        ctx.prepare_tasks(
            output_dependent_tasks
                .iter()
                .map(|&id| (id, TaskDataCategory::All)),
            "invalidate output dependents",
        );
    }

    // Handles a single dependent task and records the outcome on the trace span (when the
    // `trace_task_output_dependencies` feature is enabled).
    fn process_output_dependents(
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "task_dirty_cause")] cause: &TaskDirtyCause,
        dependent_task_id: TaskId,
        queue: &mut AggregationUpdateQueue,
    ) {
        #[cfg(feature = "trace_task_output_dependencies")]
        let span = tracing::trace_span!(
            "invalidate output dependency",
            task = %task_id,
            dependent_task = %dependent_task_id,
            result = tracing::field::Empty,
        )
        .entered();
        let mut make_stale = true;
        let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
        let transient_task_type = dependent.get_transient_task_type();
        // `TransientTask::Once` dependents are never invalidated here.
        if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "once task");
            return;
        }
        if dependent.outdated_output_dependencies_contains(&task_id) {
            // The edge is only listed as outdated: the dependent is still made dirty, but
            // `make_stale` is passed as `false`.
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "outdated dependency");
            make_stale = false;
        } else if !dependent.output_dependencies_contains(&task_id) {
            // The dependent has no (current or outdated) dependency on `task_id`, so there
            // is nothing to invalidate.
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "no backward dependency");
            return;
        }
        make_task_dirty_internal(
            dependent,
            dependent_task_id,
            make_stale,
            #[cfg(feature = "task_dirty_cause")]
            cause.clone(),
            queue,
            ctx,
        );
        #[cfg(feature = "trace_task_output_dependencies")]
        span.record("result", "marked dirty");
    }

    if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
        // Large batch: one spawned closure per chunk, each with its own context (created from
        // a child context of `ctx`) and its own aggregation queue.
        let chunk_size = good_chunk_size(output_dependent_tasks.len());
        let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
        let _ = scope_and_block(chunks.len(), |scope| {
            for chunk in chunks {
                let child_ctx = ctx.child_context();
                #[cfg(feature = "task_dirty_cause")]
                let cause = &cause;
                scope.spawn(move || {
                    let mut ctx = child_ctx.create();
                    let mut queue = AggregationUpdateQueue::new();
                    for dependent_task_id in chunk {
                        process_output_dependents(
                            &mut ctx,
                            task_id,
                            #[cfg(feature = "task_dirty_cause")]
                            cause,
                            dependent_task_id,
                            &mut queue,
                        )
                    }
                    queue.execute(&mut ctx);
                });
            }
        });
    } else {
        // Small batch: process inline and run the collected aggregation updates once at the
        // end.
        let mut queue = AggregationUpdateQueue::new();
        for dependent_task_id in output_dependent_tasks {
            process_output_dependents(
                ctx,
                task_id,
                #[cfg(feature = "task_dirty_cause")]
                &cause,
                dependent_task_id,
                &mut queue,
            );
        }
        queue.execute(ctx);
    }
}
2504
2505 fn task_execution_completed_unfinished_children_dirty(
2506 &self,
2507 ctx: &mut impl ExecuteContext<'_>,
2508 new_children: &FxHashSet<TaskId>,
2509 ) {
2510 debug_assert!(!new_children.is_empty());
2511
2512 let mut queue = AggregationUpdateQueue::new();
2513 ctx.for_each_task_all(
2514 new_children.iter().copied(),
2515 "unfinished children dirty",
2516 |child_task, ctx| {
2517 if !child_task.has_output() {
2518 let child_id = child_task.id();
2519 make_task_dirty_internal(
2520 child_task,
2521 child_id,
2522 false,
2523 #[cfg(feature = "task_dirty_cause")]
2524 TaskDirtyCause::InitialDirty,
2525 &mut queue,
2526 ctx,
2527 );
2528 }
2529 },
2530 );
2531
2532 queue.execute(ctx);
2533 }
2534
/// Connects `new_children` to the task `task_id` after its execution completed.
///
/// `new_children` must not be empty (checked with a `debug_assert!`).
///
/// Returns `Some(priority)` when the task turned stale while executing (and is not a once
/// task): in that case the task is moved to `InProgressState::Scheduled` with
/// `TaskExecutionReason::Stale` and the children are not connected. Returns `None` when the
/// task was canceled or when the children were connected.
///
/// # Panics
///
/// Panics when the task has no in-progress state or is in a state other than `InProgress` or
/// `Canceled`.
fn task_execution_completed_connect(
    &self,
    ctx: &mut impl ExecuteContext<'_>,
    task_id: TaskId,
    new_children: FxHashSet<TaskId>,
) -> Option<TaskPriority> {
    debug_assert!(!new_children.is_empty());

    let mut task = ctx.task(task_id, TaskDataCategory::All);
    // The state is only inspected here; it stays on the task unless the stale path below
    // takes it.
    let Some(in_progress) = task.get_in_progress() else {
        panic!("Task execution completed, but task is not in progress: {task:#?}");
    };
    if matches!(in_progress, InProgressState::Canceled) {
        // A canceled task gets no children connected.
        return None;
    }
    let InProgressState::InProgress(box InProgressStateInner {
        #[cfg(not(feature = "no_fast_stale"))]
        stale,
        once_task: is_once_task,
        ..
    }) = in_progress
    else {
        panic!("Task execution completed, but task is not in progress: {task:#?}");
    };

    // Stale shortcut (compiled out with the `no_fast_stale` feature): reschedule the task
    // instead of connecting children.
    #[cfg(not(feature = "no_fast_stale"))]
    if *stale && !is_once_task {
        let stale_priority = compute_stale_priority(&task);
        // The state was just matched as `InProgress` above, hence the `unreachable!()`.
        let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
            task.take_in_progress()
        else {
            unreachable!();
        };
        // Keep the same `done_event` for the rescheduled execution.
        let old = task.set_in_progress(InProgressState::Scheduled {
            done_event,
            reason: TaskExecutionReason::Stale,
        });
        debug_assert!(old.is_none(), "InProgress already exists");
        drop(task);

        // The children are not connected; their active counts are decreased instead.
        AggregationUpdateQueue::run(
            AggregationUpdateJob::DecreaseActiveCounts {
                task_ids: new_children.into_iter().collect(),
            },
            ctx,
        );
        return Some(stale_priority);
    }

    // Only relevant when activeness is tracked: does this task currently have a positive
    // active counter?
    let has_active_count = ctx.should_track_activeness()
        && task
            .get_activeness()
            .is_some_and(|activeness| activeness.active_counter > 0);
    connect_children(
        ctx,
        task_id,
        task,
        new_children,
        has_active_count,
        ctx.should_track_activeness(),
    );

    None
}
2603
/// Finishes the execution of `task_id`: stores the new output, updates the immutable flag and
/// the dirty state, and wakes up waiters.
///
/// Returns a tuple of:
/// - `Some(priority)` when the task has to run again (it was put back into
///   `InProgressState::Scheduled` with `TaskExecutionReason::Stale`), `None` otherwise;
/// - the in-progress cell states taken from the task, if any. It is `None` on the early
///   returns for canceled and stale tasks.
///
/// # Panics
///
/// Panics when the task has no in-progress state or is in a state other than `InProgress` or
/// `Canceled`.
#[allow(clippy::type_complexity)]
fn task_execution_completed_finish(
    &self,
    ctx: &mut impl ExecuteContext<'_>,
    task_id: TaskId,
    #[cfg(feature = "verify_determinism")] no_output_set: bool,
    new_output: Option<OutputValue>,
    is_now_immutable: bool,
    is_session_dependent: bool,
) -> (
    Option<TaskPriority>,
    Option<
        auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
    >,
) {
    let mut task = ctx.task(task_id, TaskDataCategory::All);
    // The in-progress state is removed from the task here; the paths below either install
    // a `Scheduled` state or leave it cleared.
    let Some(in_progress) = task.take_in_progress() else {
        panic!("Task execution completed, but task is not in progress: {task:#?}");
    };
    if matches!(in_progress, InProgressState::Canceled) {
        // NOTE(review): the `Canceled` state taken above is not put back on this path --
        // confirm this is intended.
        return (None, None);
    }
    let InProgressState::InProgress(box InProgressStateInner {
        done_event,
        once_task: is_once_task,
        stale,
        marked_as_completed: _,
        new_children,
    }) = in_progress
    else {
        panic!("Task execution completed, but task is not in progress: {task:#?}");
    };
    debug_assert!(new_children.is_empty());

    // The task became stale while executing: reschedule it (keeping the same `done_event`)
    // without storing any result. Once tasks are never rescheduled.
    if stale && !is_once_task {
        let stale_priority = compute_stale_priority(&task);
        let old = task.set_in_progress(InProgressState::Scheduled {
            done_event,
            reason: TaskExecutionReason::Stale,
        });
        debug_assert!(old.is_none(), "InProgress already exists");
        return (Some(stale_priority), None);
    }

    // `new_output` is `None` when there is nothing to store; the previous output value is
    // kept alive in `old_content` and only dropped after the task has been released below.
    let mut old_content = None;
    if let Some(value) = new_output {
        old_content = task.set_output(value);
    }

    // The flag is only ever set here, never cleared.
    if is_now_immutable {
        task.set_immutable(true);
    }

    // Wake up everything waiting on cells that were still marked as in progress.
    let in_progress_cells = task.take_in_progress_cells();
    if let Some(ref cells) = in_progress_cells {
        for state in cells.values() {
            state.event.notify(usize::MAX);
        }
    }

    // The task is no longer dirty after a finished execution, except that session dependent
    // tasks keep the `SessionDependent` dirtyness.
    let new_dirtyness = if is_session_dependent {
        Some(Dirtyness::SessionDependent)
    } else {
        None
    };
    #[cfg(feature = "verify_determinism")]
    let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
    let data_update = task.update_dirty_state(new_dirtyness);

    // With `verify_determinism`, persistent non-once tasks are scheduled for another run
    // when the dirty state changed or `no_output_set` is true; without the feature no
    // re-run is requested here.
    #[cfg(feature = "verify_determinism")]
    let stale_priority: Option<TaskPriority> =
        ((dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task)
            .then(TaskPriority::leaf);
    #[cfg(not(feature = "verify_determinism"))]
    let stale_priority: Option<TaskPriority> = None;
    if stale_priority.is_some() {
        let old = task.set_in_progress(InProgressState::Scheduled {
            done_event,
            reason: TaskExecutionReason::Stale,
        });
        debug_assert!(old.is_none(), "InProgress already exists");
        drop(task);
    } else {
        // Release the task first, then notify the waiters of the finished execution.
        drop(task);

        done_event.notify(usize::MAX);
    }

    // Dropped explicitly after `task` has been released.
    drop(old_content);

    if let Some(data_update) = data_update {
        AggregationUpdateQueue::run(data_update, ctx);
    }

    (stale_priority, in_progress_cells)
}
2712
2713 fn task_execution_completed_cleanup(
2714 &self,
2715 ctx: &mut impl ExecuteContext<'_>,
2716 task_id: TaskId,
2717 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2718 is_error: bool,
2719 is_recomputation: bool,
2720 ) -> Vec<SharedReference> {
2721 let mut task = ctx.task(task_id, TaskDataCategory::All);
2722 let mut removed_cell_data = Vec::new();
2723 if !is_error || is_recomputation {
2729 let to_remove: Vec<_> = task
2735 .iter_cell_data()
2736 .filter_map(|(cell, _)| {
2737 cell_counters
2738 .get(&cell.type_id)
2739 .is_none_or(|start_index| cell.index >= *start_index)
2740 .then_some(*cell)
2741 })
2742 .collect();
2743 removed_cell_data.reserve_exact(to_remove.len());
2744 for cell in to_remove {
2745 if let Some(data) = task.remove_cell_data(&cell) {
2746 removed_cell_data.push(data);
2747 }
2748 }
2749 let to_remove_hash: Vec<_> = task
2751 .iter_cell_data_hash()
2752 .filter_map(|(cell, _)| {
2753 cell_counters
2754 .get(&cell.type_id)
2755 .is_none_or(|start_index| cell.index >= *start_index)
2756 .then_some(*cell)
2757 })
2758 .collect();
2759 for cell in to_remove_hash {
2760 task.remove_cell_data_hash(&cell);
2761 }
2762 }
2763
2764 task.cleanup_after_execution();
2768
2769 drop(task);
2770
2771 removed_cell_data
2773 }
2774
/// Prints a notice to stderr that persisting is disabled for the rest of the session.
fn log_unrecoverable_persist_error() {
    const NOTICE: &str = "Persisting is disabled for this session due to an unrecoverable \
                          error. Stopping the background persisting process.";
    eprintln!("{}", NOTICE);
}
2783
/// Runs a backend job and returns it as a boxed future.
///
/// For `TurboTasksBackendJob::Snapshot` this is a loop that repeatedly:
/// 1. waits until the next snapshot is due (300s after `start_time` for the first snapshot,
///    120s after the previous snapshot start afterwards), until the idle timeout
///    (`IDLE_TIMEOUT`) elapsed while idle, or until the backend is stopping;
/// 2. calls `snapshot_and_persist`;
/// 3. while no idle-end event is observed, optionally evicts storage, runs up to
///    `MAX_IDLE_COMPACTION_PASSES` compaction passes and triggers an allocator collect.
///
/// The future completes when the backend is stopping, when persisting fails, or when a
/// compaction fails and the backing storage reports an unrecoverable write error.
fn run_backend_job<'a>(
    self: &'a Arc<Self>,
    job: TurboTasksBackendJob,
    turbo_tasks: &'a TurboTasks<TurboTasksBackend<B>>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
    Box::pin(async move {
        match job {
            TurboTasksBackendJob::Snapshot => {
                debug_assert!(self.should_persist());

                let mut last_snapshot = self.start_time;
                let mut idle_start_listener = self.idle_start_event.listen();
                let mut idle_end_listener = self.idle_end_event.listen();
                // Whether the last snapshot reported new data (initially `true`). The idle
                // timer is only armed on loop entry when this is set.
                let mut fresh_idle = true;
                // Whether an eviction has been started at least once by this job.
                let mut evicted = false;
                let mut is_first = true;
                'outer: loop {
                    const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                    const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                    let idle_timeout = *IDLE_TIMEOUT;
                    // `reason` is passed to `snapshot_and_persist`; it is overwritten when
                    // the idle timeout triggers the snapshot.
                    let (time, mut reason) = if is_first {
                        (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                    } else {
                        (SNAPSHOT_INTERVAL, "regular snapshot interval")
                    };

                    let until = last_snapshot + time;
                    if until > Instant::now() {
                        // Register the listener before checking the flag so a stop between
                        // the two is not missed.
                        let mut stop_listener = self.stopping_event.listen();
                        if self.stopping.load(Ordering::Acquire) {
                            return;
                        }
                        // `far_future()` effectively disables the idle timer.
                        let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
                            Instant::now() + idle_timeout
                        } else {
                            far_future()
                        };
                        loop {
                            tokio::select! {
                                _ = &mut stop_listener => {
                                    return;
                                },
                                // Idle started: arm the idle timer and re-register.
                                _ = &mut idle_start_listener => {
                                    idle_time = Instant::now() + idle_timeout;
                                    idle_start_listener = self.idle_start_event.listen()
                                },
                                // Idle ended: disarm the idle timer and re-register.
                                _ = &mut idle_end_listener => {
                                    idle_time = far_future();
                                    idle_end_listener = self.idle_end_event.listen()
                                },
                                // Regular deadline reached.
                                _ = tokio::time::sleep_until(until) => {
                                    break;
                                },
                                // Idle timer elapsed: only snapshot when still idle.
                                // NOTE(review): `idle_time` is not reset when the system is
                                // not idle here -- confirm this cannot spin.
                                _ = tokio::time::sleep_until(idle_time) => {
                                    if turbo_tasks.is_idle() {
                                        reason = "idle timeout";
                                        break;
                                    }
                                },
                            }
                        }
                    }

                    let this = self.clone();
                    // Root span (`parent: None`); its id is handed to the snapshot, eviction
                    // and compaction steps below.
                    let background_span =
                        tracing::info_span!(parent: None, "background snapshot");
                    match this.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
                        Err(err) => {
                            // A failed snapshot ends the background job.
                            eprintln!("Persisting failed: {err:?}");
                            Self::log_unrecoverable_persist_error();
                            return;
                        }
                        Ok((snapshot_start, new_data)) => {
                            fresh_idle = new_data;
                            is_first = false;
                            last_snapshot = snapshot_start;

                            // Polls `idle_end_listener` once without waiting: evaluates to
                            // `true` (and re-registers the listener) when an idle-end event
                            // already fired, `false` otherwise. `biased` makes the listener
                            // branch get polled before the always-ready fallback.
                            macro_rules! check_idle_ended {
                                () => {{
                                    tokio::select! {
                                        biased;
                                        _ = &mut idle_end_listener => {
                                            idle_end_listener = self.idle_end_event.listen();
                                            true
                                        },
                                        _ = std::future::ready(()) => false,
                                    }
                                }};
                            }
                            let mut ran_eviction = false;
                            // Evict when the snapshot had new data or when no eviction has
                            // run yet, unless idle already ended.
                            if this.should_evict() && (new_data || !evicted) {
                                if check_idle_ended!() {
                                    continue 'outer;
                                }
                                evicted = true;
                                ran_eviction = true;
                                this.storage.evict_after_snapshot(background_span.id());
                            }

                            let mut ran_compaction = false;
                            const MAX_IDLE_COMPACTION_PASSES: usize = 10;
                            // Compact until `compact()` returns `Ok(false)`, an error occurs,
                            // idle ends or the pass limit is reached.
                            for _ in 0..MAX_IDLE_COMPACTION_PASSES {
                                if check_idle_ended!() {
                                    continue 'outer;
                                }
                                let _compact_span = tracing::info_span!(
                                    parent: background_span.id(),
                                    "compact database"
                                )
                                .entered();
                                match self.backing_storage.compact() {
                                    Ok(true) => {
                                        ran_compaction = true;
                                    }
                                    Ok(false) => break,
                                    Err(err) => {
                                        eprintln!("Compaction failed: {err:?}");
                                        // Only an unrecoverable write error ends the job;
                                        // other compaction errors just stop compacting.
                                        if self.backing_storage.has_unrecoverable_write_error()
                                        {
                                            Self::log_unrecoverable_persist_error();
                                            return;
                                        }
                                        break;
                                    }
                                }
                            }
                            if check_idle_ended!() {
                                continue 'outer;
                            }
                            // Ask the allocator to collect only when some work happened in
                            // this round.
                            if new_data || ran_compaction || ran_eviction {
                                turbo_tasks_malloc::TurboMalloc::collect(true);
                            }
                        }
                    }
                }
            }
        }
    })
}
2966
2967 fn try_read_own_task_cell(
2968 &self,
2969 task_id: TaskId,
2970 cell: CellId,
2971 turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
2972 ) -> Result<TypedCellContent> {
2973 let mut ctx = self.execute_context(turbo_tasks);
2974 let task = ctx.task(task_id, TaskDataCategory::Data);
2975 if let Some(content) = task.get_cell_data(&cell).cloned() {
2976 Ok(CellContent(Some(content)).into_typed(cell.type_id))
2977 } else {
2978 Ok(CellContent(None).into_typed(cell.type_id))
2979 }
2980 }
2981
/// Collects the collectibles of type `collectible_type` that are visible from task `task_id`.
///
/// The returned map is keyed by `RawVc::TaskCell` of the collectible's cell. When `reader_id`
/// is given, the reader is registered as a collectibles dependent on the task and the
/// matching dependency is recorded on the reader.
///
/// # Panics
///
/// Panics when the task has a persistent task type whose native function is not marked as
/// root.
fn read_task_collectibles(
    &self,
    task_id: TaskId,
    collectible_type: TraitTypeId,
    reader_id: Option<TaskId>,
    turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
    let mut ctx = self.execute_context(turbo_tasks);
    let mut collectibles = AutoMap::default();
    {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        if task
            .get_persistent_task_type()
            .is_some_and(|t| !t.native_fn.is_root)
        {
            // Release the task before building the panic message, which looks up task
            // descriptions.
            drop(task);
            panic!(
                "Reading collectibles of non-root task {} (reader: {}). The `root` attribute \
                 is missing on the task.",
                self.debug_get_task_description(task_id),
                reader_id.map_or_else(
                    || "unknown".to_string(),
                    |r| self.debug_get_task_description(r)
                )
            );
        }
        // Aggregated collectibles: every entry with a positive count contributes exactly 1
        // (not its count).
        for (collectible, count) in task.iter_aggregated_collectibles() {
            if *count > 0 && collectible.collectible_type == collectible_type {
                *collectibles
                    .entry(RawVc::TaskCell(
                        collectible.cell.task,
                        collectible.cell.cell,
                    ))
                    .or_insert(0) += 1;
            }
        }
        // Collectibles stored on the task itself contribute their full count.
        for (&collectible, &count) in task.iter_collectibles() {
            if collectible.collectible_type == collectible_type {
                *collectibles
                    .entry(RawVc::TaskCell(
                        collectible.cell.task,
                        collectible.cell.cell,
                    ))
                    .or_insert(0) += count;
            }
        }
        // Register the reader on the task; the result of the insertion is ignored.
        if let Some(reader_id) = reader_id {
            let _ = task.add_collectibles_dependents((collectible_type, reader_id));
        }
    }
    // The task guard is released at the end of the block above, before the reader is
    // accessed.
    if let Some(reader_id) = reader_id {
        let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
        let target = CollectiblesRef {
            task: task_id,
            collectible_type,
        };
        // If the dependency was listed as outdated it is only removed from that list;
        // otherwise it is added as a new dependency.
        if !reader.remove_outdated_collectibles_dependencies(&target) {
            let _ = reader.add_collectibles_dependencies(target);
        }
    }
    collectibles
}
3044
3045 fn emit_collectible(
3046 &self,
3047 collectible_type: TraitTypeId,
3048 collectible: RawVc,
3049 task_id: TaskId,
3050 turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
3051 ) {
3052 self.assert_valid_collectible(task_id, collectible);
3053
3054 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3055 panic!("Collectibles need to be resolved");
3056 };
3057 let cell = CellRef {
3058 task: collectible_task,
3059 cell,
3060 };
3061 operation::UpdateCollectibleOperation::run(
3062 task_id,
3063 CollectibleRef {
3064 collectible_type,
3065 cell,
3066 },
3067 1,
3068 self.execute_context(turbo_tasks),
3069 );
3070 }
3071
3072 fn unemit_collectible(
3073 &self,
3074 collectible_type: TraitTypeId,
3075 collectible: RawVc,
3076 count: u32,
3077 task_id: TaskId,
3078 turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
3079 ) {
3080 self.assert_valid_collectible(task_id, collectible);
3081
3082 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3083 panic!("Collectibles need to be resolved");
3084 };
3085 let cell = CellRef {
3086 task: collectible_task,
3087 cell,
3088 };
3089 operation::UpdateCollectibleOperation::run(
3090 task_id,
3091 CollectibleRef {
3092 collectible_type,
3093 cell,
3094 },
3095 -(i32::try_from(count).unwrap()),
3096 self.execute_context(turbo_tasks),
3097 );
3098 }
3099
/// Updates `cell` of task `task_id` by running an `UpdateCellOperation` on a fresh execute
/// context; all arguments are passed through unchanged.
fn update_task_cell(
    &self,
    task_id: TaskId,
    cell: CellId,
    content: CellContent,
    updated_key_hashes: Option<SmallVec<[u64; 2]>>,
    content_hash: Option<CellHash>,
    verification_mode: VerificationMode,
    turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
) {
    operation::UpdateCellOperation::run(
        task_id,
        cell,
        content,
        updated_key_hashes,
        content_hash,
        verification_mode,
        self.execute_context(turbo_tasks),
    );
}
3120
3121 fn mark_own_task_as_finished(
3122 &self,
3123 task: TaskId,
3124 turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
3125 ) {
3126 let mut ctx = self.execute_context(turbo_tasks);
3127 let mut task = ctx.task(task, TaskDataCategory::Data);
3128 if let Some(InProgressState::InProgress(box InProgressStateInner {
3129 marked_as_completed,
3130 ..
3131 })) = task.get_in_progress_mut()
3132 {
3133 *marked_as_completed = true;
3134 }
3139 }
3140
/// Connects `task` as a child of `parent_task` by running a `ConnectChildOperation`.
///
/// Before that, `assert_not_persistent_calling_transient` panics when `parent_task` is a
/// persistent task and `task` is transient.
fn connect_task(
    &self,
    task: TaskId,
    parent_task: Option<TaskId>,
    turbo_tasks: &TurboTasks<TurboTasksBackend<B>>,
) {
    self.assert_not_persistent_calling_transient(parent_task, task, None);
    ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
}
3150
3151 fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3152 let task_id = self.transient_task_id_factory.get();
3153 {
3154 let mut task = self.storage.access_mut(task_id);
3155 task.init_transient_task(task_id, task_type, self.should_track_activeness());
3156 }
3157 #[cfg(feature = "verify_aggregation_graph")]
3158 self.root_tasks.lock().insert(task_id);
3159 task_id
3160 }
3161
3162 fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<TurboTasksBackend<B>>) {
3163 #[cfg(feature = "verify_aggregation_graph")]
3164 self.root_tasks.lock().remove(&task_id);
3165
3166 let mut ctx = self.execute_context(turbo_tasks);
3167 let mut task = ctx.task(task_id, TaskDataCategory::All);
3168 let is_dirty = task.is_dirty();
3169 let has_dirty_containers = task.has_dirty_containers();
3170 if is_dirty.is_some() || has_dirty_containers {
3171 if let Some(activeness_state) = task.get_activeness_mut() {
3172 activeness_state.unset_root_type();
3174 activeness_state.set_active_until_clean();
3175 };
3176 } else if let Some(activeness_state) = task.take_activeness() {
3177 activeness_state.all_clean_event.notify(usize::MAX);
3180 }
3181 }
3182
/// Debug verification of the aggregation graph (only compiled with the
/// `verify_aggregation_graph` feature).
///
/// Does nothing when the environment variable `TURBO_ENGINE_VERIFY_GRAPH` is set to `0`.
/// When `idle` is `true`, the verification is aborted as soon as `self.is_idle` is observed
/// to be `false`.
///
/// For every root task, all tasks reachable through children are visited and checked:
/// - every non-root task must have at least one upper that is an aggregating node seen so far;
/// - every collectible emitted with a positive count must be listed as an aggregated
///   collectible by a task visited earlier;
/// - every dirty task (or task with dirty containers) must be listed with a positive count in
///   the aggregated dirty containers of each of its uppers;
/// - every aggregated collectible must be emitted by some visited task; otherwise the chain
///   of uppers of the collectible's task is dumped to stdout.
///
/// # Panics
///
/// Panics on the first violated invariant.
#[cfg(feature = "verify_aggregation_graph")]
fn verify_aggregation_graph(&self, turbo_tasks: &TurboTasks<TurboTasksBackend<B>>, idle: bool) {
    use std::{collections::VecDeque, env, io::stdout};

    use crate::backend::operation::{get_uppers, is_aggregating_node};

    if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
        return;
    }

    let mut ctx = self.execute_context(turbo_tasks);
    let root_tasks = self.root_tasks.lock().clone();

    for task_id in root_tasks.into_iter() {
        let mut queue = VecDeque::new();
        let mut visited = FxHashSet::default();
        let mut aggregated_nodes = FxHashSet::default();
        // collectible -> (seen as emitted by a visited task, tasks aggregating it)
        let mut collectibles = FxHashMap::default();
        let root_task_id = task_id;
        visited.insert(task_id);
        aggregated_nodes.insert(task_id);
        queue.push_back(task_id);
        let mut counter = 0;
        while let Some(task_id) = queue.pop_front() {
            counter += 1;
            // Periodic progress output for large graphs.
            if counter % 100000 == 0 {
                println!(
                    "queue={}, visited={}, aggregated_nodes={}",
                    queue.len(),
                    visited.len(),
                    aggregated_nodes.len()
                );
            }
            let task = ctx.task(task_id, TaskDataCategory::All);
            // Abort an idle-time verification once the system is no longer idle.
            if idle && !self.is_idle.load(Ordering::Relaxed) {
                return;
            }

            let uppers = get_uppers(&task);
            if task_id != root_task_id
                && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
            {
                panic!(
                    "Task {} {} doesn't report to any root but is reachable from one (uppers: \
                     {:?})",
                    task_id,
                    task.get_task_description(),
                    uppers
                );
            }

            for (collectible, _) in task.iter_aggregated_collectibles() {
                collectibles
                    .entry(*collectible)
                    .or_insert_with(|| (false, Vec::new()))
                    .1
                    .push(task_id);
            }

            for (&collectible, &value) in task.iter_collectibles() {
                if value > 0 {
                    if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                        *flag = true
                    } else {
                        panic!(
                            "Task {} has a collectible {:?} that is not in any upper task",
                            task_id, collectible
                        );
                    }
                }
            }

            let is_dirty = task.has_dirty();
            let has_dirty_container = task.has_dirty_containers();
            let should_be_in_upper = is_dirty || has_dirty_container;

            let aggregation_number = get_aggregation_number(&task);
            if is_aggregating_node(aggregation_number) {
                aggregated_nodes.insert(task_id);
            }
            for child_id in task.iter_children() {
                if visited.insert(child_id) {
                    queue.push_back(child_id);
                }
            }
            drop(task);

            if should_be_in_upper {
                for upper_id in uppers {
                    let upper = ctx.task(upper_id, TaskDataCategory::All);
                    let in_upper = upper
                        .get_aggregated_dirty_containers(&task_id)
                        .is_some_and(|&dirty| dirty > 0);
                    if !in_upper {
                        let containers: Vec<_> = upper
                            .iter_aggregated_dirty_containers()
                            .map(|(&k, &v)| (k, v))
                            .collect();
                        let upper_task_desc = upper.get_task_description();
                        // Release the upper before accessing `task_id` again for the message.
                        drop(upper);
                        panic!(
                            "Task {} ({}) is dirty, but is not listed in the upper task {} \
                             ({})\nThese dirty containers are present:\n{:#?}",
                            task_id,
                            ctx.task(task_id, TaskDataCategory::Data)
                                .get_task_description(),
                            upper_id,
                            upper_task_desc,
                            containers,
                        );
                    }
                }
            }
        }

        for (collectible, (flag, task_ids)) in collectibles {
            if !flag {
                use std::io::Write;
                let mut stdout = stdout().lock();
                writeln!(
                    stdout,
                    "{:?} that is not emitted in any child task but in these aggregated \
                     tasks: {:#?}",
                    collectible,
                    task_ids
                        .iter()
                        .map(|t| format!(
                            "{t} {}",
                            ctx.task(*t, TaskDataCategory::Data).get_task_description()
                        ))
                        .collect::<Vec<_>>()
                )
                .unwrap();

                // Dump the chain of uppers starting at the task owning the collectible.
                let task_id = collectible.cell.task;
                let mut queue = {
                    let task = ctx.task(task_id, TaskDataCategory::All);
                    get_uppers(&task)
                };
                let mut visited = FxHashSet::default();
                for &upper_id in queue.iter() {
                    visited.insert(upper_id);
                    writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                }
                while let Some(task_id) = queue.pop() {
                    let task = ctx.task(task_id, TaskDataCategory::All);
                    let desc = task.get_task_description();
                    let aggregated_collectible = task
                        .get_aggregated_collectibles(&collectible)
                        .copied()
                        .unwrap_or_default();
                    let uppers = get_uppers(&task);
                    drop(task);
                    writeln!(
                        stdout,
                        "upper {task_id} {desc} collectible={aggregated_collectible}"
                    )
                    .unwrap();
                    if task_ids.contains(&task_id) {
                        writeln!(
                            stdout,
                            "Task has an upper connection to an aggregated task that doesn't \
                             reference it. Upper connection is invalid!"
                        )
                        .unwrap();
                    }
                    for upper_id in uppers {
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                        // Record the upper as visited when queueing it, so an upper that is
                        // reachable over several paths is only dumped once.
                        if visited.insert(upper_id) {
                            queue.push(upper_id);
                        }
                    }
                }
                panic!("See stdout for more details");
            }
        }
    }
}
3367
3368 fn assert_not_persistent_calling_transient(
3369 &self,
3370 parent_id: Option<TaskId>,
3371 child_id: TaskId,
3372 cell_id: Option<CellId>,
3373 ) {
3374 if let Some(parent_id) = parent_id
3375 && !parent_id.is_transient()
3376 && child_id.is_transient()
3377 {
3378 self.panic_persistent_calling_transient(
3379 self.debug_get_task_description(parent_id),
3380 self.debug_get_cached_task_type(child_id).as_deref(),
3381 cell_id,
3382 );
3383 }
3384 }
3385
3386 fn panic_persistent_calling_transient(
3387 &self,
3388 parent: String,
3389 child: Option<&CachedTaskType>,
3390 cell_id: Option<CellId>,
3391 ) -> ! {
3392 let transient_reason = if let Some(child) = child {
3393 Cow::Owned(format!(
3394 " The callee is transient because it depends on:\n{}",
3395 self.debug_trace_transient_task(child, cell_id),
3396 ))
3397 } else {
3398 Cow::Borrowed("")
3399 };
3400 panic!(
3401 "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3402 parent,
3403 child.map_or("unknown", |t| t.get_name()),
3404 transient_reason,
3405 );
3406 }
3407
3408 fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3409 let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3411 let task_info = if let Some(col_task_ty) = collectible
3413 .try_get_task_id()
3414 .map(|t| self.debug_get_task_description(t))
3415 {
3416 Cow::Owned(format!(" (return type of {col_task_ty})"))
3417 } else {
3418 Cow::Borrowed("")
3419 };
3420 panic!("Collectible{task_info} must be a ResolvedVc")
3421 };
3422 if col_task_id.is_transient() && !task_id.is_transient() {
3423 let transient_reason =
3424 if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3425 Cow::Owned(format!(
3426 ". The collectible is transient because it depends on:\n{}",
3427 self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3428 ))
3429 } else {
3430 Cow::Borrowed("")
3431 };
3432 panic!(
3434 "Collectible is transient, transient collectibles cannot be emitted from \
3435 persistent tasks{transient_reason}",
3436 )
3437 }
3438 }
3439}
3440
3441impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
/// Forwards to `startup` of the inner backend (`self.0`).
fn startup(&self, turbo_tasks: &TurboTasks<Self>) {
    self.0.startup(turbo_tasks);
}
3445
/// Forwards to `stopping` of the inner backend (`self.0`); `_turbo_tasks` is not used.
fn stopping(&self, _turbo_tasks: &TurboTasks<Self>) {
    self.0.stopping();
}
3449
/// Forwards to `stop` of the inner backend (`self.0`).
fn stop(&self, turbo_tasks: &TurboTasks<Self>) {
    self.0.stop(turbo_tasks);
}
3453
/// Forwards to `idle_start` of the inner backend (`self.0`).
fn idle_start(&self, turbo_tasks: &TurboTasks<Self>) {
    self.0.idle_start(turbo_tasks);
}
3457
/// Forwards to `idle_end` of the inner backend (`self.0`); `_turbo_tasks` is not used.
fn idle_end(&self, _turbo_tasks: &TurboTasks<Self>) {
    self.0.idle_end();
}
3461
/// Forwards to `get_or_create_task` of the inner backend (`self.0`) with all arguments
/// unchanged and returns the resulting task id.
fn get_or_create_task(
    &self,
    native_fn: &'static NativeFunction,
    this: Option<RawVc>,
    arg: &mut dyn DynTaskInputsStorage,
    parent_task: Option<TaskId>,
    persistence: TaskPersistence,
    turbo_tasks: &TurboTasks<Self>,
) -> TaskId {
    self.0
        .get_or_create_task(native_fn, this, arg, parent_task, persistence, turbo_tasks)
}
3474
/// Forwards to `invalidate_task` of the inner backend (`self.0`).
fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
    self.0.invalidate_task(task_id, turbo_tasks);
}
3478
/// Forwards to `invalidate_tasks` of the inner backend (`self.0`).
fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &TurboTasks<Self>) {
    self.0.invalidate_tasks(tasks, turbo_tasks);
}
3482
/// Forwards to `invalidate_tasks_set` of the inner backend (`self.0`).
fn invalidate_tasks_set(
    &self,
    tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
    turbo_tasks: &TurboTasks<Self>,
) {
    self.0.invalidate_tasks_set(tasks, turbo_tasks);
}
3490
/// Forwards to `invalidate_serialization` of the inner backend (`self.0`).
fn invalidate_serialization(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
    self.0.invalidate_serialization(task_id, turbo_tasks);
}
3494
/// Forwards to `task_execution_canceled` of the inner backend (`self.0`).
fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &TurboTasks<Self>) {
    self.0.task_execution_canceled(task, turbo_tasks)
}
3498
/// Forwards to `try_start_task_execution` of the inner backend (`self.0`) and returns its
/// result unchanged.
fn try_start_task_execution(
    &self,
    task_id: TaskId,
    priority: TaskPriority,
    turbo_tasks: &TurboTasks<Self>,
) -> Option<TaskExecutionSpec<'_>> {
    self.0
        .try_start_task_execution(task_id, priority, turbo_tasks)
}
3508
/// Forwards to `task_execution_completed` of the inner backend (`self.0`) and returns its
/// result unchanged.
///
/// The `stateful` parameter only exists (and is only forwarded) with the
/// `verify_determinism` feature.
fn task_execution_completed(
    &self,
    task_id: TaskId,
    result: Result<RawVc, TurboTasksExecutionError>,
    cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
    #[cfg(feature = "verify_determinism")] stateful: bool,
    has_invalidator: bool,
    turbo_tasks: &TurboTasks<Self>,
) -> Option<TaskPriority> {
    self.0.task_execution_completed(
        task_id,
        result,
        cell_counters,
        #[cfg(feature = "verify_determinism")]
        stateful,
        has_invalidator,
        turbo_tasks,
    )
}
3528
3529 type BackendJob = TurboTasksBackendJob;
3530
/// Forwards to `run_backend_job` of the inner backend (`self.0`) and returns its future.
fn run_backend_job<'a>(
    &'a self,
    job: Self::BackendJob,
    turbo_tasks: &'a TurboTasks<Self>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
    self.0.run_backend_job(job, turbo_tasks)
}
3538
/// Forwards to `try_read_task_output` of the inner backend (`self.0`) and returns its result
/// unchanged.
fn try_read_task_output(
    &self,
    task_id: TaskId,
    reader: Option<TaskId>,
    options: ReadOutputOptions,
    turbo_tasks: &TurboTasks<Self>,
) -> Result<Result<RawVc, EventListener>> {
    self.0
        .try_read_task_output(task_id, reader, options, turbo_tasks)
}
3549
/// Forwards to `try_read_task_cell` of the inner backend (`self.0`) and returns its result
/// unchanged.
///
/// Note that the inner method takes `reader` before `cell`, unlike this trait method.
fn try_read_task_cell(
    &self,
    task_id: TaskId,
    cell: CellId,
    reader: Option<TaskId>,
    options: ReadCellOptions,
    turbo_tasks: &TurboTasks<Self>,
) -> Result<Result<TypedCellContent, EventListener>> {
    self.0
        .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
}
3561
/// Forwards to `try_read_own_task_cell` of the inner backend (`self.0`) and returns its
/// result unchanged.
fn try_read_own_task_cell(
    &self,
    task_id: TaskId,
    cell: CellId,
    turbo_tasks: &TurboTasks<Self>,
) -> Result<TypedCellContent> {
    self.0.try_read_own_task_cell(task_id, cell, turbo_tasks)
}
3570
/// Forwards to `read_task_collectibles` of the inner backend (`self.0`) and returns its
/// result unchanged.
fn read_task_collectibles(
    &self,
    task_id: TaskId,
    collectible_type: TraitTypeId,
    reader: Option<TaskId>,
    turbo_tasks: &TurboTasks<Self>,
) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
    self.0
        .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
}
3581
3582 fn emit_collectible(
3583 &self,
3584 collectible_type: TraitTypeId,
3585 collectible: RawVc,
3586 task_id: TaskId,
3587 turbo_tasks: &TurboTasks<Self>,
3588 ) {
3589 self.0
3590 .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3591 }
3592
3593 fn unemit_collectible(
3594 &self,
3595 collectible_type: TraitTypeId,
3596 collectible: RawVc,
3597 count: u32,
3598 task_id: TaskId,
3599 turbo_tasks: &TurboTasks<Self>,
3600 ) {
3601 self.0
3602 .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3603 }
3604
3605 fn update_task_cell(
3606 &self,
3607 task_id: TaskId,
3608 cell: CellId,
3609 content: CellContent,
3610 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3611 content_hash: Option<CellHash>,
3612 verification_mode: VerificationMode,
3613 turbo_tasks: &TurboTasks<Self>,
3614 ) {
3615 self.0.update_task_cell(
3616 task_id,
3617 cell,
3618 content,
3619 updated_key_hashes,
3620 content_hash,
3621 verification_mode,
3622 turbo_tasks,
3623 );
3624 }
3625
3626 fn mark_own_task_as_finished(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3627 self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3628 }
3629
3630 fn connect_task(
3631 &self,
3632 task: TaskId,
3633 parent_task: Option<TaskId>,
3634 turbo_tasks: &TurboTasks<Self>,
3635 ) {
3636 self.0.connect_task(task, parent_task, turbo_tasks);
3637 }
3638
3639 fn create_transient_task(
3640 &self,
3641 task_type: TransientTaskType,
3642 _turbo_tasks: &TurboTasks<Self>,
3643 ) -> TaskId {
3644 self.0.create_transient_task(task_type)
3645 }
3646
3647 fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &TurboTasks<Self>) {
3648 self.0.dispose_root_task(task_id, turbo_tasks);
3649 }
3650
3651 fn task_statistics(&self) -> &TaskStatisticsApi {
3652 &self.0.task_statistics
3653 }
3654
3655 fn is_tracking_dependencies(&self) -> bool {
3656 self.0.options.dependency_tracking
3657 }
3658
3659 fn get_task_name(&self, task: TaskId, turbo_tasks: &TurboTasks<Self>) -> String {
3660 self.0.get_task_name(task, turbo_tasks)
3661 }
3662}
3663
/// One node of a tree describing a transient task; rendered as indented text, one line
/// per node, by `fmt_indented` and the `Display` impl.
enum DebugTraceTransientTask {
    /// A named task node that can carry nested cause nodes.
    Cached {
        /// Name printed at the start of this node's line.
        task_name: &'static str,
        /// When `Some`, a ` (read cell of type …)` suffix naming the value type is
        /// appended to this node's line.
        cell_type_id: Option<ValueTypeId>,
        /// Optional child node, printed one level deeper under a `self:` heading.
        cause_self: Option<Box<DebugTraceTransientTask>>,
        /// Child nodes, printed one level deeper under an `args:` heading when non-empty.
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// A named task node without children; its line ends with a ` (collapsed)` marker.
    // NOTE(review): presumably stands in for a node that is not expanded again in the
    // trace — confirm against the code that builds these values.
    Collapsed {
        /// Name printed at the start of this node's line.
        task_name: &'static str,
        /// When `Some`, a ` (read cell of type …)` suffix naming the value type is
        /// appended to this node's line.
        cell_type_id: Option<ValueTypeId>,
    },
    /// A node without a task name; rendered as `unknown transient task`.
    Uncached {
        /// When `Some`, a ` (read cell of type …)` suffix naming the value type is
        /// appended to this node's line.
        cell_type_id: Option<ValueTypeId>,
    },
}
3680
3681impl DebugTraceTransientTask {
3682 fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3683 let indent = " ".repeat(level);
3684 f.write_str(&indent)?;
3685
3686 fn fmt_cell_type_id(
3687 f: &mut fmt::Formatter<'_>,
3688 cell_type_id: Option<ValueTypeId>,
3689 ) -> fmt::Result {
3690 if let Some(ty) = cell_type_id {
3691 write!(
3692 f,
3693 " (read cell of type {})",
3694 get_value_type(ty).ty.global_name
3695 )
3696 } else {
3697 Ok(())
3698 }
3699 }
3700
3701 match self {
3703 Self::Cached {
3704 task_name,
3705 cell_type_id,
3706 ..
3707 }
3708 | Self::Collapsed {
3709 task_name,
3710 cell_type_id,
3711 ..
3712 } => {
3713 f.write_str(task_name)?;
3714 fmt_cell_type_id(f, *cell_type_id)?;
3715 if matches!(self, Self::Collapsed { .. }) {
3716 f.write_str(" (collapsed)")?;
3717 }
3718 }
3719 Self::Uncached { cell_type_id } => {
3720 f.write_str("unknown transient task")?;
3721 fmt_cell_type_id(f, *cell_type_id)?;
3722 }
3723 }
3724 f.write_char('\n')?;
3725
3726 if let Self::Cached {
3728 cause_self,
3729 cause_args,
3730 ..
3731 } = self
3732 {
3733 if let Some(c) = cause_self {
3734 writeln!(f, "{indent} self:")?;
3735 c.fmt_indented(f, level + 1)?;
3736 }
3737 if !cause_args.is_empty() {
3738 writeln!(f, "{indent} args:")?;
3739 for c in cause_args {
3740 c.fmt_indented(f, level + 1)?;
3741 }
3742 }
3743 }
3744 Ok(())
3745 }
3746}
3747
3748impl fmt::Display for DebugTraceTransientTask {
3749 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3750 self.fmt_indented(f, 0)
3751 }
3752}
3753
/// Returns an `Instant` lying 30 × 365 days (in seconds) after the current instant.
fn far_future() -> Instant {
    const SECONDS_PER_DAY: u64 = 86400;
    const OFFSET: Duration = Duration::from_secs(SECONDS_PER_DAY * 365 * 30);

    let now = Instant::now();
    now + OFFSET
}
3762
3763fn encode_task_data(
3775 task: TaskId,
3776 data: &TaskStorage,
3777 category: SpecificTaskDataCategory,
3778 scratch_buffer: &mut TurboBincodeBuffer,
3779) -> Result<TurboBincodeBuffer> {
3780 scratch_buffer.clear();
3781 let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3782 data.encode(category, &mut encoder)?;
3783
3784 if cfg!(feature = "verify_serialization") {
3785 TaskStorage::new()
3786 .decode(
3787 category,
3788 &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3789 )
3790 .with_context(|| {
3791 format!(
3792 "expected to be able to decode serialized data for '{category:?}' information \
3793 for {task}"
3794 )
3795 })?;
3796 }
3797 Ok(SmallVec::from_slice(scratch_buffer))
3798}