1mod cell_data;
2mod counter_map;
3mod operation;
4mod snapshot_coordinator;
5mod storage;
6pub mod storage_schema;
7
8use std::{
9 borrow::Cow,
10 fmt::{self, Write},
11 future::Future,
12 hash::BuildHasherDefault,
13 mem::take,
14 pin::Pin,
15 sync::{
16 Arc, LazyLock,
17 atomic::{AtomicBool, AtomicU64, Ordering},
18 },
19 time::SystemTime,
20};
21
22use anyhow::{Context, Result, bail};
23use auto_hash_map::{AutoMap, AutoSet};
24use indexmap::IndexSet;
25use parking_lot::Mutex;
26use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
27use smallvec::{SmallVec, smallvec};
28use tokio::time::{Duration, Instant};
29use tracing::{Span, trace_span};
30use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
31use turbo_tasks::{
32 CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
33 ReadOutputOptions, ReadTracking, SharedReference, StackDynTaskInputs, TRANSIENT_TASK_BIT,
34 TaskExecutionReason, TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasksBackendApi,
35 TurboTasksPanic, ValueTypeId,
36 backend::{
37 Backend, CachedTaskType, CellContent, CellHash, TaskExecutionSpec, TransientTaskType,
38 TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
39 TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
40 VerificationMode,
41 },
42 event::{Event, EventDescription, EventListener},
43 macro_helpers::NativeFunction,
44 message_queue::{TimingEvent, TraceEvent},
45 registry::get_value_type,
46 scope::scope_and_block,
47 task_statistics::TaskStatisticsApi,
48 trace::TraceRawVcs,
49 util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
50};
51
52pub use self::{
53 operation::AnyOperation,
54 storage::{SpecificTaskDataCategory, TaskDataCategory},
55};
56#[cfg(feature = "trace_task_dirty")]
57use crate::backend::operation::TaskDirtyCause;
58use crate::{
59 backend::{
60 operation::{
61 AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
62 CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
63 LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType,
64 connect_children, get_aggregation_number, get_uppers, is_root_node,
65 make_task_dirty_internal, prepare_new_children,
66 },
67 snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
68 storage::Storage,
69 storage_schema::{TaskStorage, TaskStorageAccessors},
70 },
71 backing_storage::{BackingStorage, SnapshotItem, compute_task_type_hash},
72 data::{
73 ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
74 InProgressState, InProgressStateInner, OutputValue, TransientTask,
75 },
76 error::TaskError,
77 utils::{
78 dash_map_drop_contents::drop_contents,
79 dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
80 shard_amount::compute_shard_amount,
81 },
82};
83
// Threshold (number of dependent tasks) above which dirtying dependents is
// presumably split up and parallelized. NOTE(review): the use site is not
// visible in this chunk — confirm semantics where this constant is consumed.
const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10000;
88
/// Snapshot idle timeout. Overridable in milliseconds via the
/// `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS` environment variable; falls
/// back to 2 seconds when the variable is unset or not a valid `u64`.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(2);
    match std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS") {
        Ok(value) => value
            .parse::<u64>()
            .map_or(DEFAULT_IDLE_TIMEOUT, Duration::from_millis),
        Err(_) => DEFAULT_IDLE_TIMEOUT,
    }
});
98
/// Controls how the backend interacts with its backing storage.
///
/// Restoring happens for every variant (`should_restore()` is true whenever a
/// mode is set); persisting only happens for the `ReadWrite*` variants (see
/// `should_persist()`).
pub enum StorageMode {
    /// Restore data from the backing storage, but never write back.
    ReadOnly,
    /// Restore data and persist modifications while running.
    ReadWrite,
    /// Restore data and persist modifications; presumably writes are deferred
    /// until shutdown — TODO(review): confirm at the snapshot scheduling sites
    /// (not visible in this chunk).
    ReadWriteOnShutdown,
}
109
/// Configuration options for constructing a [`TurboTasksBackend`].
pub struct BackendOptions {
    /// Enables tracking of dependencies between tasks. When disabled,
    /// `active_tracking` is forced off as well (see
    /// `TurboTasksBackendInner::new`).
    pub dependency_tracking: bool,

    /// Enables tracking of task activeness. Only effective when
    /// `dependency_tracking` is enabled.
    pub active_tracking: bool,

    /// How the backing storage is used. `None` disables both restoring and
    /// persisting (see `should_restore`/`should_persist`).
    pub storage_mode: Option<StorageMode>,

    /// Number of worker threads; fed into `compute_shard_amount` to size the
    /// in-memory storage sharding.
    pub num_workers: Option<usize>,

    /// Use smaller initial allocations; also fed into
    /// `compute_shard_amount` and `Storage::new`.
    pub small_preallocation: bool,
}
134
135impl Default for BackendOptions {
136 fn default() -> Self {
137 Self {
138 dependency_tracking: true,
139 active_tracking: true,
140 storage_mode: Some(StorageMode::ReadWrite),
141 num_workers: None,
142 small_preallocation: false,
143 }
144 }
145}
146
/// Kinds of background jobs the backend schedules. NOTE(review): the job
/// dispatch site is not visible in this chunk — the names suggest the first
/// and subsequent snapshot passes; confirm against `run_backend_job`.
pub enum TurboTasksBackendJob {
    InitialSnapshot,
    FollowUpSnapshot,
}
151
/// Public backend type: a thin wrapper sharing [`TurboTasksBackendInner`]
/// behind an [`Arc`].
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
153
/// Shared backend state, held behind an [`Arc`] by [`TurboTasksBackend`].
struct TurboTasksBackendInner<B: BackingStorage> {
    options: BackendOptions,

    // Captured at construction time (see `new`).
    start_time: Instant,

    // Allocates persistent task ids, continuing from the backing storage's
    // next free id and capped below `TRANSIENT_TASK_BIT` (see `new`).
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    // Allocates transient task ids in the range
    // `TRANSIENT_TASK_BIT..=TaskId::MAX` (see `new`).
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    // Maps a cached task type to the task id assigned to it.
    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

    // Sharded in-memory task data storage.
    storage: Storage,

    // Coordinates in-flight operations with snapshotting (operation guards,
    // suspend points; see `start_operation`/`operation_suspend_point`).
    snapshot_coord: SnapshotCoordinator,
    // Held for the duration of `snapshot_and_persist` so only one snapshot
    // runs at a time.
    snapshot_in_progress: Mutex<()>,
    // NOTE(review): presumably tracks when the last snapshot happened — the
    // update site is outside this chunk; confirm.
    last_snapshot: AtomicU64,

    // Shutdown/idle signalling. NOTE(review): the write sites for these are
    // outside this chunk; names suggest `stopping` is set during shutdown and
    // the events notify waiters on stop/idle transitions — confirm.
    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    // Per-native-function cache hit/miss counters (see
    // `track_cache_hit_by_fn` / `track_cache_miss_by_fn`).
    task_statistics: TaskStatisticsApi,

    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
191
192impl<B: BackingStorage> TurboTasksBackend<B> {
193 pub fn new(options: BackendOptions, backing_storage: B) -> Self {
194 Self(Arc::new(TurboTasksBackendInner::new(
195 options,
196 backing_storage,
197 )))
198 }
199
200 pub fn backing_storage(&self) -> &B {
201 &self.0.backing_storage
202 }
203}
204
impl<B: BackingStorage> TurboTasksBackendInner<B> {
    /// Creates the inner backend state.
    ///
    /// Disabling `dependency_tracking` forces `active_tracking` off as well.
    /// Persistent task id allocation continues from the backing storage's
    /// next free id; transient ids start at `TRANSIENT_TASK_BIT`, so the two
    /// ranges never overlap.
    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
        if !options.dependency_tracking {
            options.active_tracking = false;
        }
        let small_preallocation = options.small_preallocation;
        let next_task_id = backing_storage
            .next_free_task_id()
            .expect("Failed to get task id");
        Self {
            options,
            start_time: Instant::now(),
            // Persistent ids: [next_task_id, TRANSIENT_TASK_BIT).
            persisted_task_id_factory: IdFactoryWithReuse::new(
                next_task_id,
                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
            ),
            // Transient ids: [TRANSIENT_TASK_BIT, TaskId::MAX].
            transient_task_id_factory: IdFactoryWithReuse::new(
                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                TaskId::MAX,
            ),
            task_cache: FxDashMap::default(),
            storage: Storage::new(shard_amount, small_preallocation),
            snapshot_coord: SnapshotCoordinator::new(),
            snapshot_in_progress: Mutex::new(()),
            last_snapshot: AtomicU64::new(0),
            stopping: AtomicBool::new(false),
            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
            #[cfg(feature = "verify_aggregation_graph")]
            is_idle: AtomicBool::new(false),
            task_statistics: TaskStatisticsApi::default(),
            backing_storage,
            #[cfg(feature = "verify_aggregation_graph")]
            root_tasks: Default::default(),
        }
    }

    /// Creates an [`ExecuteContext`] bound to this backend and the given
    /// turbo-tasks API handle.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }

    /// Whether running operations should suspend to let a pending snapshot
    /// proceed. Always `false` when persisting is disabled.
    fn suspending_requested(&self) -> bool {
        self.should_persist() && self.snapshot_coord.snapshot_pending()
    }

    /// Marks a safe point at which the current operation can be suspended
    /// (and captured via `suspend`) if a snapshot is pending. No-op when
    /// persisting is disabled.
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        if self.should_persist() {
            self.snapshot_coord.suspend_point(suspend);
        }
    }

    /// Registers the start of an operation with the snapshot coordinator so
    /// snapshots wait for (or suspend) it. Returns a no-op guard when
    /// persisting is disabled.
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
        if !self.should_persist() {
            return OperationGuard::noop();
        }
        self.snapshot_coord.begin_operation()
    }

    /// Whether modifications are written back to the backing storage
    /// (`ReadWrite` or `ReadWriteOnShutdown`).
    fn should_persist(&self) -> bool {
        matches!(
            self.options.storage_mode,
            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
        )
    }

    /// Whether data is restored from the backing storage (any storage mode).
    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }

    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }

    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }

    /// Records a task cache hit for statistics (no-op unless statistics
    /// collection is enabled in `TaskStatisticsApi`).
    fn track_cache_hit_by_fn(&self, native_fn: &'static NativeFunction) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(native_fn));
    }

    /// Records a task cache miss for statistics (no-op unless statistics
    /// collection is enabled in `TaskStatisticsApi`).
    fn track_cache_miss_by_fn(&self, native_fn: &'static NativeFunction) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(native_fn));
    }

    /// Converts a stored [`TaskError`] into a [`TurboTasksExecutionError`],
    /// recursively converting error sources.
    ///
    /// For `TaskError::TaskChain`, the innermost error is looked up from the
    /// output of the last task in the chain (it may have been recomputed and
    /// no longer be an error, in which case a placeholder panic message is
    /// used), then the chain is rebuilt outwards as nested
    /// `TaskContext` errors.
    fn task_error_to_turbo_tasks_execution_error(
        &self,
        error: &TaskError,
        ctx: &mut impl ExecuteContext<'_>,
    ) -> TurboTasksExecutionError {
        match error {
            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
                message: item.message.clone(),
                source: item
                    .source
                    .as_ref()
                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
            })),
            TaskError::LocalTaskContext(local_task_context) => {
                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
                    name: local_task_context.name.clone(),
                    source: local_task_context
                        .source
                        .as_ref()
                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
                }))
            }
            TaskError::TaskChain(chain) => {
                let task_id = chain.last().unwrap();
                // Fetch the actual error from the referenced task's output.
                let error = {
                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
                    if let Some(OutputValue::Error(error)) = task.get_output() {
                        Some(error.clone())
                    } else {
                        None
                    }
                };
                let error = error.map_or_else(
                    || {
                        // The task's output is no longer an error (e.g. it was
                        // recomputed in the meantime); report a placeholder.
                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
                                "Error no longer available",
                            )),
                            location: None,
                        }))
                    },
                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
                );
                // Wrap the innermost error with task context frames, from the
                // end of the chain outwards.
                let mut current_error = error;
                for &task_id in chain.iter().rev() {
                    current_error =
                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
                            task_id,
                            source: Some(current_error),
                            turbo_tasks: ctx.turbo_tasks(),
                        }));
                }
                current_error
            }
        }
    }
}
360
/// Intermediate result assembled while preparing completion of a task
/// execution. NOTE(review): field semantics inferred from names — the
/// producing/consuming code is outside this chunk; confirm there.
struct TaskExecutionCompletePrepareResult {
    // Children connected during this execution.
    pub new_children: FxHashSet<TaskId>,
    // Whether the task was determined to be immutable after this execution.
    pub is_now_immutable: bool,
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    // Output value to store, if it changed.
    pub new_output: Option<OutputValue>,
    // Tasks depending on this task's output (to be invalidated/notified).
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
    // Whether this execution was a recomputation of an existing task.
    pub is_recomputation: bool,
}
371
372impl<B: BackingStorage> TurboTasksBackendInner<B> {
374 fn connect_child(
375 &self,
376 parent_task: Option<TaskId>,
377 child_task: TaskId,
378 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
379 ) {
380 operation::ConnectChildOperation::run(
381 parent_task,
382 child_task,
383 self.execute_context(turbo_tasks),
384 );
385 }
386
387 fn try_read_task_output(
388 self: &Arc<Self>,
389 task_id: TaskId,
390 reader: Option<TaskId>,
391 options: ReadOutputOptions,
392 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
393 ) -> Result<Result<RawVc, EventListener>> {
394 self.assert_not_persistent_calling_transient(reader, task_id, None);
395
396 let mut ctx = self.execute_context(turbo_tasks);
397 let need_reader_task = if self.should_track_dependencies()
398 && !matches!(options.tracking, ReadTracking::Untracked)
399 && reader.is_some_and(|reader_id| reader_id != task_id)
400 && let Some(reader_id) = reader
401 && reader_id != task_id
402 {
403 Some(reader_id)
404 } else {
405 None
406 };
407 let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
408 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
412 (task, Some(reader))
413 } else {
414 (ctx.task(task_id, TaskDataCategory::All), None)
415 };
416
417 fn listen_to_done_event(
418 reader_description: Option<EventDescription>,
419 tracking: ReadTracking,
420 done_event: &Event,
421 ) -> EventListener {
422 done_event.listen_with_note(move || {
423 move || {
424 if let Some(reader_description) = reader_description.as_ref() {
425 format!(
426 "try_read_task_output from {} ({})",
427 reader_description, tracking
428 )
429 } else {
430 format!("try_read_task_output ({})", tracking)
431 }
432 }
433 })
434 }
435
436 fn check_in_progress(
437 task: &impl TaskGuard,
438 reader_description: Option<EventDescription>,
439 tracking: ReadTracking,
440 ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
441 {
442 match task.get_in_progress() {
443 Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
444 listen_to_done_event(reader_description, tracking, done_event),
445 ))),
446 Some(InProgressState::InProgress(box InProgressStateInner {
447 done_event, ..
448 })) => Some(Ok(Err(listen_to_done_event(
449 reader_description,
450 tracking,
451 done_event,
452 )))),
453 Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
454 "{} was canceled",
455 task.get_task_description()
456 ))),
457 None => None,
458 }
459 }
460
461 if matches!(options.consistency, ReadConsistency::Strong) {
462 loop {
464 let aggregation_number = get_aggregation_number(&task);
465 if is_root_node(aggregation_number) {
466 break;
467 }
468 drop(task);
469 drop(reader_task);
470 {
471 let _span = tracing::trace_span!(
472 "make root node for strongly consistent read",
473 task = self.debug_get_task_description(task_id)
474 )
475 .entered();
476 AggregationUpdateQueue::run(
477 AggregationUpdateJob::UpdateAggregationNumber {
478 task_id,
479 base_aggregation_number: u32::MAX,
480 distance: None,
481 },
482 &mut ctx,
483 );
484 }
485 (task, reader_task) = if let Some(reader_id) = need_reader_task {
486 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
488 (task, Some(reader))
489 } else {
490 (ctx.task(task_id, TaskDataCategory::All), None)
491 }
492 }
493
494 let is_dirty = task.is_dirty();
495
496 let has_dirty_containers = task.has_dirty_containers();
498 if has_dirty_containers || is_dirty.is_some() {
499 let activeness = task.get_activeness_mut();
500 let mut task_ids_to_schedule: Vec<_> = Vec::new();
501 let activeness = if let Some(activeness) = activeness {
503 activeness.set_active_until_clean();
507 activeness
508 } else {
509 if ctx.should_track_activeness() {
513 task_ids_to_schedule = task.dirty_containers().collect();
515 task_ids_to_schedule.push(task_id);
516 }
517 let activeness =
518 task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
519 activeness.set_active_until_clean();
520 activeness
521 };
522 let listener = activeness.all_clean_event.listen_with_note(move || {
523 let this = self.clone();
524 let tt = turbo_tasks.pin();
525 move || {
526 let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
527 let mut ctx = this.execute_context(tt);
528 let mut visited = FxHashSet::default();
529 fn indent(s: &str) -> String {
530 s.split_inclusive('\n')
531 .flat_map(|line: &str| [" ", line].into_iter())
532 .collect::<String>()
533 }
534 fn get_info(
535 ctx: &mut impl ExecuteContext<'_>,
536 task_id: TaskId,
537 parent_and_count: Option<(TaskId, i32)>,
538 visited: &mut FxHashSet<TaskId>,
539 ) -> String {
540 let task = ctx.task(task_id, TaskDataCategory::All);
541 let is_dirty = task.is_dirty();
542 let in_progress =
543 task.get_in_progress()
544 .map_or("not in progress", |p| match p {
545 InProgressState::InProgress(_) => "in progress",
546 InProgressState::Scheduled { .. } => "scheduled",
547 InProgressState::Canceled => "canceled",
548 });
549 let activeness = task.get_activeness().map_or_else(
550 || "not active".to_string(),
551 |activeness| format!("{activeness:?}"),
552 );
553 let aggregation_number = get_aggregation_number(&task);
554 let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
555 {
556 let uppers = get_uppers(&task);
557 !uppers.contains(&parent_task_id)
558 } else {
559 false
560 };
561
562 let has_dirty_containers = task.has_dirty_containers();
564
565 let task_description = task.get_task_description();
566 let is_dirty_label = if let Some(parent_priority) = is_dirty {
567 format!(", dirty({parent_priority})")
568 } else {
569 String::new()
570 };
571 let has_dirty_containers_label = if has_dirty_containers {
572 ", dirty containers"
573 } else {
574 ""
575 };
576 let count = if let Some((_, count)) = parent_and_count {
577 format!(" {count}")
578 } else {
579 String::new()
580 };
581 let mut info = format!(
582 "{task_id} {task_description}{count} (aggr={aggregation_number}, \
583 {in_progress}, \
584 {activeness}{is_dirty_label}{has_dirty_containers_label})",
585 );
586 let children: Vec<_> = task.dirty_containers_with_count().collect();
587 drop(task);
588
589 if missing_upper {
590 info.push_str("\n ERROR: missing upper connection");
591 }
592
593 if has_dirty_containers || !children.is_empty() {
594 writeln!(info, "\n dirty tasks:").unwrap();
595
596 for (child_task_id, count) in children {
597 let task_description = ctx
598 .task(child_task_id, TaskDataCategory::Data)
599 .get_task_description();
600 if visited.insert(child_task_id) {
601 let child_info = get_info(
602 ctx,
603 child_task_id,
604 Some((task_id, count)),
605 visited,
606 );
607 info.push_str(&indent(&child_info));
608 if !info.ends_with('\n') {
609 info.push('\n');
610 }
611 } else {
612 writeln!(
613 info,
614 " {child_task_id} {task_description} {count} \
615 (already visited)"
616 )
617 .unwrap();
618 }
619 }
620 }
621 info
622 }
623 let info = get_info(&mut ctx, task_id, None, &mut visited);
624 format!(
625 "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
626 )
627 }
628 });
629 drop(reader_task);
630 drop(task);
631 if !task_ids_to_schedule.is_empty() {
632 let mut queue = AggregationUpdateQueue::new();
633 queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
634 queue.execute(&mut ctx);
635 }
636
637 return Ok(Err(listener));
638 }
639 }
640
641 let reader_description = reader_task
642 .as_ref()
643 .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
644 if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
645 {
646 return value;
647 }
648
649 if let Some(output) = task.get_output() {
650 let result = match output {
651 OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
652 OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
653 OutputValue::Error(error) => Err(error.clone()),
654 };
655 if let Some(mut reader_task) = reader_task.take()
656 && options.tracking.should_track(result.is_err())
657 && (!task.immutable() || cfg!(feature = "verify_immutable"))
658 {
659 #[cfg(feature = "trace_task_output_dependencies")]
660 let _span = tracing::trace_span!(
661 "add output dependency",
662 task = %task_id,
663 dependent_task = ?reader
664 )
665 .entered();
666 let mut queue = LeafDistanceUpdateQueue::new();
667 let reader = reader.unwrap();
668 if task.add_output_dependent(reader) {
669 let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
671 let reader_leaf_distance =
672 reader_task.get_leaf_distance().copied().unwrap_or_default();
673 if reader_leaf_distance.distance <= leaf_distance.distance {
674 queue.push(
675 reader,
676 leaf_distance.distance,
677 leaf_distance.max_distance_in_buffer,
678 );
679 }
680 }
681
682 drop(task);
683
684 if !reader_task.remove_outdated_output_dependencies(&task_id) {
690 let _ = reader_task.add_output_dependencies(task_id);
691 }
692 drop(reader_task);
693
694 queue.execute(&mut ctx);
695 } else {
696 drop(task);
697 }
698
699 return result.map_err(|error| {
700 self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
701 .with_task_context(task_id, turbo_tasks.pin())
702 .into()
703 });
704 }
705 drop(reader_task);
706
707 let note = EventDescription::new(|| {
708 move || {
709 if let Some(reader) = reader_description.as_ref() {
710 format!("try_read_task_output (recompute) from {reader}",)
711 } else {
712 "try_read_task_output (recompute, untracked)".to_string()
713 }
714 }
715 });
716
717 let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
719 TaskExecutionReason::OutputNotAvailable,
720 EventDescription::new(|| task.get_task_desc_fn()),
721 note,
722 );
723
724 let old = task.set_in_progress(in_progress_state);
727 debug_assert!(old.is_none(), "InProgress already exists");
728 ctx.schedule_task(task, TaskPriority::Initial);
729
730 Ok(Err(listener))
731 }
732
    /// Tries to read the content of cell `cell` of task `task_id`.
    ///
    /// Returns `Ok(Ok(content))` when available, `Ok(Err(listener))` when the
    /// caller must wait for the task to (re)compute the cell, and `Err(..)`
    /// when the cell no longer exists or the task was canceled. Tracked reads
    /// record a cell dependency from `reader` to the cell.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        // Records the dependency edge in both directions: (cell -> reader) on
        // the read task and (reader -> cell) on the reader task. Consumes the
        // guards, dropping the read task's guard before locking data on the
        // reader's side. Skipped entirely for immutable tasks unless
        // immutability verification is enabled.
        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
            key: Option<u64>,
        ) {
            if let Some(mut reader_task) = reader_task
                && (!task.immutable() || cfg!(feature = "verify_immutable"))
            {
                // reader_task is only Some when reader is Some (see call
                // sites), so this unwrap is safe.
                let reader = reader.unwrap();
                let _ = task.add_cell_dependents((cell, key, reader));
                drop(task);

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                // If the dependency was marked outdated, un-mark it;
                // otherwise record it as a fresh dependency.
                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
                    let _ = reader_task.add_cell_dependencies((target, key));
                }
                drop(reader_task);
            }
        }

        let ReadCellOptions {
            tracking,
            final_read_hint,
        } = options;

        let mut ctx = self.execute_context(turbo_tasks);
        // Lock the reader alongside the task only for tracked reads from a
        // different task.
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(tracking, ReadCellTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        // A final read may take the data out of the cell instead of cloning.
        let content = if final_read_hint {
            task.remove_cell_data(&cell)
        } else {
            task.get_cell_data(&cell).cloned()
        };
        if let Some(content) = content {
            if tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content)),
            )));
        }

        // Cell data missing: if the task is already scheduled or running,
        // just wait for the cell to be (re)computed.
        let in_progress = task.get_in_progress();
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self
                .listen_to_cell(&mut task, task_id, &reader_task, cell)
                .0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        // Distinguish "cell will never exist again" (error, dependency still
        // tracked so the reader is invalidated if it reappears) from "cell
        // needs recomputation" (schedule below).
        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
        let Some(max_id) = max_id else {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!(
                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
            );
        };
        if cell.index >= max_id {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
        }

        if is_cancelled {
            bail!("{} was canceled", task.get_task_description());
        }

        // Cell index is valid but the data is gone (e.g. dropped from
        // memory): schedule a recomputation and wait on the cell event. If
        // another reader already registered the in-progress cell
        // (new_listener == false), the task was scheduled by them.
        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
        drop(reader_task);
        if !new_listener {
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).ty.global_name,
            cell_index = cell.index
        )
        .entered();

        let _ = task.add_scheduled(
            TaskExecutionReason::CellNotAvailable,
            EventDescription::new(|| task.get_task_desc_fn()),
        );
        ctx.schedule_task(task, TaskPriority::Initial);

        Ok(Err(listener))
    }
870
871 fn listen_to_cell(
872 &self,
873 task: &mut impl TaskGuard,
874 task_id: TaskId,
875 reader_task: &Option<impl TaskGuard>,
876 cell: CellId,
877 ) -> (EventListener, bool) {
878 let note = || {
879 let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
880 move || {
881 if let Some(reader_desc) = reader_desc.as_ref() {
882 format!("try_read_task_cell (in progress) from {}", (reader_desc)())
883 } else {
884 "try_read_task_cell (in progress, untracked)".to_string()
885 }
886 }
887 };
888 if let Some(in_progress) = task.get_in_progress_cells(&cell) {
889 let listener = in_progress.event.listen_with_note(note);
891 return (listener, false);
892 }
893 let in_progress = InProgressCellState::new(task_id, cell);
894 let listener = in_progress.event.listen_with_note(note);
895 let old = task.insert_in_progress_cells(cell, in_progress);
896 debug_assert!(old.is_none(), "InProgressCell already exists");
897 (listener, true)
898 }
899
900 fn snapshot_and_persist(
901 &self,
902 parent_span: Option<tracing::Id>,
903 reason: &str,
904 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
905 ) -> Result<(Instant, bool), anyhow::Error> {
906 let snapshot_span =
907 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
908 .entered();
909 let _snapshot_in_progress = self.snapshot_in_progress.lock();
913 let start = Instant::now();
914 let wall_start = SystemTime::now();
918 debug_assert!(self.should_persist());
919
920 let mut snapshot_phase = {
921 let _span = tracing::info_span!("blocking").entered();
922 self.snapshot_coord.begin_snapshot()
923 };
924 let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
927
928 let suspended_operations = snapshot_phase.take_suspended_operations();
929
930 let snapshot_time = Instant::now();
931 drop(snapshot_phase);
932
933 if !has_modifications {
934 drop(snapshot_guard);
937 return Ok((start, false));
938 }
939
940 #[cfg(feature = "print_cache_item_size")]
941 #[derive(Default)]
942 struct TaskCacheStats {
943 data: usize,
944 #[cfg(feature = "print_cache_item_size_with_compressed")]
945 data_compressed: usize,
946 data_count: usize,
947 meta: usize,
948 #[cfg(feature = "print_cache_item_size_with_compressed")]
949 meta_compressed: usize,
950 meta_count: usize,
951 upper_count: usize,
952 collectibles_count: usize,
953 aggregated_collectibles_count: usize,
954 children_count: usize,
955 followers_count: usize,
956 collectibles_dependents_count: usize,
957 aggregated_dirty_containers_count: usize,
958 output_size: usize,
959 }
960 #[cfg(feature = "print_cache_item_size")]
963 struct FormatSizes {
964 size: usize,
965 #[cfg(feature = "print_cache_item_size_with_compressed")]
966 compressed_size: usize,
967 }
968 #[cfg(feature = "print_cache_item_size")]
969 impl std::fmt::Display for FormatSizes {
970 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
971 use turbo_tasks::util::FormatBytes;
972 #[cfg(feature = "print_cache_item_size_with_compressed")]
973 {
974 write!(
975 f,
976 "{} ({} compressed)",
977 FormatBytes(self.size),
978 FormatBytes(self.compressed_size)
979 )
980 }
981 #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
982 {
983 write!(f, "{}", FormatBytes(self.size))
984 }
985 }
986 }
987 #[cfg(feature = "print_cache_item_size")]
988 impl TaskCacheStats {
989 #[cfg(feature = "print_cache_item_size_with_compressed")]
990 fn compressed_size(data: &[u8]) -> Result<usize> {
991 Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
992 data,
993 &mut Vec::new(),
994 lzzzz::lz4::ACC_LEVEL_DEFAULT,
995 )?)
996 }
997
998 fn add_data(&mut self, data: &[u8]) {
999 self.data += data.len();
1000 #[cfg(feature = "print_cache_item_size_with_compressed")]
1001 {
1002 self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1003 }
1004 self.data_count += 1;
1005 }
1006
1007 fn add_meta(&mut self, data: &[u8]) {
1008 self.meta += data.len();
1009 #[cfg(feature = "print_cache_item_size_with_compressed")]
1010 {
1011 self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1012 }
1013 self.meta_count += 1;
1014 }
1015
1016 fn add_counts(&mut self, storage: &TaskStorage) {
1017 let counts = storage.meta_counts();
1018 self.upper_count += counts.upper;
1019 self.collectibles_count += counts.collectibles;
1020 self.aggregated_collectibles_count += counts.aggregated_collectibles;
1021 self.children_count += counts.children;
1022 self.followers_count += counts.followers;
1023 self.collectibles_dependents_count += counts.collectibles_dependents;
1024 self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
1025 if let Some(output) = storage.get_output() {
1026 use turbo_bincode::turbo_bincode_encode;
1027
1028 self.output_size += turbo_bincode_encode(&output)
1029 .map(|data| data.len())
1030 .unwrap_or(0);
1031 }
1032 }
1033
1034 fn task_name(storage: &TaskStorage) -> String {
1036 storage
1037 .get_persistent_task_type()
1038 .map(|t| t.to_string())
1039 .unwrap_or_else(|| "<unknown>".to_string())
1040 }
1041
1042 fn sort_key(&self) -> usize {
1045 #[cfg(feature = "print_cache_item_size_with_compressed")]
1046 {
1047 self.data_compressed + self.meta_compressed
1048 }
1049 #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1050 {
1051 self.data + self.meta
1052 }
1053 }
1054
1055 fn format_total(&self) -> FormatSizes {
1056 FormatSizes {
1057 size: self.data + self.meta,
1058 #[cfg(feature = "print_cache_item_size_with_compressed")]
1059 compressed_size: self.data_compressed + self.meta_compressed,
1060 }
1061 }
1062
1063 fn format_data(&self) -> FormatSizes {
1064 FormatSizes {
1065 size: self.data,
1066 #[cfg(feature = "print_cache_item_size_with_compressed")]
1067 compressed_size: self.data_compressed,
1068 }
1069 }
1070
1071 fn format_avg_data(&self) -> FormatSizes {
1072 FormatSizes {
1073 size: self.data.checked_div(self.data_count).unwrap_or(0),
1074 #[cfg(feature = "print_cache_item_size_with_compressed")]
1075 compressed_size: self
1076 .data_compressed
1077 .checked_div(self.data_count)
1078 .unwrap_or(0),
1079 }
1080 }
1081
1082 fn format_meta(&self) -> FormatSizes {
1083 FormatSizes {
1084 size: self.meta,
1085 #[cfg(feature = "print_cache_item_size_with_compressed")]
1086 compressed_size: self.meta_compressed,
1087 }
1088 }
1089
1090 fn format_avg_meta(&self) -> FormatSizes {
1091 FormatSizes {
1092 size: self.meta.checked_div(self.meta_count).unwrap_or(0),
1093 #[cfg(feature = "print_cache_item_size_with_compressed")]
1094 compressed_size: self
1095 .meta_compressed
1096 .checked_div(self.meta_count)
1097 .unwrap_or(0),
1098 }
1099 }
1100 }
1101 #[cfg(feature = "print_cache_item_size")]
1102 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1103 Mutex::new(FxHashMap::default());
1104
1105 let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
1112 let encode_category = |task_id: TaskId,
1113 data: &TaskStorage,
1114 category: SpecificTaskDataCategory,
1115 buffer: &mut TurboBincodeBuffer|
1116 -> Option<TurboBincodeBuffer> {
1117 match encode_task_data(task_id, data, category, buffer) {
1118 Ok(encoded) => {
1119 #[cfg(feature = "print_cache_item_size")]
1120 {
1121 let mut stats = task_cache_stats.lock();
1122 let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
1123 match category {
1124 SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1125 SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1126 }
1127 }
1128 Some(encoded)
1129 }
1130 Err(err) => {
1131 panic!(
1132 "Serializing task {} failed ({:?}): {:?}",
1133 self.debug_get_task_description(task_id),
1134 category,
1135 err
1136 );
1137 }
1138 }
1139 };
1140 if task_id.is_transient() {
1141 unreachable!("transient task_ids should never be enqueued to be persisted");
1142 }
1143
1144 let encode_meta = inner.flags.meta_modified();
1145 let encode_data = inner.flags.data_modified();
1146
1147 #[cfg(feature = "print_cache_item_size")]
1148 if encode_data || encode_meta {
1149 task_cache_stats
1150 .lock()
1151 .entry(TaskCacheStats::task_name(inner))
1152 .or_default()
1153 .add_counts(inner);
1154 }
1155
1156 let meta = if encode_meta {
1157 encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1158 } else {
1159 None
1160 };
1161
1162 let data = if encode_data {
1163 encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1164 } else {
1165 None
1166 };
1167 let task_type_hash = if inner.flags.new_task() {
1168 let task_type = inner.get_persistent_task_type().expect(
1169 "It is not possible for a new_task to not have a persistent_task_type. Task \
1170 creation for persistent tasks uses a single ExecutionContextImpl for \
1171 creating the task (which sets new_task) and connect_child (which sets \
1172 persistent_task_type) and take_snapshot waits for all operations to complete \
1173 or suspend before we start snapshotting. So task creation will always set \
1174 the task_type.",
1175 );
1176 Some(compute_task_type_hash(task_type))
1177 } else {
1178 None
1179 };
1180
1181 SnapshotItem {
1182 task_id,
1183 meta,
1184 data,
1185 task_type_hash,
1186 }
1187 };
1188
1189 let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);
1190
1191 drop(snapshot_span);
1192 let snapshot_duration = start.elapsed();
1193 let task_count = task_snapshots.len();
1194
1195 if task_snapshots.is_empty() {
1196 std::hint::cold_path();
1199 return Ok((snapshot_time, false));
1200 }
1201
1202 let persist_start = Instant::now();
1203 let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1204 {
1205 self.backing_storage
1209 .save_snapshot(suspended_operations, task_snapshots)?;
1210 #[cfg(feature = "print_cache_item_size")]
1211 {
1212 let mut task_cache_stats = task_cache_stats
1213 .into_inner()
1214 .into_iter()
1215 .collect::<Vec<_>>();
1216 if !task_cache_stats.is_empty() {
1217 use turbo_tasks::util::FormatBytes;
1218
1219 use crate::utils::markdown_table::print_markdown_table;
1220
1221 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1222 (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
1223 });
1224
1225 println!(
1226 "Task cache stats: {}",
1227 FormatSizes {
1228 size: task_cache_stats
1229 .iter()
1230 .map(|(_, s)| s.data + s.meta)
1231 .sum::<usize>(),
1232 #[cfg(feature = "print_cache_item_size_with_compressed")]
1233 compressed_size: task_cache_stats
1234 .iter()
1235 .map(|(_, s)| s.data_compressed + s.meta_compressed)
1236 .sum::<usize>()
1237 },
1238 );
1239
1240 print_markdown_table(
1241 [
1242 "Task",
1243 " Total Size",
1244 " Data Size",
1245 " Data Count x Avg",
1246 " Data Count x Avg",
1247 " Meta Size",
1248 " Meta Count x Avg",
1249 " Meta Count x Avg",
1250 " Uppers",
1251 " Coll",
1252 " Agg Coll",
1253 " Children",
1254 " Followers",
1255 " Coll Deps",
1256 " Agg Dirty",
1257 " Output Size",
1258 ],
1259 task_cache_stats.iter(),
1260 |(task_desc, stats)| {
1261 [
1262 task_desc.to_string(),
1263 format!(" {}", stats.format_total()),
1264 format!(" {}", stats.format_data()),
1265 format!(" {} x", stats.data_count),
1266 format!("{}", stats.format_avg_data()),
1267 format!(" {}", stats.format_meta()),
1268 format!(" {} x", stats.meta_count),
1269 format!("{}", stats.format_avg_meta()),
1270 format!(" {}", stats.upper_count),
1271 format!(" {}", stats.collectibles_count),
1272 format!(" {}", stats.aggregated_collectibles_count),
1273 format!(" {}", stats.children_count),
1274 format!(" {}", stats.followers_count),
1275 format!(" {}", stats.collectibles_dependents_count),
1276 format!(" {}", stats.aggregated_dirty_containers_count),
1277 format!(" {}", FormatBytes(stats.output_size)),
1278 ]
1279 },
1280 );
1281 }
1282 }
1283 }
1284
1285 let elapsed = start.elapsed();
1286 let persist_duration = persist_start.elapsed();
1287 if elapsed > Duration::from_secs(10) {
1289 turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1290 "Finished writing to filesystem cache".to_string(),
1291 elapsed,
1292 )));
1293 }
1294
1295 let wall_start_ms = wall_start
1296 .duration_since(SystemTime::UNIX_EPOCH)
1297 .unwrap_or_default()
1298 .as_secs_f64()
1300 * 1000.0;
1301 let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1302 turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1303 "turbopack-persistence",
1304 wall_start_ms,
1305 wall_end_ms,
1306 vec![
1307 ("reason", serde_json::Value::from(reason)),
1308 (
1309 "snapshot_duration_ms",
1310 serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1311 ),
1312 (
1313 "persist_duration_ms",
1314 serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1315 ),
1316 ("task_count", serde_json::Value::from(task_count)),
1317 ],
1318 )));
1319
1320 Ok((snapshot_time, true))
1321 }
1322
    /// Called once when the backend starts. Replays any operations that were
    /// suspended but not completed at the time of the last snapshot, then schedules
    /// the initial background snapshot job if the storage is writable.
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        if self.should_restore() {
            // Operations interrupted by the previous shutdown must run to completion
            // before any new work, to bring the restored state back to consistency.
            let uncompleted_operations = self
                .backing_storage
                .uncompleted_operations()
                .expect("Failed to get uncompleted operations");
            if !uncompleted_operations.is_empty() {
                let mut ctx = self.execute_context(turbo_tasks);
                for op in uncompleted_operations {
                    op.execute(&mut ctx);
                }
            }
        }

        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
            // Note: the second `_span` shadows the first, but both guards stay
            // entered until the end of this scope (shadowing does not drop a guard).
            let _span = trace_span!("persisting background job").entered();
            let _span = tracing::info_span!("thread").entered();
            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
        }
    }
1349
    /// Signals that the backend is shutting down and wakes every waiter on the
    /// stopping event (e.g. background jobs waiting to be interrupted).
    fn stopping(&self) {
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }
1354
1355 #[allow(unused_variables)]
1356 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1357 #[cfg(feature = "verify_aggregation_graph")]
1358 {
1359 self.is_idle.store(false, Ordering::Release);
1360 self.verify_aggregation_graph(turbo_tasks, false);
1361 }
1362 if self.should_persist()
1363 && let Err(err) = self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks)
1364 {
1365 eprintln!("Persisting failed during shutdown: {err:?}");
1366 }
1367 drop_contents(&self.task_cache);
1368 self.storage.drop_contents();
1369 if let Err(err) = self.backing_storage.shutdown() {
1370 println!("Shutting down failed: {err}");
1371 }
1372 }
1373
    /// Called when the system becomes idle. Notifies idle-start waiters and, with
    /// the `verify_aggregation_graph` feature, schedules a verification run that
    /// only fires after the system has stayed idle for 5 seconds.
    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                select! {
                    // Debounce: wait 5s; if idle ends first, skip verification.
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                    }
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                // Re-check the flag in case idle ended between the sleep finishing
                // and this line running.
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }
1401
    /// Called when the system leaves the idle state; cancels any pending idle-time
    /// verification and wakes waiters on the idle-end event.
    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }
1407
    /// Returns the task id for the given function call, creating and registering a
    /// new task when no cached one exists, and connects the task as a child of
    /// `parent_task`.
    ///
    /// Lookup order: in-memory task cache (fast path), then backing storage (for
    /// persistent tasks), then create a new task.
    fn get_or_create_task(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        parent_task: Option<TaskId>,
        persistence: TaskPersistence,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let transient = matches!(persistence, TaskPersistence::Transient);

        // A persistent parent must never call a transient task: the persisted graph
        // would contain a reference that cannot be restored in a later session.
        if transient
            && let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            let task_type = CachedTaskType {
                native_fn,
                this,
                arg: arg.take_box(),
            };
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                None,
            );
        }

        let is_root = native_fn.is_root;

        // Fast path: look the call up in the sharded in-memory task cache.
        let arg_ref = arg.as_ref();
        let hash = CachedTaskType::hash_from_components(
            self.task_cache.hasher(),
            native_fn,
            this,
            arg_ref,
        );
        let shard = get_shard(&self.task_cache, hash);

        if let Some(task_id) =
            raw_get_in_shard(shard, hash, |k| k.eq_components(native_fn, this, arg_ref))
        {
            self.track_cache_hit_by_fn(native_fn);
            self.connect_child(parent_task, task_id, turbo_tasks);
            return task_id;
        }

        let mut ctx = self.execute_context(turbo_tasks);

        let mut is_new = false;

        let task_id = if !transient
            && let Some((task_id, stored_type)) = ctx.task_by_type(native_fn, this, arg_ref)
        {
            // Found in the backing storage: populate the in-memory cache, unless a
            // concurrent call raced us and already inserted the entry.
            self.track_cache_hit_by_fn(native_fn);
            match raw_entry_in_shard(shard, self.task_cache.hasher(), hash, |k| {
                k.eq_components(native_fn, this, arg_ref)
            }) {
                RawEntry::Occupied(_) => {}
                RawEntry::Vacant(e) => {
                    e.insert(stored_type, task_id);
                }
            };
            task_id
        } else {
            match raw_entry_in_shard(shard, self.task_cache.hasher(), hash, |k| {
                k.eq_components(native_fn, this, arg_ref)
            }) {
                RawEntry::Occupied(e) => {
                    // A concurrent call created the task between the fast-path
                    // lookup above and acquiring the entry here.
                    let task_id = *e.get();
                    drop(e);
                    self.track_cache_hit_by_fn(native_fn);
                    task_id
                }
                RawEntry::Vacant(e) => {
                    // Genuinely new task: allocate an id from the matching id space
                    // and initialize its storage entry before publishing it.
                    let task_type = Arc::new(CachedTaskType {
                        native_fn,
                        this,
                        arg: arg.take_box(),
                    });
                    let task_id = if transient {
                        self.transient_task_id_factory.get()
                    } else {
                        self.persisted_task_id_factory.get()
                    };
                    self.storage
                        .initialize_new_task(task_id, Some(task_type.clone()));
                    e.insert(task_type, task_id);
                    self.track_cache_miss_by_fn(native_fn);
                    is_new = true;
                    task_id
                }
            }
        };

        // Root tasks get the maximum aggregation number so everything below them is
        // aggregated into them.
        if is_new && is_root {
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: u32::MAX,
                    distance: None,
                },
                &mut ctx,
            );
        }

        operation::ConnectChildOperation::run(parent_task, task_id, ctx);

        task_id
    }
1538
1539 fn debug_trace_transient_task(
1542 &self,
1543 task_type: &CachedTaskType,
1544 cell_id: Option<CellId>,
1545 ) -> DebugTraceTransientTask {
1546 fn inner_id(
1549 backend: &TurboTasksBackendInner<impl BackingStorage>,
1550 task_id: TaskId,
1551 cell_type_id: Option<ValueTypeId>,
1552 visited_set: &mut FxHashSet<TaskId>,
1553 ) -> DebugTraceTransientTask {
1554 if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1555 if visited_set.contains(&task_id) {
1556 let task_name = task_type.get_name();
1557 DebugTraceTransientTask::Collapsed {
1558 task_name,
1559 cell_type_id,
1560 }
1561 } else {
1562 inner_cached(backend, &task_type, cell_type_id, visited_set)
1563 }
1564 } else {
1565 DebugTraceTransientTask::Uncached { cell_type_id }
1566 }
1567 }
1568 fn inner_cached(
1569 backend: &TurboTasksBackendInner<impl BackingStorage>,
1570 task_type: &CachedTaskType,
1571 cell_type_id: Option<ValueTypeId>,
1572 visited_set: &mut FxHashSet<TaskId>,
1573 ) -> DebugTraceTransientTask {
1574 let task_name = task_type.get_name();
1575
1576 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1577 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1578 return None;
1582 };
1583 if task_id.is_transient() {
1584 Some(Box::new(inner_id(
1585 backend,
1586 task_id,
1587 cause_self_raw_vc.try_get_type_id(),
1588 visited_set,
1589 )))
1590 } else {
1591 None
1592 }
1593 });
1594 let cause_args = task_type
1595 .arg
1596 .get_raw_vcs()
1597 .into_iter()
1598 .filter_map(|raw_vc| {
1599 let Some(task_id) = raw_vc.try_get_task_id() else {
1600 return None;
1602 };
1603 if !task_id.is_transient() {
1604 return None;
1605 }
1606 Some((task_id, raw_vc.try_get_type_id()))
1607 })
1608 .collect::<IndexSet<_>>() .into_iter()
1610 .map(|(task_id, cell_type_id)| {
1611 inner_id(backend, task_id, cell_type_id, visited_set)
1612 })
1613 .collect();
1614
1615 DebugTraceTransientTask::Cached {
1616 task_name,
1617 cell_type_id,
1618 cause_self,
1619 cause_args,
1620 }
1621 }
1622 inner_cached(
1623 self,
1624 task_type,
1625 cell_id.map(|c| c.type_id),
1626 &mut FxHashSet::default(),
1627 )
1628 }
1629
1630 fn invalidate_task(
1631 &self,
1632 task_id: TaskId,
1633 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1634 ) {
1635 if !self.should_track_dependencies() {
1636 panic!("Dependency tracking is disabled so invalidation is not allowed");
1637 }
1638 operation::InvalidateOperation::run(
1639 smallvec![task_id],
1640 #[cfg(feature = "trace_task_dirty")]
1641 TaskDirtyCause::Invalidator,
1642 self.execute_context(turbo_tasks),
1643 );
1644 }
1645
1646 fn invalidate_tasks(
1647 &self,
1648 tasks: &[TaskId],
1649 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1650 ) {
1651 if !self.should_track_dependencies() {
1652 panic!("Dependency tracking is disabled so invalidation is not allowed");
1653 }
1654 operation::InvalidateOperation::run(
1655 tasks.iter().copied().collect(),
1656 #[cfg(feature = "trace_task_dirty")]
1657 TaskDirtyCause::Unknown,
1658 self.execute_context(turbo_tasks),
1659 );
1660 }
1661
1662 fn invalidate_tasks_set(
1663 &self,
1664 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1665 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1666 ) {
1667 if !self.should_track_dependencies() {
1668 panic!("Dependency tracking is disabled so invalidation is not allowed");
1669 }
1670 operation::InvalidateOperation::run(
1671 tasks.iter().copied().collect(),
1672 #[cfg(feature = "trace_task_dirty")]
1673 TaskDirtyCause::Unknown,
1674 self.execute_context(turbo_tasks),
1675 );
1676 }
1677
1678 fn invalidate_serialization(
1679 &self,
1680 task_id: TaskId,
1681 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1682 ) {
1683 if task_id.is_transient() {
1684 return;
1685 }
1686 let mut ctx = self.execute_context(turbo_tasks);
1687 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1688 task.invalidate_serialization();
1689 }
1690
1691 fn debug_get_task_description(&self, task_id: TaskId) -> String {
1692 let task = self.storage.access_mut(task_id);
1693 if let Some(value) = task.get_persistent_task_type() {
1694 format!("{task_id:?} {}", value)
1695 } else if let Some(value) = task.get_transient_task_type() {
1696 format!("{task_id:?} {}", value)
1697 } else {
1698 format!("{task_id:?} unknown")
1699 }
1700 }
1701
1702 fn get_task_name(
1703 &self,
1704 task_id: TaskId,
1705 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1706 ) -> String {
1707 let mut ctx = self.execute_context(turbo_tasks);
1708 let task = ctx.task(task_id, TaskDataCategory::Data);
1709 if let Some(value) = task.get_persistent_task_type() {
1710 value.to_string()
1711 } else if let Some(value) = task.get_transient_task_type() {
1712 value.to_string()
1713 } else {
1714 "unknown".to_string()
1715 }
1716 }
1717
1718 fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1719 let task = self.storage.access_mut(task_id);
1720 task.get_persistent_task_type().cloned()
1721 }
1722
    /// Handles cancellation of a scheduled or running task: wakes everyone waiting
    /// on the task or on its in-progress cells, and marks the task `Canceled` so a
    /// later session can recompute it.
    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        // Wake waiters on the task's completion event, regardless of which
        // in-progress state it was in.
        if let Some(in_progress) = task.take_in_progress() {
            match in_progress {
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        // Also wake readers blocked on individual cells that were being computed.
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Mark the task session-dependent dirty so it gets recomputed in a future
        // session (only for persistent tasks with dependency tracking enabled).
        let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
            task.update_dirty_state(Some(Dirtyness::SessionDependent))
        } else {
            None
        };

        let old = task.set_in_progress(InProgressState::Canceled);
        debug_assert!(old.is_none(), "InProgress already exists");
        drop(task);

        // Propagate the dirty-state change through the aggregation graph only after
        // the task lock has been released.
        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, &mut ctx);
        }

        drop(in_progress_cells);
    }
1772
    /// Transitions a scheduled task to `InProgress` and builds its execution future.
    ///
    /// Returns `None` when the task is not in the `Scheduled` state (e.g. canceled
    /// or already running), or when a `Once` task's future was already taken.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            // Warm up storage for tasks this one is likely to read; this requires
            // temporarily releasing and re-acquiring the task lock.
            if let Some(tasks) = task.prefetch() {
                drop(task);
                ctx.prepare_tasks(tasks, "prefetch");
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            let in_progress = task.take_in_progress()?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                // Not scheduled: restore the state untouched and bail.
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Move the task's current collectibles to the outdated set so the new
            // execution can re-emit them, and drop outdated entries that are no
            // longer current.
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            // Snapshot current dependencies as "outdated"; edges that the new
            // execution does not re-establish are cleaned up after it completes.
            if self.should_track_dependencies() {
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        // Build the execution future outside of the task lock.
        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason, priority),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // A `Once` task's future can only be taken and executed once.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1872
    /// Finalizes a completed task execution in phases: prepare (output diffing and
    /// edge bookkeeping), invalidate output dependents, dirty unfinished children,
    /// connect new children, commit, and clean up.
    ///
    /// Returns `true` when the result was stale (the task has been rescheduled and
    /// the result discarded), `false` when the result was committed.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        // Phase 1: diff the output, gather outdated edges and dependent tasks.
        // `None` means the execution went stale while running and was rescheduled.
        let Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
            is_recomputation,
        }) = self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        )
        else {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "prepare");
            return true;
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // Phase 2: the output changed, so mark every task that read it dirty.
        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        // Phase 3: children without an output yet must start out dirty.
        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        // Phase 4: connect new children; this can detect staleness again.
        if has_new_children
            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return true;
        }

        // Phase 5: commit the new output and finish the in-progress state.
        let (stale, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
        );
        if stale {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return true;
        }

        // Phase 6: remove outdated cell data; drop it outside of any task lock.
        let removed_data = self.task_execution_completed_cleanup(
            &mut ctx,
            task_id,
            cell_counters,
            is_error,
            is_recomputation,
        );

        drop(removed_data);
        drop(in_progress_cells);

        false
    }
2010
    /// First phase of completing a task execution: updates cell counters, collects
    /// outdated edges, computes whether the output actually changed and gathers the
    /// tasks depending on the output.
    ///
    /// Returns `None` when the execution was stale and the task was rescheduled.
    fn task_execution_completed_prepare(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        #[cfg(feature = "trace_task_details")] span: &Span,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
    ) -> Option<TaskExecutionCompletePrepareResult> {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_recomputation = task.is_dirty().is_none();
        let Some(in_progress) = task.get_in_progress_mut() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // A canceled task keeps none of this execution's results.
            return Some(TaskExecutionCompletePrepareResult {
                new_children: Default::default(),
                is_now_immutable: false,
                #[cfg(feature = "verify_determinism")]
                no_output_set: false,
                new_output: None,
                output_dependent_tasks: Default::default(),
                is_recomputation,
            });
        }
        let &mut InProgressState::InProgress(box InProgressStateInner {
            stale,
            ref mut new_children,
            session_dependent,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // Stale executions are discarded and rescheduled right away — except for
        // `Once` tasks, whose future can only run a single time.
        #[cfg(not(feature = "no_fast_stale"))]
        if stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner {
                done_event,
                mut new_children,
                ..
            })) = task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            // Children that are already connected keep their active counts; only
            // the not-yet-connected new children need them decreased.
            for task in task.iter_children() {
                new_children.remove(&task);
            }
            drop(task);

            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return None;
        }

        let mut new_children = take(new_children);

        #[cfg(feature = "verify_determinism")]
        if stateful {
            task.set_stateful(true);
        }

        if has_invalidator {
            task.set_invalidator(true);
        }

        // Sync the per-cell-type max index counters with what this execution
        // produced. Skipped when a first execution failed, so that nothing is
        // recorded for a task that never produced cells successfully.
        if result.is_ok() || is_recomputation {
            let old_counters: FxHashMap<_, _> = task
                .iter_cell_type_max_index()
                .map(|(&k, &v)| (k, v))
                .collect();
            let mut counters_to_remove = old_counters.clone();

            for (&cell_type, &max_index) in cell_counters.iter() {
                if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                    if old_max_index != max_index {
                        task.insert_cell_type_max_index(cell_type, max_index);
                    }
                } else {
                    task.insert_cell_type_max_index(cell_type, max_index);
                }
            }
            // Cell types not produced this time are removed entirely.
            for (cell_type, _) in counters_to_remove {
                task.remove_cell_type_max_index(&cell_type);
            }
        }

        let mut queue = AggregationUpdateQueue::new();

        let mut old_edges = Vec::new();

        let has_children = !new_children.is_empty();
        // A task is a candidate for becoming immutable when it has no way of being
        // invalidated: not session-dependent, no invalidator, no collectible
        // dependencies. Its dependencies' immutability is checked further below,
        // after the task lock is dropped.
        let is_immutable = task.immutable();
        let task_dependencies_for_immutable =
            if !is_immutable
                && !session_dependent
                && !task.invalidator()
                && task.is_collectibles_dependencies_empty()
            {
                Some(
                    task.iter_output_dependencies()
                        .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
                        .collect::<FxHashSet<_>>(),
                )
            } else {
                None
            };

        if has_children {
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);

            // Children present before but not re-added by this execution become
            // outdated edges; re-added ones are removed from `new_children`.
            old_edges.extend(
                task.iter_children()
                    .filter(|task| !new_children.remove(task))
                    .map(OutdatedEdge::Child),
            );
        } else {
            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
        }

        old_edges.extend(
            task.iter_outdated_collectibles()
                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
        );

        if self.should_track_dependencies() {
            old_edges.extend(
                task.iter_outdated_cell_dependencies()
                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
            );
            old_edges.extend(
                task.iter_outdated_output_dependencies()
                    .map(OutdatedEdge::OutputDependency),
            );
        }

        // Diff the result against the current output: `new_output` stays `None`
        // when the output is unchanged, so dependents are not invalidated.
        let current_output = task.get_output();
        #[cfg(feature = "verify_determinism")]
        let no_output_set = current_output.is_none();
        let new_output = match result {
            Ok(RawVc::TaskOutput(output_task_id)) => {
                if let Some(OutputValue::Output(current_task_id)) = current_output
                    && *current_task_id == output_task_id
                {
                    None
                } else {
                    Some(OutputValue::Output(output_task_id))
                }
            }
            Ok(RawVc::TaskCell(output_task_id, cell)) => {
                if let Some(OutputValue::Cell(CellRef {
                    task: current_task_id,
                    cell: current_cell,
                })) = current_output
                    && *current_task_id == output_task_id
                    && *current_cell == cell
                {
                    None
                } else {
                    Some(OutputValue::Cell(CellRef {
                        task: output_task_id,
                        cell,
                    }))
                }
            }
            Ok(RawVc::LocalOutput(..)) => {
                panic!("Non-local tasks must not return a local Vc");
            }
            Err(err) => {
                if let Some(OutputValue::Error(old_error)) = current_output
                    && **old_error == err
                {
                    None
                } else {
                    Some(OutputValue::Error(Arc::new((&err).into())))
                }
            }
        };
        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
        if new_output.is_some() && ctx.should_track_dependencies() {
            output_dependent_tasks = task.iter_output_dependent().collect();
        }

        drop(task);

        // The task becomes immutable only when all of its dependencies already are.
        let mut is_now_immutable = false;
        if let Some(dependencies) = task_dependencies_for_immutable
            && dependencies
                .iter()
                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
        {
            is_now_immutable = true;
        }
        #[cfg(feature = "trace_task_details")]
        span.record("immutable", is_immutable || is_now_immutable);

        if !queue.is_empty() || !old_edges.is_empty() {
            #[cfg(any(
                feature = "trace_task_completion",
                feature = "trace_aggregation_update_stats"
            ))]
            let _span =
                tracing::trace_span!("remove old edges and prepare new children", stats = Empty)
                    .entered();
            #[cfg(feature = "trace_aggregation_update_stats")]
            {
                let stats = CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
                _span.record("stats", tracing::field::debug(stats));
            }
            #[cfg(not(feature = "trace_aggregation_update_stats"))]
            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
        }

        Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
            is_recomputation,
        })
    }
2281
    /// Marks every task that read this task's output as dirty. Large dependent
    /// sets are processed in parallel chunks on a scoped thread pool.
    fn task_execution_completed_invalidate_output_dependent(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        output_dependent_tasks: SmallVec<[TaskId; 4]>,
    ) {
        debug_assert!(!output_dependent_tasks.is_empty());

        // Prefetch the dependents' task data so the per-task processing below
        // doesn't load from storage one by one.
        if output_dependent_tasks.len() > 1 {
            ctx.prepare_tasks(
                output_dependent_tasks
                    .iter()
                    .map(|&id| (id, TaskDataCategory::All)),
                "invalidate output dependents",
            );
        }

        // Marks a single dependent task dirty, unless the dependency edge is gone.
        fn process_output_dependents(
            ctx: &mut impl ExecuteContext<'_>,
            task_id: TaskId,
            dependent_task_id: TaskId,
            queue: &mut AggregationUpdateQueue,
        ) {
            #[cfg(feature = "trace_task_output_dependencies")]
            let span = tracing::trace_span!(
                "invalidate output dependency",
                task = %task_id,
                dependent_task = %dependent_task_id,
                result = tracing::field::Empty,
            )
            .entered();
            let mut make_stale = true;
            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
            let transient_task_type = dependent.get_transient_task_type();
            // `Once` tasks run a single time; there is nothing to invalidate.
            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "once task");
                return;
            }
            if dependent.outdated_output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "outdated dependency");
                // The dependency stems from a previous execution: mark dirty, but
                // do not make a currently running execution stale.
                make_stale = false;
            } else if !dependent.output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "no backward dependency");
                return;
            }
            make_task_dirty_internal(
                dependent,
                dependent_task_id,
                make_stale,
                #[cfg(feature = "trace_task_dirty")]
                TaskDirtyCause::OutputChange { task_id },
                queue,
                ctx,
            );
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "marked dirty");
        }

        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
            // Parallel path: each worker gets its own child context and aggregation
            // queue; queues are executed per worker.
            let chunk_size = good_chunk_size(output_dependent_tasks.len());
            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
            let _ = scope_and_block(chunks.len(), |scope| {
                for chunk in chunks {
                    let child_ctx = ctx.child_context();
                    scope.spawn(move || {
                        let mut ctx = child_ctx.create();
                        let mut queue = AggregationUpdateQueue::new();
                        for dependent_task_id in chunk {
                            process_output_dependents(
                                &mut ctx,
                                task_id,
                                dependent_task_id,
                                &mut queue,
                            )
                        }
                        queue.execute(&mut ctx);
                    });
                }
            });
        } else {
            // Sequential path for small dependent sets.
            let mut queue = AggregationUpdateQueue::new();
            for dependent_task_id in output_dependent_tasks {
                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
            }
            queue.execute(ctx);
        }
    }
2379
    /// Marks all of `new_children` that have not yet produced an output as dirty so they get
    /// scheduled for their initial execution.
    ///
    /// Part of task-execution completion. Aggregation effects are accumulated in a local
    /// queue and flushed once after every child has been processed.
    ///
    /// `new_children` must be non-empty (enforced by the `debug_assert!`).
    fn task_execution_completed_unfinished_children_dirty(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        new_children: &FxHashSet<TaskId>,
    ) {
        debug_assert!(!new_children.is_empty());

        let mut queue = AggregationUpdateQueue::new();
        ctx.for_each_task_all(
            new_children.iter().copied(),
            "unfinished children dirty",
            |child_task, ctx| {
                // A child with no output never finished an execution — treat it as
                // initially dirty.
                if !child_task.has_output() {
                    let child_id = child_task.id();
                    make_task_dirty_internal(
                        child_task,
                        child_id,
                        // `make_stale == false`: the child was never finished, so it cannot
                        // be stale; it only needs to run.
                        false,
                        #[cfg(feature = "trace_task_dirty")]
                        TaskDirtyCause::InitialDirty,
                        &mut queue,
                        ctx,
                    );
                }
            },
        );

        // Apply all queued aggregation updates in one pass.
        queue.execute(ctx);
    }
2409
    /// Connects `new_children` to `task_id` as part of task-execution completion.
    ///
    /// Returns `true` when the task became stale during execution (and was rescheduled
    /// instead of being connected) — in that case the active counts reserved for the new
    /// children are released. Returns `false` when the task was canceled or the children
    /// were connected normally.
    ///
    /// `new_children` must be non-empty (enforced by the `debug_assert!`).
    fn task_execution_completed_connect(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        new_children: FxHashSet<TaskId>,
    ) -> bool {
        debug_assert!(!new_children.is_empty());

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // A canceled task gets no children connected and is not rescheduled.
        if matches!(in_progress, InProgressState::Canceled) {
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            #[cfg(not(feature = "no_fast_stale"))]
            stale,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // Fast-stale path: the task was invalidated while executing, so this execution's
        // result is already outdated. Reschedule instead of connecting children.
        #[cfg(not(feature = "no_fast_stale"))]
        if *stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);

            // The children won't be connected — give back the active counts that were
            // taken for them during execution.
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return true;
        }

        let has_active_count = ctx.should_track_activeness()
            && task
                .get_activeness()
                .is_some_and(|activeness| activeness.active_counter > 0);
        connect_children(
            ctx,
            task_id,
            task,
            new_children,
            has_active_count,
            ctx.should_track_activeness(),
        );

        false
    }
2477
    /// Finishes a task execution: stores the new output, notifies waiters, and updates the
    /// task's dirty state.
    ///
    /// Returns `(reschedule, in_progress_cells)`:
    /// - `reschedule` is `true` when the task must run again (it went stale during the
    ///   execution, or — under `verify_determinism` — a re-run is forced to check for
    ///   nondeterminism).
    /// - `in_progress_cells` are the cell states that readers were waiting on; their events
    ///   have already been notified here, the map is returned for further cleanup by the
    ///   caller.
    fn task_execution_completed_finish(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "verify_determinism")] no_output_set: bool,
        new_output: Option<OutputValue>,
        is_now_immutable: bool,
    ) -> (
        bool,
        Option<
            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
        >,
    ) {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.take_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return (false, None);
        }
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: is_once_task,
            stale,
            session_dependent,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // Children must have been connected (or released) before this step runs.
        debug_assert!(new_children.is_empty());

        // The task became stale during execution: don't publish anything, reschedule.
        if stale && !is_once_task {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            return (true, None);
        }

        // Store the new output; keep the old content alive until after the task lock is
        // released (dropped further below).
        let mut old_content = None;
        if let Some(value) = new_output {
            old_content = task.set_output(value);
        }

        if is_now_immutable {
            task.set_immutable(true);
        }

        // Wake every reader that is currently waiting on an in-progress cell.
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Session-dependent tasks stay dirty across sessions; everything else becomes clean.
        let new_dirtyness = if session_dependent {
            Some(Dirtyness::SessionDependent)
        } else {
            None
        };
        #[cfg(feature = "verify_determinism")]
        let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
        let data_update = task.update_dirty_state(new_dirtyness);

        // Under `verify_determinism`, persistent tasks are re-executed to detect
        // nondeterministic behavior; otherwise no reschedule happens here.
        #[cfg(feature = "verify_determinism")]
        let reschedule =
            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
        #[cfg(not(feature = "verify_determinism"))]
        let reschedule = false;
        if reschedule {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);
        } else {
            // Release the task lock before notifying, so woken readers aren't blocked on it.
            drop(task);

            done_event.notify(usize::MAX);
        }

        // Old output content is dropped outside the task lock (dropping may be expensive).
        drop(old_content);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, ctx);
        }

        (reschedule, in_progress_cells)
    }
2580
    /// Final cleanup after a task execution: removes cell data (and cell hashes) that the
    /// execution did not re-create, and returns the removed data so the caller can drop it
    /// outside any locks.
    ///
    /// `cell_counters` maps each value type to the number of cells the execution produced;
    /// any stored cell at or beyond that index (or of a type not produced at all) is
    /// leftover from a previous execution and gets removed. On an errored first execution
    /// (`is_error && !is_recomputation`) the old cells are kept.
    fn task_execution_completed_cleanup(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        is_error: bool,
        is_recomputation: bool,
    ) -> Vec<SharedReference> {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let mut removed_cell_data = Vec::new();
        if !is_error || is_recomputation {
            // Collect first, remove second: `iter_cell_data` borrows `task`, so removal
            // can't happen during iteration.
            let to_remove: Vec<_> = task
                .iter_cell_data()
                .filter_map(|(cell, _)| {
                    cell_counters
                        .get(&cell.type_id)
                        .is_none_or(|start_index| cell.index >= *start_index)
                        .then_some(*cell)
                })
                .collect();
            removed_cell_data.reserve_exact(to_remove.len());
            for cell in to_remove {
                if let Some(data) = task.remove_cell_data(&cell) {
                    removed_cell_data.push(data);
                }
            }
            // Same collect-then-remove dance for the stored cell content hashes.
            let to_remove_hash: Vec<_> = task
                .iter_cell_data_hash()
                .filter_map(|(cell, _)| {
                    cell_counters
                        .get(&cell.type_id)
                        .is_none_or(|start_index| cell.index >= *start_index)
                        .then_some(*cell)
                })
                .collect();
            for cell in to_remove_hash {
                task.remove_cell_data_hash(&cell);
            }
        }

        task.cleanup_after_execution();

        drop(task);

        // Returned so the (potentially expensive) drops happen outside the task lock.
        removed_cell_data
    }
2642
2643 fn log_unrecoverable_persist_error() {
2646 eprintln!(
2647 "Persisting is disabled for this session due to an unrecoverable error. Stopping the \
2648 background persisting process."
2649 );
2650 }
2651
    /// Runs a background job for this backend.
    ///
    /// The snapshot jobs (`InitialSnapshot` / `FollowUpSnapshot`) loop forever: wait until
    /// the next snapshot is due (or the process has been idle long enough, or stopping was
    /// requested), persist a snapshot, optionally compact the database while idle, and —
    /// when new data was written — reschedule themselves as a `FollowUpSnapshot`.
    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    // `last_snapshot` is stored as millis since `start_time`.
                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
                    let mut idle_start_listener = self.idle_start_event.listen();
                    let mut idle_end_listener = self.idle_end_event.listen();
                    let mut fresh_idle = true;
                    loop {
                        // The very first snapshot waits longer than follow-up snapshots.
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        let idle_timeout = *IDLE_TIMEOUT;
                        let (time, mut reason) =
                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                            } else {
                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
                            };

                        let until = last_snapshot + time;
                        if until > Instant::now() {
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            // Idle deadline: snapshot early when the system has been idle
                            // for `idle_timeout`; `far_future()` effectively disables it.
                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
                                Instant::now() + idle_timeout
                            } else {
                                far_future()
                            };
                            loop {
                                tokio::select! {
                                    _ = &mut stop_listener => {
                                        return;
                                    },
                                    _ = &mut idle_start_listener => {
                                        idle_time = Instant::now() + idle_timeout;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    _ = &mut idle_end_listener => {
                                        idle_time = until + idle_timeout;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        // Re-check: idleness may have ended since the
                                        // deadline was computed.
                                        if turbo_tasks.is_idle() {
                                            reason = "idle timeout";
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let this = self.clone();
                        let background_span =
                            tracing::info_span!(parent: None, "background snapshot");
                        match this.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
                            Err(err) => {
                                eprintln!("Persisting failed: {err:?}");
                                Self::log_unrecoverable_persist_error();
                                return;
                            }
                            Ok((snapshot_start, new_data)) => {
                                last_snapshot = snapshot_start;

                                // Compact the database in passes while still idle; each
                                // pass checks (non-blocking, `biased`) whether idleness
                                // ended in the meantime.
                                const MAX_IDLE_COMPACTION_PASSES: usize = 10;
                                for _ in 0..MAX_IDLE_COMPACTION_PASSES {
                                    let idle_ended = tokio::select! {
                                        biased;
                                        _ = &mut idle_end_listener => {
                                            idle_end_listener = self.idle_end_event.listen();
                                            true
                                        },
                                        _ = std::future::ready(()) => false,
                                    };
                                    if idle_ended {
                                        break;
                                    }
                                    let _compact_span = tracing::info_span!(
                                        parent: background_span.id(),
                                        "compact database"
                                    )
                                    .entered();
                                    match self.backing_storage.compact() {
                                        // `Ok(true)`: more compaction work remains.
                                        Ok(true) => {}
                                        Ok(false) => break,
                                        Err(err) => {
                                            eprintln!("Compaction failed: {err:?}");
                                            if self.backing_storage.has_unrecoverable_write_error()
                                            {
                                                Self::log_unrecoverable_persist_error();
                                                return;
                                            }
                                            break;
                                        }
                                    }
                                }

                                // Nothing new was persisted: keep looping without
                                // rescheduling, and don't re-trigger the idle fast path.
                                if !new_data {
                                    fresh_idle = false;
                                    continue;
                                }
                                let last_snapshot = last_snapshot.duration_since(self.start_time);
                                self.last_snapshot.store(
                                    last_snapshot.as_millis().try_into().unwrap(),
                                    Ordering::Relaxed,
                                );

                                // Hand off to a fresh follow-up job and end this one.
                                turbo_tasks.schedule_backend_background_job(
                                    TurboTasksBackendJob::FollowUpSnapshot,
                                );
                                return;
                            }
                        }
                    }
                }
            }
        })
    }
2795
2796 fn try_read_own_task_cell(
2797 &self,
2798 task_id: TaskId,
2799 cell: CellId,
2800 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2801 ) -> Result<TypedCellContent> {
2802 let mut ctx = self.execute_context(turbo_tasks);
2803 let task = ctx.task(task_id, TaskDataCategory::Data);
2804 if let Some(content) = task.get_cell_data(&cell).cloned() {
2805 Ok(CellContent(Some(content)).into_typed(cell.type_id))
2806 } else {
2807 Ok(CellContent(None).into_typed(cell.type_id))
2808 }
2809 }
2810
    /// Reads all collectibles of `collectible_type` that are aggregated under `task_id`.
    ///
    /// The task is first upgraded to a root aggregation node (loop below) so its aggregated
    /// collectible view is complete. The result maps each collectible cell to a count:
    /// aggregated collectibles contribute +1 each, the task's own collectibles contribute
    /// their stored count. When `reader_id` is given, dependency edges are recorded in both
    /// directions so the reader is invalidated on collectible changes.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            // Raise the task's aggregation number until it is a root node; the task lock
            // must be released while the aggregation update runs, then re-acquired.
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            for (collectible, count) in task.iter_aggregated_collectibles() {
                if *count > 0 && collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += 1;
                }
            }
            for (&collectible, &count) in task.iter_collectibles() {
                if collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += count;
                }
            }
            // Forward edge: this task knows the reader depends on its collectibles.
            if let Some(reader_id) = reader_id {
                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
            }
        }
        // Backward edge: the reader records its dependency on this task's collectibles.
        if let Some(reader_id) = reader_id {
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            if !reader.remove_outdated_collectibles_dependencies(&target) {
                let _ = reader.add_collectibles_dependencies(target);
            }
        }
        collectibles
    }
2875
2876 fn emit_collectible(
2877 &self,
2878 collectible_type: TraitTypeId,
2879 collectible: RawVc,
2880 task_id: TaskId,
2881 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2882 ) {
2883 self.assert_valid_collectible(task_id, collectible);
2884
2885 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2886 panic!("Collectibles need to be resolved");
2887 };
2888 let cell = CellRef {
2889 task: collectible_task,
2890 cell,
2891 };
2892 operation::UpdateCollectibleOperation::run(
2893 task_id,
2894 CollectibleRef {
2895 collectible_type,
2896 cell,
2897 },
2898 1,
2899 self.execute_context(turbo_tasks),
2900 );
2901 }
2902
2903 fn unemit_collectible(
2904 &self,
2905 collectible_type: TraitTypeId,
2906 collectible: RawVc,
2907 count: u32,
2908 task_id: TaskId,
2909 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2910 ) {
2911 self.assert_valid_collectible(task_id, collectible);
2912
2913 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2914 panic!("Collectibles need to be resolved");
2915 };
2916 let cell = CellRef {
2917 task: collectible_task,
2918 cell,
2919 };
2920 operation::UpdateCollectibleOperation::run(
2921 task_id,
2922 CollectibleRef {
2923 collectible_type,
2924 cell,
2925 },
2926 -(i32::try_from(count).unwrap()),
2927 self.execute_context(turbo_tasks),
2928 );
2929 }
2930
    /// Writes `content` into the given cell of `task_id`.
    ///
    /// Thin wrapper that forwards all parameters to `UpdateCellOperation`; the operation
    /// performs the actual write, change detection, and dependent invalidation.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateCellOperation::run(
            task_id,
            cell,
            content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            self.execute_context(turbo_tasks),
        );
    }
2951
    /// Marks the currently executing task as session dependent (its result depends on the
    /// current session and must be recomputed in new sessions).
    ///
    /// Also raises the task's aggregation number so session-dependent tasks become
    /// aggregation nodes, which keeps session invalidation efficient. No-op when dependency
    /// tracking is disabled.
    fn mark_own_task_as_session_dependent(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            return;
        }
        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
        let aggregation_number = get_aggregation_number(&task);
        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
            // The task lock must be released while the aggregation update runs.
            drop(task);
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                    distance: None,
                },
                &mut ctx,
            );
            task = ctx.task(task_id, TaskDataCategory::Meta);
        }
        // Record the flag on the in-progress state so execution completion can store
        // `Dirtyness::SessionDependent`.
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            session_dependent,
            ..
        })) = task.get_in_progress_mut()
        {
            *session_dependent = true;
        }
    }
2987
2988 fn mark_own_task_as_finished(
2989 &self,
2990 task: TaskId,
2991 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2992 ) {
2993 let mut ctx = self.execute_context(turbo_tasks);
2994 let mut task = ctx.task(task, TaskDataCategory::Data);
2995 if let Some(InProgressState::InProgress(box InProgressStateInner {
2996 marked_as_completed,
2997 ..
2998 })) = task.get_in_progress_mut()
2999 {
3000 *marked_as_completed = true;
3001 }
3006 }
3007
    /// Connects `task` as a child of `parent_task` (or as a root when `parent_task` is
    /// `None`).
    ///
    /// Panics when a persistent parent tries to connect a transient child.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_not_persistent_calling_transient(parent_task, task, None);
        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
    }
3017
    /// Creates a new transient (non-persisted) task of the given type and returns its id.
    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
        let task_id = self.transient_task_id_factory.get();
        {
            // Scoped so the storage access is released before touching `root_tasks`.
            let mut task = self.storage.access_mut(task_id);
            task.init_transient_task(task_id, task_type, self.should_track_activeness());
        }
        // Track roots for the (debug-only) aggregation graph verification.
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().insert(task_id);
        task_id
    }
3028
    /// Disposes a root task.
    ///
    /// If the subgraph below the root is still dirty, the root loses its root type but
    /// stays active until everything is clean; otherwise its activeness state is removed
    /// immediately and waiters on "all clean" are notified.
    fn dispose_root_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().remove(&task_id);

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_dirty = task.is_dirty();
        let has_dirty_containers = task.has_dirty_containers();
        if is_dirty.is_some() || has_dirty_containers {
            if let Some(activeness_state) = task.get_activeness_mut() {
                activeness_state.unset_root_type();
                activeness_state.set_active_until_clean();
            };
        } else if let Some(activeness_state) = task.take_activeness() {
            activeness_state.all_clean_event.notify(usize::MAX);
        }
    }
3053
    /// Debug-only consistency check of the aggregation graph.
    ///
    /// Walks the task graph from every root and verifies three invariants:
    /// 1. every reachable task reports (via `uppers`) to at least one aggregated node,
    /// 2. every dirty task is listed in the dirty-container sets of all its uppers,
    /// 3. every aggregated collectible is actually emitted by some reachable task.
    ///
    /// Panics (with diagnostics on stdout) when an invariant is violated. Can be disabled
    /// at runtime with `TURBO_ENGINE_VERIFY_GRAPH=0`. When `idle` is set, the check aborts
    /// as soon as the system stops being idle.
    #[cfg(feature = "verify_aggregation_graph")]
    fn verify_aggregation_graph(
        &self,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
        idle: bool,
    ) {
        // NOTE: `use` items below are hoisted to the whole function body, so `env::var`
        // here resolves fine despite appearing before the `use`.
        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
            return;
        }
        use std::{collections::VecDeque, env, io::stdout};

        use crate::backend::operation::{get_uppers, is_aggregating_node};

        let mut ctx = self.execute_context(turbo_tasks);
        let root_tasks = self.root_tasks.lock().clone();

        for task_id in root_tasks.into_iter() {
            // BFS over the task graph starting at this root.
            let mut queue = VecDeque::new();
            let mut visited = FxHashSet::default();
            let mut aggregated_nodes = FxHashSet::default();
            // collectible -> (seen emitted by some task?, aggregating tasks listing it)
            let mut collectibles = FxHashMap::default();
            let root_task_id = task_id;
            visited.insert(task_id);
            aggregated_nodes.insert(task_id);
            queue.push_back(task_id);
            let mut counter = 0;
            while let Some(task_id) = queue.pop_front() {
                counter += 1;
                // Progress output for very large graphs.
                if counter % 100000 == 0 {
                    println!(
                        "queue={}, visited={}, aggregated_nodes={}",
                        queue.len(),
                        visited.len(),
                        aggregated_nodes.len()
                    );
                }
                let task = ctx.task(task_id, TaskDataCategory::All);
                // Abort early when the idle window closed.
                if idle && !self.is_idle.load(Ordering::Relaxed) {
                    return;
                }

                // Invariant 1: every non-root task must report to an aggregated node.
                let uppers = get_uppers(&task);
                if task_id != root_task_id
                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
                {
                    panic!(
                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
                         {:?})",
                        task_id,
                        task.get_task_description(),
                        uppers
                    );
                }

                for (collectible, _) in task.iter_aggregated_collectibles() {
                    collectibles
                        .entry(*collectible)
                        .or_insert_with(|| (false, Vec::new()))
                        .1
                        .push(task_id);
                }

                // Mark collectibles that are actually emitted somewhere.
                for (&collectible, &value) in task.iter_collectibles() {
                    if value > 0 {
                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                            *flag = true
                        } else {
                            panic!(
                                "Task {} has a collectible {:?} that is not in any upper task",
                                task_id, collectible
                            );
                        }
                    }
                }

                let is_dirty = task.has_dirty();
                let has_dirty_container = task.has_dirty_containers();
                let should_be_in_upper = is_dirty || has_dirty_container;

                let aggregation_number = get_aggregation_number(&task);
                if is_aggregating_node(aggregation_number) {
                    aggregated_nodes.insert(task_id);
                }
                for child_id in task.iter_children() {
                    if visited.insert(child_id) {
                        queue.push_back(child_id);
                    }
                }
                drop(task);

                // Invariant 2: dirty tasks must appear as dirty containers in all uppers.
                if should_be_in_upper {
                    for upper_id in uppers {
                        let upper = ctx.task(upper_id, TaskDataCategory::All);
                        let in_upper = upper
                            .get_aggregated_dirty_containers(&task_id)
                            .is_some_and(|&dirty| dirty > 0);
                        if !in_upper {
                            let containers: Vec<_> = upper
                                .iter_aggregated_dirty_containers()
                                .map(|(&k, &v)| (k, v))
                                .collect();
                            let upper_task_desc = upper.get_task_description();
                            drop(upper);
                            panic!(
                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
                                 ({})\nThese dirty containers are present:\n{:#?}",
                                task_id,
                                ctx.task(task_id, TaskDataCategory::Data)
                                    .get_task_description(),
                                upper_id,
                                upper_task_desc,
                                containers,
                            );
                        }
                    }
                }
            }

            // Invariant 3: every aggregated collectible must be emitted by some task.
            // When violated, dump the upper-edge chain to stdout before panicking.
            for (collectible, (flag, task_ids)) in collectibles {
                if !flag {
                    use std::io::Write;
                    let mut stdout = stdout().lock();
                    writeln!(
                        stdout,
                        "{:?} that is not emitted in any child task but in these aggregated \
                         tasks: {:#?}",
                        collectible,
                        task_ids
                            .iter()
                            .map(|t| format!(
                                "{t} {}",
                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
                            ))
                            .collect::<Vec<_>>()
                    )
                    .unwrap();

                    let task_id = collectible.cell.task;
                    let mut queue = {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        get_uppers(&task)
                    };
                    let mut visited = FxHashSet::default();
                    for &upper_id in queue.iter() {
                        visited.insert(upper_id);
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                    }
                    while let Some(task_id) = queue.pop() {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        let desc = task.get_task_description();
                        let aggregated_collectible = task
                            .get_aggregated_collectibles(&collectible)
                            .copied()
                            .unwrap_or_default();
                        let uppers = get_uppers(&task);
                        drop(task);
                        writeln!(
                            stdout,
                            "upper {task_id} {desc} collectible={aggregated_collectible}"
                        )
                        .unwrap();
                        if task_ids.contains(&task_id) {
                            writeln!(
                                stdout,
                                "Task has an upper connection to an aggregated task that doesn't \
                                 reference it. Upper connection is invalid!"
                            )
                            .unwrap();
                        }
                        for upper_id in uppers {
                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                            if !visited.contains(&upper_id) {
                                queue.push(upper_id);
                            }
                        }
                    }
                    panic!("See stdout for more details");
                }
            }
        }
    }
3242
3243 fn assert_not_persistent_calling_transient(
3244 &self,
3245 parent_id: Option<TaskId>,
3246 child_id: TaskId,
3247 cell_id: Option<CellId>,
3248 ) {
3249 if let Some(parent_id) = parent_id
3250 && !parent_id.is_transient()
3251 && child_id.is_transient()
3252 {
3253 self.panic_persistent_calling_transient(
3254 self.debug_get_task_description(parent_id),
3255 self.debug_get_cached_task_type(child_id).as_deref(),
3256 cell_id,
3257 );
3258 }
3259 }
3260
3261 fn panic_persistent_calling_transient(
3262 &self,
3263 parent: String,
3264 child: Option<&CachedTaskType>,
3265 cell_id: Option<CellId>,
3266 ) -> ! {
3267 let transient_reason = if let Some(child) = child {
3268 Cow::Owned(format!(
3269 " The callee is transient because it depends on:\n{}",
3270 self.debug_trace_transient_task(child, cell_id),
3271 ))
3272 } else {
3273 Cow::Borrowed("")
3274 };
3275 panic!(
3276 "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3277 parent,
3278 child.map_or("unknown", |t| t.get_name()),
3279 transient_reason,
3280 );
3281 }
3282
    /// Validates that `collectible` may be emitted from `task_id`.
    ///
    /// Panics when the collectible is not a resolved task cell (`RawVc::TaskCell`), or when
    /// a persistent task tries to emit a transient collectible; both panic messages include
    /// as much task context as can be resolved.
    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
            // Not resolved — try to name the producing task for a better message.
            let task_info = if let Some(col_task_ty) = collectible
                .try_get_task_id()
                .map(|t| self.debug_get_task_description(t))
            {
                Cow::Owned(format!(" (return type of {col_task_ty})"))
            } else {
                Cow::Borrowed("")
            };
            panic!("Collectible{task_info} must be a ResolvedVc")
        };
        if col_task_id.is_transient() && !task_id.is_transient() {
            // Explain, when possible, what makes the collectible's task transient.
            let transient_reason =
                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
                    Cow::Owned(format!(
                        ". The collectible is transient because it depends on:\n{}",
                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
                    ))
                } else {
                    Cow::Borrowed("")
                };
            panic!(
                "Collectible is transient, transient collectibles cannot be emitted from \
                 persistent tasks{transient_reason}",
            )
        }
    }
3314}
3315
// `Backend` trait implementation: a thin facade where every method delegates to the inner
// implementation stored in `self.0`.
impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }

    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }

    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }

    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }

    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }

    fn get_or_create_task(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        parent_task: Option<TaskId>,
        persistence: TaskPersistence,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_task(native_fn, this, arg, parent_task, persistence, turbo_tasks)
    }

    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }

    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }

    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }

    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0
            .try_start_task_execution(task_id, priority, turbo_tasks)
    }

    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
            turbo_tasks,
        )
    }

    type BackendJob = TurboTasksBackendJob;

    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(job, turbo_tasks)
    }

    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, reader, options, turbo_tasks)
    }

    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        // NOTE(review): the inner method takes `reader` before `cell` — presumably its
        // signature differs from the trait's; verify against the inner definition.
        self.0
            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }

    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0.try_read_own_task_cell(task_id, cell, turbo_tasks)
    }

    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }

    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }

    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }

    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(
            task_id,
            cell,
            content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            turbo_tasks,
        );
    }

    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }

    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }

    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }

    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }

    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }

    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }

    fn is_tracking_dependencies(&self) -> bool {
        self.0.options.dependency_tracking
    }

    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
        self.0.get_task_name(task, turbo_tasks)
    }
}
3554
/// A diagnostic trace node describing a transient task and (transitively)
/// what caused it to be transient. Rendered via its `Display` impl as an
/// indented tree.
enum DebugTraceTransientTask {
    /// A cached task with its transience causes attached: `cause_self`
    /// (rendered under "self:") and `cause_args` (rendered under "args:").
    Cached {
        task_name: &'static str,
        // Type of the cell read from this task, if a cell read was involved.
        cell_type_id: Option<ValueTypeId>,
        cause_self: Option<Box<DebugTraceTransientTask>>,
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// A cached task whose causes were omitted — rendered with a
    /// "(collapsed)" suffix. NOTE(review): the collapse policy is decided by
    /// whoever constructs this value; it is not visible in this file chunk.
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    /// A transient task with no cached task type available; rendered as
    /// "unknown transient task".
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}
3571
3572impl DebugTraceTransientTask {
3573 fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3574 let indent = " ".repeat(level);
3575 f.write_str(&indent)?;
3576
3577 fn fmt_cell_type_id(
3578 f: &mut fmt::Formatter<'_>,
3579 cell_type_id: Option<ValueTypeId>,
3580 ) -> fmt::Result {
3581 if let Some(ty) = cell_type_id {
3582 write!(
3583 f,
3584 " (read cell of type {})",
3585 get_value_type(ty).ty.global_name
3586 )
3587 } else {
3588 Ok(())
3589 }
3590 }
3591
3592 match self {
3594 Self::Cached {
3595 task_name,
3596 cell_type_id,
3597 ..
3598 }
3599 | Self::Collapsed {
3600 task_name,
3601 cell_type_id,
3602 ..
3603 } => {
3604 f.write_str(task_name)?;
3605 fmt_cell_type_id(f, *cell_type_id)?;
3606 if matches!(self, Self::Collapsed { .. }) {
3607 f.write_str(" (collapsed)")?;
3608 }
3609 }
3610 Self::Uncached { cell_type_id } => {
3611 f.write_str("unknown transient task")?;
3612 fmt_cell_type_id(f, *cell_type_id)?;
3613 }
3614 }
3615 f.write_char('\n')?;
3616
3617 if let Self::Cached {
3619 cause_self,
3620 cause_args,
3621 ..
3622 } = self
3623 {
3624 if let Some(c) = cause_self {
3625 writeln!(f, "{indent} self:")?;
3626 c.fmt_indented(f, level + 1)?;
3627 }
3628 if !cause_args.is_empty() {
3629 writeln!(f, "{indent} args:")?;
3630 for c in cause_args {
3631 c.fmt_indented(f, level + 1)?;
3632 }
3633 }
3634 }
3635 Ok(())
3636 }
3637}
3638
3639impl fmt::Display for DebugTraceTransientTask {
3640 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3641 self.fmt_indented(f, 0)
3642 }
3643}
3644
/// Returns an `Instant` roughly 30 years in the future, used as a stand-in
/// for "never" when a timeout is required but none should effectively apply.
fn far_future() -> Instant {
    // ~30 years from now; far enough to never fire in practice while staying
    // well below any platform-specific `Instant` limits.
    const THIRTY_YEARS: Duration = Duration::from_secs(86400 * 365 * 30);
    Instant::now() + THIRTY_YEARS
}
3653
3654fn encode_task_data(
3666 task: TaskId,
3667 data: &TaskStorage,
3668 category: SpecificTaskDataCategory,
3669 scratch_buffer: &mut TurboBincodeBuffer,
3670) -> Result<TurboBincodeBuffer> {
3671 scratch_buffer.clear();
3672 let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3673 data.encode(category, &mut encoder)?;
3674
3675 if cfg!(feature = "verify_serialization") {
3676 TaskStorage::new()
3677 .decode(
3678 category,
3679 &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3680 )
3681 .with_context(|| {
3682 format!(
3683 "expected to be able to decode serialized data for '{category:?}' information \
3684 for {task}"
3685 )
3686 })?;
3687 }
3688 Ok(SmallVec::from_slice(scratch_buffer))
3689}