1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7 borrow::Cow,
8 fmt::{self, Write},
9 future::Future,
10 hash::BuildHasherDefault,
11 mem::take,
12 pin::Pin,
13 sync::{
14 Arc, LazyLock,
15 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16 },
17 time::SystemTime,
18};
19
20use anyhow::{Context, Result, bail};
21use auto_hash_map::{AutoMap, AutoSet};
22use indexmap::IndexSet;
23use parking_lot::{Condvar, Mutex};
24use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
25use smallvec::{SmallVec, smallvec};
26use tokio::time::{Duration, Instant};
27use tracing::{Span, trace_span};
28use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
29use turbo_tasks::{
30 CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
31 ReadOutputOptions, ReadTracking, SharedReference, StackDynTaskInputs, TRANSIENT_TASK_BIT,
32 TaskExecutionReason, TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasksBackendApi,
33 TurboTasksPanic, ValueTypeId,
34 backend::{
35 Backend, CachedTaskType, CellContent, CellHash, TaskExecutionSpec, TransientTaskType,
36 TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
37 TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
38 VerificationMode,
39 },
40 event::{Event, EventDescription, EventListener},
41 macro_helpers::NativeFunction,
42 message_queue::{TimingEvent, TraceEvent},
43 registry::get_value_type,
44 scope::scope_and_block,
45 task_statistics::TaskStatisticsApi,
46 trace::TraceRawVcs,
47 util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
48};
49
50pub use self::{
51 operation::AnyOperation,
52 storage::{SpecificTaskDataCategory, TaskDataCategory},
53};
54#[cfg(feature = "trace_task_dirty")]
55use crate::backend::operation::TaskDirtyCause;
56use crate::{
57 backend::{
58 operation::{
59 AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
60 CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
61 LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType,
62 connect_children, get_aggregation_number, get_uppers, is_root_node,
63 make_task_dirty_internal, prepare_new_children,
64 },
65 storage::Storage,
66 storage_schema::{TaskStorage, TaskStorageAccessors},
67 },
68 backing_storage::{BackingStorage, SnapshotItem, compute_task_type_hash},
69 data::{
70 ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
71 InProgressState, InProgressStateInner, OutputValue, TransientTask,
72 },
73 error::TaskError,
74 utils::{
75 dash_map_drop_contents::drop_contents,
76 dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
77 ptr_eq_arc::PtrEqArc,
78 shard_amount::compute_shard_amount,
79 },
80};
81
/// Threshold above which invalidation of dependent tasks is split into chunks
/// and processed in parallel. NOTE(review): the consumer of this constant is
/// outside this chunk — confirm against the dirty-propagation code.
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;

/// Top bit of `in_progress_operations`. While set, a snapshot has been
/// requested and newly starting operations must wait (see `start_operation`
/// and `operation_suspend_point`). The remaining bits count active operations.
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);

/// How long the system must be idle before a snapshot is taken.
/// Overridable via `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS` (milliseconds);
/// unparsable or missing values fall back to the 2 second default.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .map(Duration::from_millis)
        .unwrap_or(Duration::from_secs(2))
});
98
/// State used to coordinate a snapshot with concurrently running operations.
/// Guarded by `TurboTasksBackendInner::snapshot_request`.
struct SnapshotRequest {
    // True while a snapshot is being prepared; operations reaching a suspend
    // point block until this is reset to false (see `snapshot_and_persist`).
    snapshot_requested: bool,
    // Operations that suspended themselves while the snapshot was requested.
    // `snapshot_and_persist` collects these so their in-flight state can be
    // captured alongside the snapshot.
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}
103
104impl SnapshotRequest {
105 fn new() -> Self {
106 Self {
107 snapshot_requested: false,
108 suspended_operations: FxHashSet::default(),
109 }
110 }
111}
112
/// Controls how the backing storage is used (see `should_restore` /
/// `should_persist` on `TurboTasksBackendInner`).
pub enum StorageMode {
    // Restore data from storage, but never write modifications back
    // (`should_persist` returns false for this mode).
    ReadOnly,
    // Restore data and persist modifications.
    ReadWrite,
    // Restore data and persist modifications; presumably writes are deferred
    // until shutdown — the persist scheduling lives outside this chunk, so
    // confirm against the snapshot/stop code.
    ReadWriteOnShutdown,
}
123
/// Configuration options for [`TurboTasksBackend`].
pub struct BackendOptions {
    // Enables dependency tracking between tasks. When false,
    // `active_tracking` is forced off as well (see
    // `TurboTasksBackendInner::new`).
    pub dependency_tracking: bool,

    // Enables tracking which tasks are active. Requires
    // `dependency_tracking`.
    pub active_tracking: bool,

    // How the backing storage is used. `None` disables both restoring and
    // persisting.
    pub storage_mode: Option<StorageMode>,

    // Expected number of workers; used to size internal sharded maps (see
    // `compute_shard_amount`).
    pub num_workers: Option<usize>,

    // Use smaller initial allocations; also factored into the shard amount.
    pub small_preallocation: bool,
}
148
149impl Default for BackendOptions {
150 fn default() -> Self {
151 Self {
152 dependency_tracking: true,
153 active_tracking: true,
154 storage_mode: Some(StorageMode::ReadWrite),
155 num_workers: None,
156 small_preallocation: false,
157 }
158 }
159}
160
/// Background jobs this backend schedules on the turbo-tasks runtime.
/// NOTE(review): the job handler lives outside this chunk — presumably
/// `InitialSnapshot` is scheduled once at startup and `FollowUpSnapshot` for
/// each subsequent periodic snapshot; confirm against the job dispatch code.
pub enum TurboTasksBackendJob {
    InitialSnapshot,
    FollowUpSnapshot,
}
165
/// Public backend handle; a thin newtype holding the shared inner state in an
/// `Arc` so it can be referenced from listeners and background jobs.
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
167
/// Shared state of the backend (held behind an `Arc` by [`TurboTasksBackend`]).
struct TurboTasksBackendInner<B: BackingStorage> {
    options: BackendOptions,

    start_time: Instant,

    // Id factory for persisted (cached) tasks; starts at the backing
    // storage's next free id and stays below `TRANSIENT_TASK_BIT`.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    // Id factory for transient tasks; uses ids at and above
    // `TRANSIENT_TASK_BIT`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    // Maps a cached task type to its assigned task id.
    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

    storage: Storage,

    // Number of currently active operations. The top bit
    // (`SNAPSHOT_REQUESTED_BIT`) flags that a snapshot has been requested;
    // see `start_operation`, `operation_suspend_point` and
    // `snapshot_and_persist`.
    in_progress_operations: AtomicUsize,

    snapshot_request: Mutex<SnapshotRequest>,
    // Notified when the active-operation count drops to zero while a
    // snapshot is requested (only `SNAPSHOT_REQUESTED_BIT` remains set).
    operations_suspended: Condvar,
    // Notified when the snapshot has finished and suspended operations may
    // resume.
    snapshot_completed: Condvar,
    // Timestamp of the last snapshot — unit/epoch not visible in this chunk;
    // presumably relative to `start_time`, confirm against the snapshot loop.
    last_snapshot: AtomicU64,

    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    task_statistics: TaskStatisticsApi,

    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
212
213impl<B: BackingStorage> TurboTasksBackend<B> {
214 pub fn new(options: BackendOptions, backing_storage: B) -> Self {
215 Self(Arc::new(TurboTasksBackendInner::new(
216 options,
217 backing_storage,
218 )))
219 }
220
221 pub fn backing_storage(&self) -> &B {
222 &self.0.backing_storage
223 }
224}
225
impl<B: BackingStorage> TurboTasksBackendInner<B> {
    /// Creates the inner backend state.
    ///
    /// Forces `active_tracking` off when `dependency_tracking` is disabled.
    /// Persisted task ids start at the backing storage's next free id and end
    /// below `TRANSIENT_TASK_BIT`; transient ids use the remaining upper range.
    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
        if !options.dependency_tracking {
            options.active_tracking = false;
        }
        let small_preallocation = options.small_preallocation;
        let next_task_id = backing_storage
            .next_free_task_id()
            .expect("Failed to get task id");
        Self {
            options,
            start_time: Instant::now(),
            persisted_task_id_factory: IdFactoryWithReuse::new(
                next_task_id,
                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
            ),
            transient_task_id_factory: IdFactoryWithReuse::new(
                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                TaskId::MAX,
            ),
            task_cache: FxDashMap::default(),
            storage: Storage::new(shard_amount, small_preallocation),
            in_progress_operations: AtomicUsize::new(0),
            snapshot_request: Mutex::new(SnapshotRequest::new()),
            operations_suspended: Condvar::new(),
            snapshot_completed: Condvar::new(),
            last_snapshot: AtomicU64::new(0),
            stopping: AtomicBool::new(false),
            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
            #[cfg(feature = "verify_aggregation_graph")]
            is_idle: AtomicBool::new(false),
            task_statistics: TaskStatisticsApi::default(),
            backing_storage,
            #[cfg(feature = "verify_aggregation_graph")]
            root_tasks: Default::default(),
        }
    }

    /// Creates an [`ExecuteContext`] for running operations against this
    /// backend on behalf of the given turbo-tasks API handle.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }

    /// Returns true when a snapshot has been requested and running operations
    /// should suspend at their next suspend point. Always false when
    /// persisting is disabled.
    fn suspending_requested(&self) -> bool {
        self.should_persist()
            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
    }

    /// Suspend point for long-running operations. When a snapshot is
    /// requested, the operation's resumable state is captured via `suspend`,
    /// registered with the snapshot request, and the calling thread blocks
    /// until the snapshot completes.
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        // Out-of-line slow path; only taken while a snapshot is pending.
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            // Re-check under the lock: the request may have completed between
            // the unsynchronized check in `suspending_requested` and here.
            if snapshot_request.snapshot_requested {
                // Register the suspended state so the snapshot can capture it.
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                // Deregister as active. When only the SNAPSHOT_REQUESTED_BIT
                // remains, every operation is suspended and the snapshotting
                // thread (waiting on `operations_suspended`) may proceed.
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                if value == SNAPSHOT_REQUESTED_BIT {
                    this.operations_suspended.notify_all();
                }
                // Block until the snapshot finishes, then re-register as an
                // active operation and unregister the suspended state.
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }

    /// Starts an operation, returning a guard that keeps the operation counted
    /// in `in_progress_operations` until dropped. If a snapshot is currently
    /// requested, blocks until it completes before registering. Returns an
    /// inert guard when persisting is disabled.
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        if !self.should_persist() {
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            // A snapshot is pending: undo the increment, wake the
            // snapshotting thread if we were the last counted operation, and
            // wait for the snapshot to finish before counting ourselves again.
            let mut snapshot_request = self.snapshot_request.lock();
            if snapshot_request.snapshot_requested {
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }

    /// Whether modifications should ever be written to the backing storage.
    fn should_persist(&self) -> bool {
        matches!(
            self.options.storage_mode,
            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
        )
    }

    /// Whether data should be restored from the backing storage on startup.
    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }

    /// Whether dependencies between tasks are tracked.
    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }

    /// Whether task activeness is tracked.
    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }

    /// Records a task-cache hit for the given native function (no-op unless
    /// statistics collection is enabled).
    fn track_cache_hit_by_fn(&self, native_fn: &'static NativeFunction) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(native_fn));
    }

    /// Records a task-cache miss for the given native function (no-op unless
    /// statistics collection is enabled).
    fn track_cache_miss_by_fn(&self, native_fn: &'static NativeFunction) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(native_fn));
    }

    /// Recursively converts a stored [`TaskError`] into a
    /// [`TurboTasksExecutionError`]. `TaskChain` errors are resolved by
    /// reading the final task's output; if that error is no longer available,
    /// a placeholder panic message is substituted. The chain is then rewrapped
    /// innermost-first as nested `TaskContext` errors.
    fn task_error_to_turbo_tasks_execution_error(
        &self,
        error: &TaskError,
        ctx: &mut impl ExecuteContext<'_>,
    ) -> TurboTasksExecutionError {
        match error {
            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
                message: item.message.clone(),
                source: item
                    .source
                    .as_ref()
                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
            })),
            TaskError::LocalTaskContext(local_task_context) => {
                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
                    name: local_task_context.name.clone(),
                    source: local_task_context
                        .source
                        .as_ref()
                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
                }))
            }
            TaskError::TaskChain(chain) => {
                // NOTE(review): assumes the chain is non-empty — confirm the
                // invariant where TaskChain errors are constructed.
                let task_id = chain.last().unwrap();
                let error = {
                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
                    if let Some(OutputValue::Error(error)) = task.get_output() {
                        Some(error.clone())
                    } else {
                        None
                    }
                };
                let error = error.map_or_else(
                    || {
                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
                                "Error no longer available",
                            )),
                            location: None,
                        }))
                    },
                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
                );
                // Wrap the resolved error with one TaskContext layer per chain
                // entry, from the innermost task outwards.
                let mut current_error = error;
                for &task_id in chain.iter().rev() {
                    current_error =
                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
                            task_id,
                            source: Some(current_error),
                            turbo_tasks: ctx.turbo_tasks(),
                        }));
                }
                current_error
            }
        }
    }
}
428
/// RAII guard for a running operation (see `start_operation`). While alive,
/// the operation is counted in `in_progress_operations`; dropping it
/// decrements the count and may wake a waiting snapshot. `backend` is `None`
/// when persisting is disabled and no accounting is needed.
pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    backend: Option<&'a TurboTasksBackendInner<B>>,
}
432
impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
    fn drop(&mut self) {
        // `backend` is `None` when persisting is disabled (see
        // `start_operation`); in that case nothing was counted.
        if let Some(backend) = self.backend {
            let fetch_sub = backend
                .in_progress_operations
                .fetch_sub(1, Ordering::AcqRel);
            // If this was the last active operation while a snapshot request
            // is pending (only SNAPSHOT_REQUESTED_BIT remains after the
            // decrement), wake the thread waiting in `snapshot_and_persist`.
            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
                backend.operations_suspended.notify_all();
            }
        }
    }
}
445
/// Intermediate result prepared when a task execution completes.
/// NOTE(review): the producer/consumer of this struct is outside this chunk —
/// the field notes below are inferred from names; confirm against
/// `task_execution_completed`.
struct TaskExecutionCompletePrepareResult {
    // Children connected during this execution.
    pub new_children: FxHashSet<TaskId>,
    // Whether the task was determined to be immutable after this execution.
    pub is_now_immutable: bool,
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    // Output value to store, if it changed.
    pub new_output: Option<OutputValue>,
    // Tasks depending on this task's output that need to be notified.
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
    pub is_recomputation: bool,
}
456
457impl<B: BackingStorage> TurboTasksBackendInner<B> {
459 fn connect_child(
460 &self,
461 parent_task: Option<TaskId>,
462 child_task: TaskId,
463 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
464 ) {
465 operation::ConnectChildOperation::run(
466 parent_task,
467 child_task,
468 self.execute_context(turbo_tasks),
469 );
470 }
471
472 fn try_read_task_output(
473 self: &Arc<Self>,
474 task_id: TaskId,
475 reader: Option<TaskId>,
476 options: ReadOutputOptions,
477 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
478 ) -> Result<Result<RawVc, EventListener>> {
479 self.assert_not_persistent_calling_transient(reader, task_id, None);
480
481 let mut ctx = self.execute_context(turbo_tasks);
482 let need_reader_task = if self.should_track_dependencies()
483 && !matches!(options.tracking, ReadTracking::Untracked)
484 && reader.is_some_and(|reader_id| reader_id != task_id)
485 && let Some(reader_id) = reader
486 && reader_id != task_id
487 {
488 Some(reader_id)
489 } else {
490 None
491 };
492 let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
493 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
497 (task, Some(reader))
498 } else {
499 (ctx.task(task_id, TaskDataCategory::All), None)
500 };
501
502 fn listen_to_done_event(
503 reader_description: Option<EventDescription>,
504 tracking: ReadTracking,
505 done_event: &Event,
506 ) -> EventListener {
507 done_event.listen_with_note(move || {
508 move || {
509 if let Some(reader_description) = reader_description.as_ref() {
510 format!(
511 "try_read_task_output from {} ({})",
512 reader_description, tracking
513 )
514 } else {
515 format!("try_read_task_output ({})", tracking)
516 }
517 }
518 })
519 }
520
521 fn check_in_progress(
522 task: &impl TaskGuard,
523 reader_description: Option<EventDescription>,
524 tracking: ReadTracking,
525 ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
526 {
527 match task.get_in_progress() {
528 Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
529 listen_to_done_event(reader_description, tracking, done_event),
530 ))),
531 Some(InProgressState::InProgress(box InProgressStateInner {
532 done_event, ..
533 })) => Some(Ok(Err(listen_to_done_event(
534 reader_description,
535 tracking,
536 done_event,
537 )))),
538 Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
539 "{} was canceled",
540 task.get_task_description()
541 ))),
542 None => None,
543 }
544 }
545
546 if matches!(options.consistency, ReadConsistency::Strong) {
547 loop {
549 let aggregation_number = get_aggregation_number(&task);
550 if is_root_node(aggregation_number) {
551 break;
552 }
553 drop(task);
554 drop(reader_task);
555 {
556 let _span = tracing::trace_span!(
557 "make root node for strongly consistent read",
558 task = self.debug_get_task_description(task_id)
559 )
560 .entered();
561 AggregationUpdateQueue::run(
562 AggregationUpdateJob::UpdateAggregationNumber {
563 task_id,
564 base_aggregation_number: u32::MAX,
565 distance: None,
566 },
567 &mut ctx,
568 );
569 }
570 (task, reader_task) = if let Some(reader_id) = need_reader_task {
571 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
573 (task, Some(reader))
574 } else {
575 (ctx.task(task_id, TaskDataCategory::All), None)
576 }
577 }
578
579 let is_dirty = task.is_dirty();
580
581 let has_dirty_containers = task.has_dirty_containers();
583 if has_dirty_containers || is_dirty.is_some() {
584 let activeness = task.get_activeness_mut();
585 let mut task_ids_to_schedule: Vec<_> = Vec::new();
586 let activeness = if let Some(activeness) = activeness {
588 activeness.set_active_until_clean();
592 activeness
593 } else {
594 if ctx.should_track_activeness() {
598 task_ids_to_schedule = task.dirty_containers().collect();
600 task_ids_to_schedule.push(task_id);
601 }
602 let activeness =
603 task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
604 activeness.set_active_until_clean();
605 activeness
606 };
607 let listener = activeness.all_clean_event.listen_with_note(move || {
608 let this = self.clone();
609 let tt = turbo_tasks.pin();
610 move || {
611 let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
612 let mut ctx = this.execute_context(tt);
613 let mut visited = FxHashSet::default();
614 fn indent(s: &str) -> String {
615 s.split_inclusive('\n')
616 .flat_map(|line: &str| [" ", line].into_iter())
617 .collect::<String>()
618 }
619 fn get_info(
620 ctx: &mut impl ExecuteContext<'_>,
621 task_id: TaskId,
622 parent_and_count: Option<(TaskId, i32)>,
623 visited: &mut FxHashSet<TaskId>,
624 ) -> String {
625 let task = ctx.task(task_id, TaskDataCategory::All);
626 let is_dirty = task.is_dirty();
627 let in_progress =
628 task.get_in_progress()
629 .map_or("not in progress", |p| match p {
630 InProgressState::InProgress(_) => "in progress",
631 InProgressState::Scheduled { .. } => "scheduled",
632 InProgressState::Canceled => "canceled",
633 });
634 let activeness = task.get_activeness().map_or_else(
635 || "not active".to_string(),
636 |activeness| format!("{activeness:?}"),
637 );
638 let aggregation_number = get_aggregation_number(&task);
639 let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
640 {
641 let uppers = get_uppers(&task);
642 !uppers.contains(&parent_task_id)
643 } else {
644 false
645 };
646
647 let has_dirty_containers = task.has_dirty_containers();
649
650 let task_description = task.get_task_description();
651 let is_dirty_label = if let Some(parent_priority) = is_dirty {
652 format!(", dirty({parent_priority})")
653 } else {
654 String::new()
655 };
656 let has_dirty_containers_label = if has_dirty_containers {
657 ", dirty containers"
658 } else {
659 ""
660 };
661 let count = if let Some((_, count)) = parent_and_count {
662 format!(" {count}")
663 } else {
664 String::new()
665 };
666 let mut info = format!(
667 "{task_id} {task_description}{count} (aggr={aggregation_number}, \
668 {in_progress}, \
669 {activeness}{is_dirty_label}{has_dirty_containers_label})",
670 );
671 let children: Vec<_> = task.dirty_containers_with_count().collect();
672 drop(task);
673
674 if missing_upper {
675 info.push_str("\n ERROR: missing upper connection");
676 }
677
678 if has_dirty_containers || !children.is_empty() {
679 writeln!(info, "\n dirty tasks:").unwrap();
680
681 for (child_task_id, count) in children {
682 let task_description = ctx
683 .task(child_task_id, TaskDataCategory::Data)
684 .get_task_description();
685 if visited.insert(child_task_id) {
686 let child_info = get_info(
687 ctx,
688 child_task_id,
689 Some((task_id, count)),
690 visited,
691 );
692 info.push_str(&indent(&child_info));
693 if !info.ends_with('\n') {
694 info.push('\n');
695 }
696 } else {
697 writeln!(
698 info,
699 " {child_task_id} {task_description} {count} \
700 (already visited)"
701 )
702 .unwrap();
703 }
704 }
705 }
706 info
707 }
708 let info = get_info(&mut ctx, task_id, None, &mut visited);
709 format!(
710 "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
711 )
712 }
713 });
714 drop(reader_task);
715 drop(task);
716 if !task_ids_to_schedule.is_empty() {
717 let mut queue = AggregationUpdateQueue::new();
718 queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
719 queue.execute(&mut ctx);
720 }
721
722 return Ok(Err(listener));
723 }
724 }
725
726 let reader_description = reader_task
727 .as_ref()
728 .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
729 if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
730 {
731 return value;
732 }
733
734 if let Some(output) = task.get_output() {
735 let result = match output {
736 OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
737 OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
738 OutputValue::Error(error) => Err(error.clone()),
739 };
740 if let Some(mut reader_task) = reader_task.take()
741 && options.tracking.should_track(result.is_err())
742 && (!task.immutable() || cfg!(feature = "verify_immutable"))
743 {
744 #[cfg(feature = "trace_task_output_dependencies")]
745 let _span = tracing::trace_span!(
746 "add output dependency",
747 task = %task_id,
748 dependent_task = ?reader
749 )
750 .entered();
751 let mut queue = LeafDistanceUpdateQueue::new();
752 let reader = reader.unwrap();
753 if task.add_output_dependent(reader) {
754 let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
756 let reader_leaf_distance =
757 reader_task.get_leaf_distance().copied().unwrap_or_default();
758 if reader_leaf_distance.distance <= leaf_distance.distance {
759 queue.push(
760 reader,
761 leaf_distance.distance,
762 leaf_distance.max_distance_in_buffer,
763 );
764 }
765 }
766
767 drop(task);
768
769 if !reader_task.remove_outdated_output_dependencies(&task_id) {
775 let _ = reader_task.add_output_dependencies(task_id);
776 }
777 drop(reader_task);
778
779 queue.execute(&mut ctx);
780 } else {
781 drop(task);
782 }
783
784 return result.map_err(|error| {
785 self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
786 .with_task_context(task_id, turbo_tasks.pin())
787 .into()
788 });
789 }
790 drop(reader_task);
791
792 let note = EventDescription::new(|| {
793 move || {
794 if let Some(reader) = reader_description.as_ref() {
795 format!("try_read_task_output (recompute) from {reader}",)
796 } else {
797 "try_read_task_output (recompute, untracked)".to_string()
798 }
799 }
800 });
801
802 let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
804 TaskExecutionReason::OutputNotAvailable,
805 EventDescription::new(|| task.get_task_desc_fn()),
806 note,
807 );
808
809 let old = task.set_in_progress(in_progress_state);
812 debug_assert!(old.is_none(), "InProgress already exists");
813 ctx.schedule_task(task, TaskPriority::Initial);
814
815 Ok(Err(listener))
816 }
817
    /// Tries to read a cell of `task_id` on behalf of `reader`.
    ///
    /// Returns `Ok(Ok(content))` when the cell content is available,
    /// `Ok(Err(listener))` when the caller must wait for the task to
    /// (re)compute the cell, and `Err(_)` when the cell no longer exists or
    /// the task was canceled. Tracked reads record a cell dependency edge from
    /// `reader` to the cell. With `final_read_hint` the content is removed
    /// from storage instead of copied.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        // Records the dependency edge in both directions: reader listed as a
        // dependent of the cell, cell listed as a dependency of the reader.
        // Skipped for immutable tasks (their cells can never change).
        // Consumes both guards.
        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
            key: Option<u64>,
        ) {
            if let Some(mut reader_task) = reader_task
                && (!task.immutable() || cfg!(feature = "verify_immutable"))
            {
                // `reader_task` is only `Some` when `reader` is (see caller).
                let reader = reader.unwrap();
                let _ = task.add_cell_dependents((cell, key, reader));
                drop(task);

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                // If the dependency was marked outdated, un-mark it; otherwise
                // record it as a fresh dependency of the reader.
                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
                    let _ = reader_task.add_cell_dependencies((target, key));
                }
                drop(reader_task);
            }
        }

        let ReadCellOptions {
            is_serializable_cell_content,
            tracking,
            final_read_hint,
        } = options;

        let mut ctx = self.execute_context(turbo_tasks);
        // Only tracked reads by a reader other than the task itself need the
        // reader's guard; lock both together in that case.
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(tracking, ReadCellTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        // `final_read_hint` means this is expected to be the last read, so the
        // content can be moved out of storage instead of cloned.
        let content = if final_read_hint {
            task.remove_cell_data(is_serializable_cell_content, cell)
        } else {
            task.get_cell_data(is_serializable_cell_content, cell)
        };
        if let Some(content) = content {
            if tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.reference)),
            )));
        }

        // No content: if the task is already running or scheduled, wait for
        // the cell to be filled.
        let in_progress = task.get_in_progress();
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self
                .listen_to_cell(&mut task, task_id, &reader_task, cell)
                .0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        // Check whether the cell can still exist at all; if not, the read is
        // permanently broken (but still tracked, so a later change re-triggers
        // the reader).
        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
        let Some(max_id) = max_id else {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!(
                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
            );
        };
        if cell.index >= max_id {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
        }

        if is_cancelled {
            bail!("{} was canceled", task.get_task_description());
        }

        // Cell exists but content is gone (e.g. dropped from memory):
        // schedule a recomputation unless someone already registered a
        // listener (which implies the recomputation was already scheduled).
        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
        drop(reader_task);
        if !new_listener {
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).ty.global_name,
            cell_index = cell.index
        )
        .entered();

        let _ = task.add_scheduled(
            TaskExecutionReason::CellNotAvailable,
            EventDescription::new(|| task.get_task_desc_fn()),
        );
        ctx.schedule_task(task, TaskPriority::Initial);

        Ok(Err(listener))
    }
956
957 fn listen_to_cell(
958 &self,
959 task: &mut impl TaskGuard,
960 task_id: TaskId,
961 reader_task: &Option<impl TaskGuard>,
962 cell: CellId,
963 ) -> (EventListener, bool) {
964 let note = || {
965 let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
966 move || {
967 if let Some(reader_desc) = reader_desc.as_ref() {
968 format!("try_read_task_cell (in progress) from {}", (reader_desc)())
969 } else {
970 "try_read_task_cell (in progress, untracked)".to_string()
971 }
972 }
973 };
974 if let Some(in_progress) = task.get_in_progress_cells(&cell) {
975 let listener = in_progress.event.listen_with_note(note);
977 return (listener, false);
978 }
979 let in_progress = InProgressCellState::new(task_id, cell);
980 let listener = in_progress.event.listen_with_note(note);
981 let old = task.insert_in_progress_cells(cell, in_progress);
982 debug_assert!(old.is_none(), "InProgressCell already exists");
983 (listener, true)
984 }
985
986 fn snapshot_and_persist(
987 &self,
988 parent_span: Option<tracing::Id>,
989 reason: &str,
990 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
991 ) -> Result<(Instant, bool), anyhow::Error> {
992 let snapshot_span =
993 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
994 .entered();
995 let start = Instant::now();
996 let wall_start = SystemTime::now();
1000 debug_assert!(self.should_persist());
1001
1002 let suspended_operations;
1003 {
1004 let _span = tracing::info_span!("blocking").entered();
1005 let mut snapshot_request = self.snapshot_request.lock();
1006 snapshot_request.snapshot_requested = true;
1007 let active_operations = self
1008 .in_progress_operations
1009 .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1010 if active_operations != 0 {
1011 self.operations_suspended
1012 .wait_while(&mut snapshot_request, |_| {
1013 self.in_progress_operations.load(Ordering::Relaxed)
1014 != SNAPSHOT_REQUESTED_BIT
1015 });
1016 }
1017 suspended_operations = snapshot_request
1018 .suspended_operations
1019 .iter()
1020 .map(|op| op.arc().clone())
1021 .collect::<Vec<_>>();
1022 }
1023 let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
1026 let mut snapshot_request = self.snapshot_request.lock();
1027 snapshot_request.snapshot_requested = false;
1028 self.in_progress_operations
1029 .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1030 self.snapshot_completed.notify_all();
1031 let snapshot_time = Instant::now();
1032 drop(snapshot_request);
1033
1034 if !has_modifications {
1035 drop(snapshot_guard);
1038 return Ok((start, false));
1039 }
1040
1041 #[cfg(feature = "print_cache_item_size")]
1042 #[derive(Default)]
1043 struct TaskCacheStats {
1044 data: usize,
1045 #[cfg(feature = "print_cache_item_size_with_compressed")]
1046 data_compressed: usize,
1047 data_count: usize,
1048 meta: usize,
1049 #[cfg(feature = "print_cache_item_size_with_compressed")]
1050 meta_compressed: usize,
1051 meta_count: usize,
1052 upper_count: usize,
1053 collectibles_count: usize,
1054 aggregated_collectibles_count: usize,
1055 children_count: usize,
1056 followers_count: usize,
1057 collectibles_dependents_count: usize,
1058 aggregated_dirty_containers_count: usize,
1059 output_size: usize,
1060 }
1061 #[cfg(feature = "print_cache_item_size")]
1064 struct FormatSizes {
1065 size: usize,
1066 #[cfg(feature = "print_cache_item_size_with_compressed")]
1067 compressed_size: usize,
1068 }
1069 #[cfg(feature = "print_cache_item_size")]
1070 impl std::fmt::Display for FormatSizes {
1071 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1072 use turbo_tasks::util::FormatBytes;
1073 #[cfg(feature = "print_cache_item_size_with_compressed")]
1074 {
1075 write!(
1076 f,
1077 "{} ({} compressed)",
1078 FormatBytes(self.size),
1079 FormatBytes(self.compressed_size)
1080 )
1081 }
1082 #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1083 {
1084 write!(f, "{}", FormatBytes(self.size))
1085 }
1086 }
1087 }
        // Accumulates per-task-type statistics about serialized cache items.
        // Only compiled with the `print_cache_item_size` feature; the totals
        // are printed as a markdown table after a snapshot is persisted.
        #[cfg(feature = "print_cache_item_size")]
        impl TaskCacheStats {
            // Size of `data` after LZ4 compression, used to estimate the
            // on-disk footprint of a cache item.
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            fn compressed_size(data: &[u8]) -> Result<usize> {
                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
                    data,
                    &mut Vec::new(),
                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
                )?)
            }

            // Records one serialized "data" category item.
            fn add_data(&mut self, data: &[u8]) {
                self.data += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    // Best-effort: a compression failure counts as 0 bytes.
                    self.data_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.data_count += 1;
            }

            // Records one serialized "meta" category item.
            fn add_meta(&mut self, data: &[u8]) {
                self.meta += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    // Best-effort: a compression failure counts as 0 bytes.
                    self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.meta_count += 1;
            }

            // Accumulates structural counts (uppers, children, collectibles,
            // followers, ...) plus the encoded output size from task storage.
            fn add_counts(&mut self, storage: &TaskStorage) {
                let counts = storage.meta_counts();
                self.upper_count += counts.upper;
                self.collectibles_count += counts.collectibles;
                self.aggregated_collectibles_count += counts.aggregated_collectibles;
                self.children_count += counts.children;
                self.followers_count += counts.followers;
                self.collectibles_dependents_count += counts.collectibles_dependents;
                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
                if let Some(output) = storage.get_output() {
                    use turbo_bincode::turbo_bincode_encode;

                    // Best-effort: an encoding failure counts as 0 bytes.
                    self.output_size += turbo_bincode_encode(&output)
                        .map(|data| data.len())
                        .unwrap_or(0);
                }
            }

            // Display name used as the stats-table key; "<unknown>" for tasks
            // without a persistent task type.
            fn task_name(storage: &TaskStorage) -> String {
                storage
                    .get_persistent_task_type()
                    .map(|t| t.to_string())
                    .unwrap_or_else(|| "<unknown>".to_string())
            }

            // Key used to order the printed table: compressed sizes when
            // available, raw sizes otherwise.
            fn sort_key(&self) -> usize {
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed + self.meta_compressed
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    self.data + self.meta
                }
            }

            // Combined data + meta size.
            fn format_total(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data + self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed + self.meta_compressed,
                }
            }

            // Total size of "data" category items.
            fn format_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed,
                }
            }

            // Average "data" item size; `checked_div` yields 0 for an empty count.
            fn format_avg_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data.checked_div(self.data_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .data_compressed
                        .checked_div(self.data_count)
                        .unwrap_or(0),
                }
            }

            // Total size of "meta" category items.
            fn format_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.meta_compressed,
                }
            }

            // Average "meta" item size; `checked_div` yields 0 for an empty count.
            fn format_avg_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta.checked_div(self.meta_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .meta_compressed
                        .checked_div(self.meta_count)
                        .unwrap_or(0),
                }
            }
        }
1202 #[cfg(feature = "print_cache_item_size")]
1203 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1204 Mutex::new(FxHashMap::default());
1205
1206 let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
1213 let encode_category = |task_id: TaskId,
1214 data: &TaskStorage,
1215 category: SpecificTaskDataCategory,
1216 buffer: &mut TurboBincodeBuffer|
1217 -> Option<TurboBincodeBuffer> {
1218 match encode_task_data(task_id, data, category, buffer) {
1219 Ok(encoded) => {
1220 #[cfg(feature = "print_cache_item_size")]
1221 {
1222 let mut stats = task_cache_stats.lock();
1223 let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
1224 match category {
1225 SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1226 SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1227 }
1228 }
1229 Some(encoded)
1230 }
1231 Err(err) => {
1232 panic!(
1233 "Serializing task {} failed ({:?}): {:?}",
1234 self.debug_get_task_description(task_id),
1235 category,
1236 err
1237 );
1238 }
1239 }
1240 };
1241 if task_id.is_transient() {
1242 unreachable!("transient task_ids should never be enqueued to be persisted");
1243 }
1244
1245 let encode_meta = inner.flags.meta_modified();
1246 let encode_data = inner.flags.data_modified();
1247
1248 #[cfg(feature = "print_cache_item_size")]
1249 if encode_data || encode_meta {
1250 task_cache_stats
1251 .lock()
1252 .entry(TaskCacheStats::task_name(inner))
1253 .or_default()
1254 .add_counts(inner);
1255 }
1256
1257 let meta = if encode_meta {
1258 encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1259 } else {
1260 None
1261 };
1262
1263 let data = if encode_data {
1264 encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1265 } else {
1266 None
1267 };
1268 let task_type_hash = if inner.flags.new_task() {
1269 let task_type = inner.get_persistent_task_type().expect(
1270 "It is not possible for a new_task to not have a persistent_task_type. Task \
1271 creation for persistent tasks uses a single ExecutionContextImpl for \
1272 creating the task (which sets new_task) and connect_child (which sets \
1273 persistent_task_type) and take_snapshot waits for all operations to complete \
1274 or suspend before we start snapshotting. So task creation will always set \
1275 the task_type.",
1276 );
1277 Some(compute_task_type_hash(task_type))
1278 } else {
1279 None
1280 };
1281
1282 SnapshotItem {
1283 task_id,
1284 meta,
1285 data,
1286 task_type_hash,
1287 }
1288 };
1289
1290 let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);
1291
1292 drop(snapshot_span);
1293 let snapshot_duration = start.elapsed();
1294 let task_count = task_snapshots.len();
1295
1296 if task_snapshots.is_empty() {
1297 std::hint::cold_path();
1300 return Ok((snapshot_time, false));
1301 }
1302
1303 let persist_start = Instant::now();
1304 let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1305 {
1306 self.backing_storage
1310 .save_snapshot(suspended_operations, task_snapshots)?;
1311 #[cfg(feature = "print_cache_item_size")]
1312 {
1313 let mut task_cache_stats = task_cache_stats
1314 .into_inner()
1315 .into_iter()
1316 .collect::<Vec<_>>();
1317 if !task_cache_stats.is_empty() {
1318 use turbo_tasks::util::FormatBytes;
1319
1320 use crate::utils::markdown_table::print_markdown_table;
1321
1322 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1323 (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
1324 });
1325
1326 println!(
1327 "Task cache stats: {}",
1328 FormatSizes {
1329 size: task_cache_stats
1330 .iter()
1331 .map(|(_, s)| s.data + s.meta)
1332 .sum::<usize>(),
1333 #[cfg(feature = "print_cache_item_size_with_compressed")]
1334 compressed_size: task_cache_stats
1335 .iter()
1336 .map(|(_, s)| s.data_compressed + s.meta_compressed)
1337 .sum::<usize>()
1338 },
1339 );
1340
1341 print_markdown_table(
1342 [
1343 "Task",
1344 " Total Size",
1345 " Data Size",
1346 " Data Count x Avg",
1347 " Data Count x Avg",
1348 " Meta Size",
1349 " Meta Count x Avg",
1350 " Meta Count x Avg",
1351 " Uppers",
1352 " Coll",
1353 " Agg Coll",
1354 " Children",
1355 " Followers",
1356 " Coll Deps",
1357 " Agg Dirty",
1358 " Output Size",
1359 ],
1360 task_cache_stats.iter(),
1361 |(task_desc, stats)| {
1362 [
1363 task_desc.to_string(),
1364 format!(" {}", stats.format_total()),
1365 format!(" {}", stats.format_data()),
1366 format!(" {} x", stats.data_count),
1367 format!("{}", stats.format_avg_data()),
1368 format!(" {}", stats.format_meta()),
1369 format!(" {} x", stats.meta_count),
1370 format!("{}", stats.format_avg_meta()),
1371 format!(" {}", stats.upper_count),
1372 format!(" {}", stats.collectibles_count),
1373 format!(" {}", stats.aggregated_collectibles_count),
1374 format!(" {}", stats.children_count),
1375 format!(" {}", stats.followers_count),
1376 format!(" {}", stats.collectibles_dependents_count),
1377 format!(" {}", stats.aggregated_dirty_containers_count),
1378 format!(" {}", FormatBytes(stats.output_size)),
1379 ]
1380 },
1381 );
1382 }
1383 }
1384 }
1385
1386 let elapsed = start.elapsed();
1387 let persist_duration = persist_start.elapsed();
1388 if elapsed > Duration::from_secs(10) {
1390 turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1391 "Finished writing to filesystem cache".to_string(),
1392 elapsed,
1393 )));
1394 }
1395
1396 let wall_start_ms = wall_start
1397 .duration_since(SystemTime::UNIX_EPOCH)
1398 .unwrap_or_default()
1399 .as_secs_f64()
1401 * 1000.0;
1402 let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1403 turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1404 "turbopack-persistence",
1405 wall_start_ms,
1406 wall_end_ms,
1407 vec![
1408 ("reason", serde_json::Value::from(reason)),
1409 (
1410 "snapshot_duration_ms",
1411 serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1412 ),
1413 (
1414 "persist_duration_ms",
1415 serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1416 ),
1417 ("task_count", serde_json::Value::from(task_count)),
1418 ],
1419 )));
1420
1421 Ok((snapshot_time, true))
1422 }
1423
    // Backend startup hook: replays operations that were left uncompleted by
    // the previous process (restored from backing storage), then schedules
    // the initial snapshot job when the storage is writable.
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        if self.should_restore() {
            let uncompleted_operations = self
                .backing_storage
                .uncompleted_operations()
                .expect("Failed to get uncompleted operations");
            if !uncompleted_operations.is_empty() {
                let mut ctx = self.execute_context(turbo_tasks);
                for op in uncompleted_operations {
                    op.execute(&mut ctx);
                }
            }
        }

        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
            // NOTE(review): the second `_span` shadows the first, but both
            // guards stay entered until the end of this scope — confirm the
            // nested "thread" span is intentional.
            let _span = trace_span!("persisting background job").entered();
            let _span = tracing::info_span!("thread").entered();
            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
        }
    }
1450
    // Signals shutdown: sets the `stopping` flag and wakes everyone waiting
    // on `stopping_event`. The flag is stored (Release) before notifying so
    // woken waiters observe it.
    fn stopping(&self) {
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }
1455
1456 #[allow(unused_variables)]
1457 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1458 #[cfg(feature = "verify_aggregation_graph")]
1459 {
1460 self.is_idle.store(false, Ordering::Release);
1461 self.verify_aggregation_graph(turbo_tasks, false);
1462 }
1463 if self.should_persist()
1464 && let Err(err) = self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks)
1465 {
1466 eprintln!("Persisting failed during shutdown: {err:?}");
1467 }
1468 drop_contents(&self.task_cache);
1469 self.storage.drop_contents();
1470 if let Err(err) = self.backing_storage.shutdown() {
1471 println!("Shutting down failed: {err}");
1472 }
1473 }
1474
    // Called when the turbo-tasks system becomes idle: notifies waiters and,
    // with the `verify_aggregation_graph` feature, spawns a delayed
    // verification that is aborted if work resumes within 5 seconds.
    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                select! {
                    // Debounce: only verify after 5s of uninterrupted idleness.
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                    }
                    // Idleness ended first: skip the verification entirely.
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                // Re-check the flag in case idleness ended concurrently with
                // the sleep completing.
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }
1502
    // Called when the system leaves the idle state: clears the idle flag
    // (read by the delayed verification task) and wakes idle-end listeners.
    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }
1508
    // Looks up the cached task for (native_fn, this, arg) or creates a new
    // one, connects it to `parent_task`, and returns its id.
    //
    // Lookup order: in-memory task cache shard, then (for persistent tasks)
    // the backing storage via `task_by_type`, then a fresh id is allocated.
    // Panics if a persistent parent tries to call a transient task.
    fn get_or_create_task(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        parent_task: Option<TaskId>,
        persistence: TaskPersistence,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let transient = matches!(persistence, TaskPersistence::Transient);

        // A persistent (non-transient) parent must never depend on a
        // transient task; report this with a detailed panic.
        if transient
            && let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            let task_type = CachedTaskType {
                native_fn,
                this,
                arg: arg.take_box(),
            };
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                None,
            );
        }

        let is_root = native_fn.is_root;

        let arg_ref = arg.as_ref();
        let hash = CachedTaskType::hash_from_components(
            self.task_cache.hasher(),
            native_fn,
            this,
            arg_ref,
        );
        let shard = get_shard(&self.task_cache, hash);

        // Fast path: the task is already in the in-memory cache.
        if let Some(task_id) =
            raw_get_in_shard(shard, hash, |k| k.eq_components(native_fn, this, arg_ref))
        {
            self.track_cache_hit_by_fn(native_fn);
            self.connect_child(parent_task, task_id, turbo_tasks);
            return task_id;
        }

        let mut ctx = self.execute_context(turbo_tasks);

        let mut is_new = false;

        let task_id = if !transient
            && let Some((task_id, stored_type)) = ctx.task_by_type(native_fn, this, arg_ref)
        {
            // Found in backing storage: populate the in-memory cache unless a
            // concurrent caller already inserted it.
            self.track_cache_hit_by_fn(native_fn);
            match raw_entry_in_shard(shard, self.task_cache.hasher(), hash, |k| {
                k.eq_components(native_fn, this, arg_ref)
            }) {
                RawEntry::Occupied(_) => {}
                RawEntry::Vacant(e) => {
                    e.insert(stored_type, task_id);
                }
            };
            task_id
        } else {
            match raw_entry_in_shard(shard, self.task_cache.hasher(), hash, |k| {
                k.eq_components(native_fn, this, arg_ref)
            }) {
                RawEntry::Occupied(e) => {
                    // A concurrent caller created the task in the meantime.
                    let task_id = *e.get();
                    drop(e);
                    self.track_cache_hit_by_fn(native_fn);
                    task_id
                }
                RawEntry::Vacant(e) => {
                    // Genuinely new task: allocate an id from the matching
                    // factory and initialize its storage before publishing it
                    // in the cache.
                    let task_type = Arc::new(CachedTaskType {
                        native_fn,
                        this,
                        arg: arg.take_box(),
                    });
                    let task_id = if transient {
                        self.transient_task_id_factory.get()
                    } else {
                        self.persisted_task_id_factory.get()
                    };
                    self.storage
                        .initialize_new_task(task_id, Some(task_type.clone()));
                    e.insert(task_type, task_id);
                    self.track_cache_miss_by_fn(native_fn);
                    is_new = true;
                    task_id
                }
            }
        };

        // New root functions get the maximum aggregation number so they act
        // as aggregation roots.
        if is_new && is_root {
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: u32::MAX,
                    distance: None,
                },
                &mut ctx,
            );
        }

        operation::ConnectChildOperation::run(parent_task, task_id, ctx);

        task_id
    }
1639
1640 fn debug_trace_transient_task(
1643 &self,
1644 task_type: &CachedTaskType,
1645 cell_id: Option<CellId>,
1646 ) -> DebugTraceTransientTask {
1647 fn inner_id(
1650 backend: &TurboTasksBackendInner<impl BackingStorage>,
1651 task_id: TaskId,
1652 cell_type_id: Option<ValueTypeId>,
1653 visited_set: &mut FxHashSet<TaskId>,
1654 ) -> DebugTraceTransientTask {
1655 if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1656 if visited_set.contains(&task_id) {
1657 let task_name = task_type.get_name();
1658 DebugTraceTransientTask::Collapsed {
1659 task_name,
1660 cell_type_id,
1661 }
1662 } else {
1663 inner_cached(backend, &task_type, cell_type_id, visited_set)
1664 }
1665 } else {
1666 DebugTraceTransientTask::Uncached { cell_type_id }
1667 }
1668 }
1669 fn inner_cached(
1670 backend: &TurboTasksBackendInner<impl BackingStorage>,
1671 task_type: &CachedTaskType,
1672 cell_type_id: Option<ValueTypeId>,
1673 visited_set: &mut FxHashSet<TaskId>,
1674 ) -> DebugTraceTransientTask {
1675 let task_name = task_type.get_name();
1676
1677 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1678 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1679 return None;
1683 };
1684 if task_id.is_transient() {
1685 Some(Box::new(inner_id(
1686 backend,
1687 task_id,
1688 cause_self_raw_vc.try_get_type_id(),
1689 visited_set,
1690 )))
1691 } else {
1692 None
1693 }
1694 });
1695 let cause_args = task_type
1696 .arg
1697 .get_raw_vcs()
1698 .into_iter()
1699 .filter_map(|raw_vc| {
1700 let Some(task_id) = raw_vc.try_get_task_id() else {
1701 return None;
1703 };
1704 if !task_id.is_transient() {
1705 return None;
1706 }
1707 Some((task_id, raw_vc.try_get_type_id()))
1708 })
1709 .collect::<IndexSet<_>>() .into_iter()
1711 .map(|(task_id, cell_type_id)| {
1712 inner_id(backend, task_id, cell_type_id, visited_set)
1713 })
1714 .collect();
1715
1716 DebugTraceTransientTask::Cached {
1717 task_name,
1718 cell_type_id,
1719 cause_self,
1720 cause_args,
1721 }
1722 }
1723 inner_cached(
1724 self,
1725 task_type,
1726 cell_id.map(|c| c.type_id),
1727 &mut FxHashSet::default(),
1728 )
1729 }
1730
1731 fn invalidate_task(
1732 &self,
1733 task_id: TaskId,
1734 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1735 ) {
1736 if !self.should_track_dependencies() {
1737 panic!("Dependency tracking is disabled so invalidation is not allowed");
1738 }
1739 operation::InvalidateOperation::run(
1740 smallvec![task_id],
1741 #[cfg(feature = "trace_task_dirty")]
1742 TaskDirtyCause::Invalidator,
1743 self.execute_context(turbo_tasks),
1744 );
1745 }
1746
1747 fn invalidate_tasks(
1748 &self,
1749 tasks: &[TaskId],
1750 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1751 ) {
1752 if !self.should_track_dependencies() {
1753 panic!("Dependency tracking is disabled so invalidation is not allowed");
1754 }
1755 operation::InvalidateOperation::run(
1756 tasks.iter().copied().collect(),
1757 #[cfg(feature = "trace_task_dirty")]
1758 TaskDirtyCause::Unknown,
1759 self.execute_context(turbo_tasks),
1760 );
1761 }
1762
1763 fn invalidate_tasks_set(
1764 &self,
1765 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1766 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1767 ) {
1768 if !self.should_track_dependencies() {
1769 panic!("Dependency tracking is disabled so invalidation is not allowed");
1770 }
1771 operation::InvalidateOperation::run(
1772 tasks.iter().copied().collect(),
1773 #[cfg(feature = "trace_task_dirty")]
1774 TaskDirtyCause::Unknown,
1775 self.execute_context(turbo_tasks),
1776 );
1777 }
1778
1779 fn invalidate_serialization(
1780 &self,
1781 task_id: TaskId,
1782 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1783 ) {
1784 if task_id.is_transient() {
1785 return;
1786 }
1787 let mut ctx = self.execute_context(turbo_tasks);
1788 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1789 task.invalidate_serialization();
1790 }
1791
1792 fn debug_get_task_description(&self, task_id: TaskId) -> String {
1793 let task = self.storage.access_mut(task_id);
1794 if let Some(value) = task.get_persistent_task_type() {
1795 format!("{task_id:?} {}", value)
1796 } else if let Some(value) = task.get_transient_task_type() {
1797 format!("{task_id:?} {}", value)
1798 } else {
1799 format!("{task_id:?} unknown")
1800 }
1801 }
1802
1803 fn get_task_name(
1804 &self,
1805 task_id: TaskId,
1806 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1807 ) -> String {
1808 let mut ctx = self.execute_context(turbo_tasks);
1809 let task = ctx.task(task_id, TaskDataCategory::Data);
1810 if let Some(value) = task.get_persistent_task_type() {
1811 value.to_string()
1812 } else if let Some(value) = task.get_transient_task_type() {
1813 value.to_string()
1814 } else {
1815 "unknown".to_string()
1816 }
1817 }
1818
1819 fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1820 let task = self.storage.access_mut(task_id);
1821 task.get_persistent_task_type().cloned()
1822 }
1823
    // Handles cancellation of a scheduled or running task execution: wakes
    // everyone waiting on the task or its in-progress cells, marks the task
    // session-dependent dirty (so it reruns later), and records `Canceled`.
    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        // Wake waiters of the previous in-progress state, if any.
        if let Some(in_progress) = task.take_in_progress() {
            match in_progress {
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        // Wake readers blocked on cells that were still being computed.
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Persistent tasks get marked session-dependent dirty so the
        // canceled execution is retried within this session.
        let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
            task.update_dirty_state(Some(Dirtyness::SessionDependent))
        } else {
            None
        };

        let old = task.set_in_progress(InProgressState::Canceled);
        debug_assert!(old.is_none(), "InProgress already exists");
        drop(task);

        // Propagate the dirty state through the aggregation graph after the
        // task lock has been released.
        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, &mut ctx);
        }

        drop(in_progress_cells);
    }
1873
    // Transitions a task from `Scheduled` to `InProgress` and returns the
    // future + tracing span to execute, or `None` when the task cannot be
    // started (not in progress, already running/canceled, or a consumed
    // once-task).
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            // Prefetch data for tasks this task is known to access; requires
            // dropping and re-acquiring the task lock.
            if let Some(tasks) = task.prefetch() {
                drop(task);
                ctx.prepare_tasks(tasks, "prefetch");
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            let in_progress = task.take_in_progress()?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                // Not in the `Scheduled` state: put the state back and refuse
                // to start a second execution.
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Move all currently emitted collectibles to the outdated set;
            // the new execution re-emits the ones that are still current.
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        // Drop outdated entries that have no current
                        // counterpart anymore.
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            // Snapshot the current dependencies as "outdated"; whatever is
            // not re-read during execution is cleaned up afterwards.
            if self.should_track_dependencies() {
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        // Build the execution future outside of the task lock.
        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason, priority),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // A `Once` future can only be taken once; yields `None`
                    // if it was already consumed.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1973
    // Finalizes a task execution: records the new output, invalidates output
    // dependents, connects new children, and cleans up execution state.
    //
    // Returns `true` when the execution result is stale (the task was
    // rescheduled, or children changed concurrently) and must be rerun.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        // Phase 1: validate state, compute the new output, and clean up
        // outdated edges. `None` means the result was stale and the task was
        // rescheduled.
        let Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
            is_recomputation,
        }) = self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        )
        else {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "prepare");
            return true;
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // Phase 2: the output changed, so tasks that read it are invalidated.
        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        // Phase 3: handle newly discovered children and connect them; this
        // can also detect staleness.
        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        if has_new_children
            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return true;
        }

        // Phase 4: store the new output and mark the task as finished.
        let (stale, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
        );
        if stale {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return true;
        }

        // Phase 5: clean up leftover cells; dropping the removed data happens
        // after all task locks have been released.
        let removed_data = self.task_execution_completed_cleanup(
            &mut ctx,
            task_id,
            cell_counters,
            is_error,
            is_recomputation,
        );

        drop(removed_data);
        drop(in_progress_cells);

        false
    }
2111
    // First phase of `task_execution_completed`: validates the in-progress
    // state, handles stale executions, syncs cell counters, computes the new
    // output value, and removes outdated graph edges.
    //
    // Returns `None` when the execution was stale and the task has been
    // rescheduled instead of completed.
    fn task_execution_completed_prepare(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        #[cfg(feature = "trace_task_details")] span: &Span,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
    ) -> Option<TaskExecutionCompletePrepareResult> {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        // The task wasn't dirty, so this execution recomputed an already
        // up-to-date task.
        let is_recomputation = task.is_dirty().is_none();
        let Some(in_progress) = task.get_in_progress_mut() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // Canceled while executing: report "completed" with no changes.
            return Some(TaskExecutionCompletePrepareResult {
                new_children: Default::default(),
                is_now_immutable: false,
                #[cfg(feature = "verify_determinism")]
                no_output_set: false,
                new_output: None,
                output_dependent_tasks: Default::default(),
                is_recomputation,
            });
        }
        let &mut InProgressState::InProgress(box InProgressStateInner {
            stale,
            ref mut new_children,
            session_dependent,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // The task became stale while it was running: discard the result and
        // reschedule. Once-tasks are exempt since they cannot run again.
        #[cfg(not(feature = "no_fast_stale"))]
        if stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner {
                done_event,
                mut new_children,
                ..
            })) = task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            // Children that are already connected keep their active counts.
            for task in task.iter_children() {
                new_children.remove(&task);
            }
            drop(task);

            // Undo the active counts added when the now-discarded children
            // were discovered.
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return None;
        }

        let mut new_children = take(new_children);

        #[cfg(feature = "verify_determinism")]
        if stateful {
            task.set_stateful(true);
        }

        if has_invalidator {
            task.set_invalidator(true);
        }

        // Sync the per-cell-type max index with what this execution produced.
        // Skipped for failed non-recomputation runs so errors don't wipe the
        // previously recorded cell layout.
        if result.is_ok() || is_recomputation {
            let old_counters: FxHashMap<_, _> = task
                .iter_cell_type_max_index()
                .map(|(&k, &v)| (k, v))
                .collect();
            let mut counters_to_remove = old_counters.clone();

            for (&cell_type, &max_index) in cell_counters.iter() {
                if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                    if old_max_index != max_index {
                        task.insert_cell_type_max_index(cell_type, max_index);
                    }
                } else {
                    task.insert_cell_type_max_index(cell_type, max_index);
                }
            }
            // Cell types not produced this run are removed.
            for (cell_type, _) in counters_to_remove {
                task.remove_cell_type_max_index(&cell_type);
            }
        }

        let mut queue = AggregationUpdateQueue::new();

        let mut old_edges = Vec::new();

        let has_children = !new_children.is_empty();
        let is_immutable = task.immutable();
        // Candidate for becoming immutable: not session dependent, no
        // invalidator, no collectible dependencies. The final decision also
        // requires every dependency to be immutable (checked further below).
        let task_dependencies_for_immutable =
            if !is_immutable
                && !session_dependent
                && !task.invalidator()
                && task.is_collectibles_dependencies_empty()
            {
                Some(
                    task.iter_output_dependencies()
                        .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
                        .collect::<FxHashSet<_>>(),
                )
            } else {
                None
            };

        if has_children {
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);

            // Children of the previous execution that were not re-added this
            // run become outdated edges.
            old_edges.extend(
                task.iter_children()
                    .filter(|task| !new_children.remove(task))
                    .map(OutdatedEdge::Child),
            );
        } else {
            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
        }

        old_edges.extend(
            task.iter_outdated_collectibles()
                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
        );

        if self.should_track_dependencies() {
            old_edges.extend(
                task.iter_outdated_cell_dependencies()
                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
            );
            old_edges.extend(
                task.iter_outdated_output_dependencies()
                    .map(OutdatedEdge::OutputDependency),
            );
        }

        // Only produce a new output when it differs from the current one, so
        // dependents aren't invalidated needlessly.
        let current_output = task.get_output();
        #[cfg(feature = "verify_determinism")]
        let no_output_set = current_output.is_none();
        let new_output = match result {
            Ok(RawVc::TaskOutput(output_task_id)) => {
                if let Some(OutputValue::Output(current_task_id)) = current_output
                    && *current_task_id == output_task_id
                {
                    None
                } else {
                    Some(OutputValue::Output(output_task_id))
                }
            }
            Ok(RawVc::TaskCell(output_task_id, cell)) => {
                if let Some(OutputValue::Cell(CellRef {
                    task: current_task_id,
                    cell: current_cell,
                })) = current_output
                    && *current_task_id == output_task_id
                    && *current_cell == cell
                {
                    None
                } else {
                    Some(OutputValue::Cell(CellRef {
                        task: output_task_id,
                        cell,
                    }))
                }
            }
            Ok(RawVc::LocalOutput(..)) => {
                panic!("Non-local tasks must not return a local Vc");
            }
            Err(err) => {
                // Identical errors do not count as output changes either.
                if let Some(OutputValue::Error(old_error)) = current_output
                    && **old_error == err
                {
                    None
                } else {
                    Some(OutputValue::Error(Arc::new((&err).into())))
                }
            }
        };
        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
        if new_output.is_some() && ctx.should_track_dependencies() {
            output_dependent_tasks = task.iter_output_dependent().collect();
        }

        drop(task);

        // The task becomes immutable only when every task it depends on is
        // immutable as well.
        let mut is_now_immutable = false;
        if let Some(dependencies) = task_dependencies_for_immutable
            && dependencies
                .iter()
                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
        {
            is_now_immutable = true;
        }
        #[cfg(feature = "trace_task_details")]
        span.record("immutable", is_immutable || is_now_immutable);

        if !queue.is_empty() || !old_edges.is_empty() {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
        }

        Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
            is_recomputation,
        })
    }
2371
    /// Invalidates every task that reads the output of `task_id`, after that output
    /// changed during execution.
    ///
    /// Dependents are marked dirty via `make_task_dirty_internal`; dependent sets
    /// above `DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD` are split into chunks
    /// and processed on parallel worker scopes.
    fn task_execution_completed_invalidate_output_dependent(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        output_dependent_tasks: SmallVec<[TaskId; 4]>,
    ) {
        debug_assert!(!output_dependent_tasks.is_empty());

        // Prefetch all dependent tasks in one batch to avoid per-task storage lookups
        // in the loop below.
        if output_dependent_tasks.len() > 1 {
            ctx.prepare_tasks(
                output_dependent_tasks
                    .iter()
                    .map(|&id| (id, TaskDataCategory::All)),
                "invalidate output dependents",
            );
        }

        // Marks a single dependent task dirty, unless the dependency edge no longer
        // exists or the dependent is a once-task.
        fn process_output_dependents(
            ctx: &mut impl ExecuteContext<'_>,
            task_id: TaskId,
            dependent_task_id: TaskId,
            queue: &mut AggregationUpdateQueue,
        ) {
            #[cfg(feature = "trace_task_output_dependencies")]
            let span = tracing::trace_span!(
                "invalidate output dependency",
                task = %task_id,
                dependent_task = %dependent_task_id,
                result = tracing::field::Empty,
            )
            .entered();
            let mut make_stale = true;
            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
            let transient_task_type = dependent.get_transient_task_type();
            // Once-tasks run a single time; invalidating them would have no effect.
            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "once task");
                return;
            }
            if dependent.outdated_output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "outdated dependency");
                // The edge is already outdated: still mark dirty, but don't force
                // staleness.
                make_stale = false;
            } else if !dependent.output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "no backward dependency");
                // No backward edge at all — the dependent no longer reads this output.
                return;
            }
            make_task_dirty_internal(
                dependent,
                dependent_task_id,
                make_stale,
                #[cfg(feature = "trace_task_dirty")]
                TaskDirtyCause::OutputChange { task_id },
                queue,
                ctx,
            );
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "marked dirty");
        }

        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
            // Many dependents: fan the work out over chunks, each worker using its own
            // execute context and aggregation queue.
            let chunk_size = good_chunk_size(output_dependent_tasks.len());
            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
            let _ = scope_and_block(chunks.len(), |scope| {
                for chunk in chunks {
                    let child_ctx = ctx.child_context();
                    scope.spawn(move || {
                        let mut ctx = child_ctx.create();
                        let mut queue = AggregationUpdateQueue::new();
                        for dependent_task_id in chunk {
                            process_output_dependents(
                                &mut ctx,
                                task_id,
                                dependent_task_id,
                                &mut queue,
                            )
                        }
                        queue.execute(&mut ctx);
                    });
                }
            });
        } else {
            let mut queue = AggregationUpdateQueue::new();
            for dependent_task_id in output_dependent_tasks {
                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
            }
            queue.execute(ctx);
        }
    }
2469
    /// Marks freshly connected children that have not produced an output yet as
    /// dirty, so they get scheduled.
    fn task_execution_completed_unfinished_children_dirty(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        new_children: &FxHashSet<TaskId>,
    ) {
        debug_assert!(!new_children.is_empty());

        let mut queue = AggregationUpdateQueue::new();
        ctx.for_each_task_all(
            new_children.iter().copied(),
            "unfinished children dirty",
            |child_task, ctx| {
                // A child without an output has never completed an execution.
                if !child_task.has_output() {
                    let child_id = child_task.id();
                    make_task_dirty_internal(
                        child_task,
                        child_id,
                        false,
                        #[cfg(feature = "trace_task_dirty")]
                        TaskDirtyCause::InitialDirty,
                        &mut queue,
                        ctx,
                    );
                }
            },
        );

        queue.execute(ctx);
    }
2499
    /// Connects the `new_children` discovered during execution to `task_id`.
    ///
    /// Returns `true` when the task became stale while executing and was rescheduled
    /// instead of being connected (the children's active counts are released in that
    /// case). Returns `false` when the task was canceled or the children were
    /// connected normally.
    fn task_execution_completed_connect(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        new_children: FxHashSet<TaskId>,
    ) -> bool {
        debug_assert!(!new_children.is_empty());

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            #[cfg(not(feature = "no_fast_stale"))]
            stale,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // Fast path for stale tasks: reschedule instead of connecting children, and
        // undo the active counts that were taken for the new children.
        #[cfg(not(feature = "no_fast_stale"))]
        if *stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);

            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return true;
        }

        let has_active_count = ctx.should_track_activeness()
            && task
                .get_activeness()
                .is_some_and(|activeness| activeness.active_counter > 0);
        connect_children(
            ctx,
            task_id,
            task,
            new_children,
            has_active_count,
            ctx.should_track_activeness(),
        );

        false
    }
2567
    /// Finalizes a completed execution: stores the new output (if changed), marks
    /// the task immutable when applicable, notifies in-progress cell waiters, and
    /// updates the dirty state.
    ///
    /// Returns `(reschedule, in_progress_cells)`. `reschedule` is `true` when the
    /// task must run again (it went stale, or a `verify_determinism` re-run is
    /// required); `in_progress_cells` carries the cell states whose events were
    /// notified, returned to the caller for further processing.
    fn task_execution_completed_finish(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "verify_determinism")] no_output_set: bool,
        new_output: Option<OutputValue>,
        is_now_immutable: bool,
    ) -> (
        bool,
        Option<
            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
        >,
    ) {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.take_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return (false, None);
        }
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: is_once_task,
            stale,
            session_dependent,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        debug_assert!(new_children.is_empty());

        // A stale (non-once) task is rescheduled instead of being finalized.
        if stale && !is_once_task {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            return (true, None);
        }

        let mut old_content = None;
        if let Some(value) = new_output {
            old_content = task.set_output(value);
        }

        if is_now_immutable {
            task.set_immutable(true);
        }

        // Wake all readers that were waiting for cells written by this execution.
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        let new_dirtyness = if session_dependent {
            Some(Dirtyness::SessionDependent)
        } else {
            None
        };
        #[cfg(feature = "verify_determinism")]
        let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
        let data_update = task.update_dirty_state(new_dirtyness);

        // With `verify_determinism` enabled, persistent tasks are re-run to check
        // for nondeterminism; otherwise a finished task is never rescheduled here.
        #[cfg(feature = "verify_determinism")]
        let reschedule =
            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
        #[cfg(not(feature = "verify_determinism"))]
        let reschedule = false;
        if reschedule {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);
        } else {
            drop(task);

            done_event.notify(usize::MAX);
        }

        // Drop the old output content only after the task lock has been released.
        drop(old_content);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, ctx);
        }

        (reschedule, in_progress_cells)
    }
2670
    /// Removes cell data that was not re-emitted during this execution.
    ///
    /// `cell_counters` maps each value type to the number of cells the execution
    /// used; any stored cell at or beyond that index — or of a type that was not
    /// used at all — is removed, along with its stored content hash. The removed
    /// data is returned so the caller can drop it after the task lock is released.
    /// Cleanup is skipped for failed executions unless this was a recomputation.
    fn task_execution_completed_cleanup(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        is_error: bool,
        is_recomputation: bool,
    ) -> Vec<SharedReference> {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let mut removed_cell_data = Vec::new();
        if !is_error || is_recomputation {
            // Collect the keys first; cells can't be removed while iterating.
            let to_remove_persistent: Vec<_> = task
                .iter_persistent_cell_data()
                .filter_map(|(cell, _)| {
                    cell_counters
                        .get(&cell.type_id)
                        .is_none_or(|start_index| cell.index >= *start_index)
                        .then_some(*cell)
                })
                .collect();

            let to_remove_transient: Vec<_> = task
                .iter_transient_cell_data()
                .filter_map(|(cell, _)| {
                    cell_counters
                        .get(&cell.type_id)
                        .is_none_or(|start_index| cell.index >= *start_index)
                        .then_some(*cell)
                })
                .collect();
            removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
            for cell in to_remove_persistent {
                if let Some(data) = task.remove_persistent_cell_data(&cell) {
                    removed_cell_data.push(data.into_untyped());
                }
            }
            for cell in to_remove_transient {
                if let Some(data) = task.remove_transient_cell_data(&cell) {
                    removed_cell_data.push(data);
                }
            }
            // Content hashes for removed cells are stale as well.
            let to_remove_hash: Vec<_> = task
                .iter_cell_data_hash()
                .filter_map(|(cell, _)| {
                    cell_counters
                        .get(&cell.type_id)
                        .is_none_or(|start_index| cell.index >= *start_index)
                        .then_some(*cell)
                })
                .collect();
            for cell in to_remove_hash {
                task.remove_cell_data_hash(&cell);
            }
        }

        task.cleanup_after_execution();

        drop(task);

        removed_cell_data
    }
2749
2750 fn log_unrecoverable_persist_error() {
2753 eprintln!(
2754 "Persisting is disabled for this session due to an unrecoverable error. Stopping the \
2755 background persisting process."
2756 );
2757 }
2758
    /// Runs a scheduled background job. The snapshot jobs periodically persist
    /// in-memory task data to the backing storage, compact the database while the
    /// system is idle, and reschedule themselves as follow-up jobs.
    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
                    let mut idle_start_listener = self.idle_start_event.listen();
                    let mut idle_end_listener = self.idle_end_event.listen();
                    // Set to false after a snapshot found no new data, so idleness
                    // alone doesn't keep re-triggering early snapshots.
                    let mut fresh_idle = true;
                    loop {
                        // The very first snapshot waits longer than follow-up ones.
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        let idle_timeout = *IDLE_TIMEOUT;
                        let (time, mut reason) =
                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                            } else {
                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
                            };

                        // Wait for the next snapshot deadline, an idle timeout, or a
                        // stop signal — whichever fires first.
                        let until = last_snapshot + time;
                        if until > Instant::now() {
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
                                Instant::now() + idle_timeout
                            } else {
                                far_future()
                            };
                            loop {
                                tokio::select! {
                                    _ = &mut stop_listener => {
                                        // Backend is stopping; abort the job.
                                        return;
                                    },
                                    _ = &mut idle_start_listener => {
                                        // System became idle: arm the idle deadline.
                                        idle_time = Instant::now() + idle_timeout;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    _ = &mut idle_end_listener => {
                                        // Idle ended: push the idle deadline past the
                                        // regular deadline, effectively disarming it.
                                        idle_time = until + idle_timeout;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        if turbo_tasks.is_idle() {
                                            reason = "idle timeout";
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let this = self.clone();
                        let background_span =
                            tracing::info_span!(parent: None, "background snapshot");
                        match this.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
                            Err(err) => {
                                // Unrecoverable persist failure: stop the background
                                // persisting loop for the rest of the session.
                                eprintln!("Persisting failed: {err:?}");
                                Self::log_unrecoverable_persist_error();
                                return;
                            }
                            Ok((snapshot_start, new_data)) => {
                                last_snapshot = snapshot_start;

                                // Use idle time after a snapshot for a bounded number
                                // of database compaction passes; stop when idle ends.
                                const MAX_IDLE_COMPACTION_PASSES: usize = 10;
                                for _ in 0..MAX_IDLE_COMPACTION_PASSES {
                                    // Non-blocking poll of the idle-end event.
                                    let idle_ended = tokio::select! {
                                        biased;
                                        _ = &mut idle_end_listener => {
                                            idle_end_listener = self.idle_end_event.listen();
                                            true
                                        },
                                        _ = std::future::ready(()) => false,
                                    };
                                    if idle_ended {
                                        break;
                                    }
                                    let _compact_span = tracing::info_span!(
                                        parent: background_span.id(),
                                        "compact database"
                                    )
                                    .entered();
                                    match self.backing_storage.compact() {
                                        // `Ok(true)`: more compaction work remains.
                                        Ok(true) => {}
                                        Ok(false) => break,
                                        Err(err) => {
                                            eprintln!("Compaction failed: {err:?}");
                                            if self.backing_storage.has_unrecoverable_write_error()
                                            {
                                                Self::log_unrecoverable_persist_error();
                                                return;
                                            }
                                            break;
                                        }
                                    }
                                }

                                if !new_data {
                                    // Nothing was persisted: keep looping without
                                    // updating the stored snapshot time.
                                    fresh_idle = false;
                                    continue;
                                }
                                let last_snapshot = last_snapshot.duration_since(self.start_time);
                                self.last_snapshot.store(
                                    last_snapshot.as_millis().try_into().unwrap(),
                                    Ordering::Relaxed,
                                );

                                // Hand off to a fresh follow-up job for the next cycle.
                                turbo_tasks.schedule_backend_background_job(
                                    TurboTasksBackendJob::FollowUpSnapshot,
                                );
                                return;
                            }
                        }
                    }
                }
            }
        })
    }
2902
2903 fn try_read_own_task_cell(
2904 &self,
2905 task_id: TaskId,
2906 cell: CellId,
2907 options: ReadCellOptions,
2908 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2909 ) -> Result<TypedCellContent> {
2910 let mut ctx = self.execute_context(turbo_tasks);
2911 let task = ctx.task(task_id, TaskDataCategory::Data);
2912 if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2913 debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2914 Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2915 } else {
2916 Ok(CellContent(None).into_typed(cell.type_id))
2917 }
2918 }
2919
    /// Returns all collectibles of `collectible_type` visible at `task_id`, as a
    /// map from collectible `RawVc` to its count.
    ///
    /// The task is first promoted to a root aggregation node (maximum aggregation
    /// number) so its aggregated collectibles are complete. When `reader_id` is
    /// given, collectible dependency edges are recorded on both the read task and
    /// the reader.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            // Raise the aggregation number until the task is a root node. The task
            // lock must be released while the aggregation update runs.
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            // Collectibles aggregated from the task's subgraph (each counted once).
            for (collectible, count) in task.iter_aggregated_collectibles() {
                if *count > 0 && collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += 1;
                }
            }
            // The task's own collectibles, added with their full counts.
            for (&collectible, &count) in task.iter_collectibles() {
                if collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += count;
                }
            }
            if let Some(reader_id) = reader_id {
                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
            }
        }
        if let Some(reader_id) = reader_id {
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            // Revive an outdated dependency edge if one exists; otherwise add a new one.
            if !reader.remove_outdated_collectibles_dependencies(&target) {
                let _ = reader.add_collectibles_dependencies(target);
            }
        }
        collectibles
    }
2984
2985 fn emit_collectible(
2986 &self,
2987 collectible_type: TraitTypeId,
2988 collectible: RawVc,
2989 task_id: TaskId,
2990 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2991 ) {
2992 self.assert_valid_collectible(task_id, collectible);
2993
2994 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2995 panic!("Collectibles need to be resolved");
2996 };
2997 let cell = CellRef {
2998 task: collectible_task,
2999 cell,
3000 };
3001 operation::UpdateCollectibleOperation::run(
3002 task_id,
3003 CollectibleRef {
3004 collectible_type,
3005 cell,
3006 },
3007 1,
3008 self.execute_context(turbo_tasks),
3009 );
3010 }
3011
3012 fn unemit_collectible(
3013 &self,
3014 collectible_type: TraitTypeId,
3015 collectible: RawVc,
3016 count: u32,
3017 task_id: TaskId,
3018 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3019 ) {
3020 self.assert_valid_collectible(task_id, collectible);
3021
3022 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3023 panic!("Collectibles need to be resolved");
3024 };
3025 let cell = CellRef {
3026 task: collectible_task,
3027 cell,
3028 };
3029 operation::UpdateCollectibleOperation::run(
3030 task_id,
3031 CollectibleRef {
3032 collectible_type,
3033 cell,
3034 },
3035 -(i32::try_from(count).unwrap()),
3036 self.execute_context(turbo_tasks),
3037 );
3038 }
3039
    /// Writes `content` into the given cell of `task_id` via an
    /// `UpdateCellOperation`.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateCellOperation::run(
            task_id,
            cell,
            content,
            is_serializable_cell_content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            self.execute_context(turbo_tasks),
        );
    }
3062
    /// Marks the given (currently executing) task as session dependent: raises its
    /// aggregation number to at least `SESSION_DEPENDENT_AGGREGATION_NUMBER` and
    /// sets the `session_dependent` flag on its in-progress state. A no-op when
    /// dependency tracking is disabled.
    fn mark_own_task_as_session_dependent(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            // Without dependency tracking nothing is ever invalidated, so session
            // dependence has no effect.
            return;
        }
        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
        let aggregation_number = get_aggregation_number(&task);
        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
            // The task lock must be released while the aggregation update runs.
            drop(task);
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                    distance: None,
                },
                &mut ctx,
            );
            task = ctx.task(task_id, TaskDataCategory::Meta);
        }
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            session_dependent,
            ..
        })) = task.get_in_progress_mut()
        {
            *session_dependent = true;
        }
    }
3098
    /// Sets the `marked_as_completed` flag on the in-progress state of the given
    /// task. A no-op when the task is not currently in progress.
    fn mark_own_task_as_finished(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task, TaskDataCategory::Data);
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            marked_as_completed,
            ..
        })) = task.get_in_progress_mut()
        {
            *marked_as_completed = true;
        }
    }
3118
    /// Connects `task` as a child of `parent_task`, after checking that a
    /// persistent parent does not reference a transient child.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_not_persistent_calling_transient(parent_task, task, None);
        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
    }
3128
3129 fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3130 let task_id = self.transient_task_id_factory.get();
3131 {
3132 let mut task = self.storage.access_mut(task_id);
3133 task.init_transient_task(task_id, task_type, self.should_track_activeness());
3134 }
3135 #[cfg(feature = "verify_aggregation_graph")]
3136 self.root_tasks.lock().insert(task_id);
3137 task_id
3138 }
3139
    /// Removes the root marker from a root task.
    ///
    /// If the task (or anything it aggregates) is still dirty, it stays active
    /// until clean; otherwise its activeness state is dropped and waiters on
    /// `all_clean_event` are woken.
    fn dispose_root_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().remove(&task_id);

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_dirty = task.is_dirty();
        let has_dirty_containers = task.has_dirty_containers();
        if is_dirty.is_some() || has_dirty_containers {
            if let Some(activeness_state) = task.get_activeness_mut() {
                activeness_state.unset_root_type();
                activeness_state.set_active_until_clean();
            };
        } else if let Some(activeness_state) = task.take_activeness() {
            activeness_state.all_clean_event.notify(usize::MAX);
        }
    }
3164
    /// Debug-only validation of the aggregation graph.
    ///
    /// Walks the task graph from every registered root task and checks that dirty
    /// tasks and emitted collectibles are correctly reflected in their upper
    /// (aggregating) tasks, panicking with diagnostics when an inconsistency is
    /// found. Can be disabled at runtime via `TURBO_ENGINE_VERIFY_GRAPH=0`. When
    /// `idle` is set, the check aborts as soon as the system leaves the idle state.
    #[cfg(feature = "verify_aggregation_graph")]
    fn verify_aggregation_graph(
        &self,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
        idle: bool,
    ) {
        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
            return;
        }
        use std::{collections::VecDeque, env, io::stdout};

        use crate::backend::operation::{get_uppers, is_aggregating_node};

        let mut ctx = self.execute_context(turbo_tasks);
        let root_tasks = self.root_tasks.lock().clone();

        for task_id in root_tasks.into_iter() {
            let mut queue = VecDeque::new();
            let mut visited = FxHashSet::default();
            let mut aggregated_nodes = FxHashSet::default();
            // collectible -> (emitted by some visited task?, aggregating task ids)
            let mut collectibles = FxHashMap::default();
            let root_task_id = task_id;
            visited.insert(task_id);
            aggregated_nodes.insert(task_id);
            queue.push_back(task_id);
            let mut counter = 0;
            while let Some(task_id) = queue.pop_front() {
                counter += 1;
                // Progress output for very large graphs.
                if counter % 100000 == 0 {
                    println!(
                        "queue={}, visited={}, aggregated_nodes={}",
                        queue.len(),
                        visited.len(),
                        aggregated_nodes.len()
                    );
                }
                let task = ctx.task(task_id, TaskDataCategory::All);
                // Abort an idle-time check when the system becomes busy again.
                if idle && !self.is_idle.load(Ordering::Relaxed) {
                    return;
                }

                // Every reachable non-root task must report to at least one
                // aggregating node we've already seen.
                let uppers = get_uppers(&task);
                if task_id != root_task_id
                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
                {
                    panic!(
                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
                         {:?})",
                        task_id,
                        task.get_task_description(),
                        uppers
                    );
                }

                // Record where each collectible is aggregated.
                for (collectible, _) in task.iter_aggregated_collectibles() {
                    collectibles
                        .entry(*collectible)
                        .or_insert_with(|| (false, Vec::new()))
                        .1
                        .push(task_id);
                }

                // Every locally emitted collectible must appear in some upper task.
                for (&collectible, &value) in task.iter_collectibles() {
                    if value > 0 {
                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                            *flag = true
                        } else {
                            panic!(
                                "Task {} has a collectible {:?} that is not in any upper task",
                                task_id, collectible
                            );
                        }
                    }
                }

                let is_dirty = task.has_dirty();
                let has_dirty_container = task.has_dirty_containers();
                let should_be_in_upper = is_dirty || has_dirty_container;

                let aggregation_number = get_aggregation_number(&task);
                if is_aggregating_node(aggregation_number) {
                    aggregated_nodes.insert(task_id);
                }
                for child_id in task.iter_children() {
                    if visited.insert(child_id) {
                        queue.push_back(child_id);
                    }
                }
                drop(task);

                // A dirty task must be listed as a dirty container in all uppers.
                if should_be_in_upper {
                    for upper_id in uppers {
                        let upper = ctx.task(upper_id, TaskDataCategory::All);
                        let in_upper = upper
                            .get_aggregated_dirty_containers(&task_id)
                            .is_some_and(|&dirty| dirty > 0);
                        if !in_upper {
                            let containers: Vec<_> = upper
                                .iter_aggregated_dirty_containers()
                                .map(|(&k, &v)| (k, v))
                                .collect();
                            let upper_task_desc = upper.get_task_description();
                            drop(upper);
                            panic!(
                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
                                 ({})\nThese dirty containers are present:\n{:#?}",
                                task_id,
                                ctx.task(task_id, TaskDataCategory::Data)
                                    .get_task_description(),
                                upper_id,
                                upper_task_desc,
                                containers,
                            );
                        }
                    }
                }
            }

            // Any aggregated collectible that no visited task actually emitted is an
            // inconsistency; dump the upper chain for diagnosis and panic.
            for (collectible, (flag, task_ids)) in collectibles {
                if !flag {
                    use std::io::Write;
                    let mut stdout = stdout().lock();
                    writeln!(
                        stdout,
                        "{:?} that is not emitted in any child task but in these aggregated \
                         tasks: {:#?}",
                        collectible,
                        task_ids
                            .iter()
                            .map(|t| format!(
                                "{t} {}",
                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
                            ))
                            .collect::<Vec<_>>()
                    )
                    .unwrap();

                    // Walk the upper chain from the collectible's task and print it.
                    let task_id = collectible.cell.task;
                    let mut queue = {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        get_uppers(&task)
                    };
                    let mut visited = FxHashSet::default();
                    for &upper_id in queue.iter() {
                        visited.insert(upper_id);
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                    }
                    while let Some(task_id) = queue.pop() {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        let desc = task.get_task_description();
                        let aggregated_collectible = task
                            .get_aggregated_collectibles(&collectible)
                            .copied()
                            .unwrap_or_default();
                        let uppers = get_uppers(&task);
                        drop(task);
                        writeln!(
                            stdout,
                            "upper {task_id} {desc} collectible={aggregated_collectible}"
                        )
                        .unwrap();
                        if task_ids.contains(&task_id) {
                            writeln!(
                                stdout,
                                "Task has an upper connection to an aggregated task that doesn't \
                                 reference it. Upper connection is invalid!"
                            )
                            .unwrap();
                        }
                        for upper_id in uppers {
                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                            if !visited.contains(&upper_id) {
                                queue.push(upper_id);
                            }
                        }
                    }
                    panic!("See stdout for more details");
                }
            }
        }
    }
3353
3354 fn assert_not_persistent_calling_transient(
3355 &self,
3356 parent_id: Option<TaskId>,
3357 child_id: TaskId,
3358 cell_id: Option<CellId>,
3359 ) {
3360 if let Some(parent_id) = parent_id
3361 && !parent_id.is_transient()
3362 && child_id.is_transient()
3363 {
3364 self.panic_persistent_calling_transient(
3365 self.debug_get_task_description(parent_id),
3366 self.debug_get_cached_task_type(child_id).as_deref(),
3367 cell_id,
3368 );
3369 }
3370 }
3371
    /// Panics with a detailed message explaining that a persistent task referenced
    /// a transient one, including (when known) a trace of why the callee is
    /// transient.
    fn panic_persistent_calling_transient(
        &self,
        parent: String,
        child: Option<&CachedTaskType>,
        cell_id: Option<CellId>,
    ) -> ! {
        let transient_reason = if let Some(child) = child {
            Cow::Owned(format!(
                " The callee is transient because it depends on:\n{}",
                self.debug_trace_transient_task(child, cell_id),
            ))
        } else {
            Cow::Borrowed("")
        };
        panic!(
            "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
            parent,
            child.map_or("unknown", |t| t.get_name()),
            transient_reason,
        );
    }
3393
    /// Validates that `collectible` may be emitted from `task_id`.
    ///
    /// Panics when the collectible is not a resolved task cell
    /// (`RawVc::TaskCell`), or when a transient collectible would be emitted from
    /// a persistent task.
    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
            // Build a description of the offending task for the panic message,
            // when one can be determined.
            let task_info = if let Some(col_task_ty) = collectible
                .try_get_task_id()
                .map(|t| self.debug_get_task_description(t))
            {
                Cow::Owned(format!(" (return type of {col_task_ty})"))
            } else {
                Cow::Borrowed("")
            };
            panic!("Collectible{task_info} must be a ResolvedVc")
        };
        if col_task_id.is_transient() && !task_id.is_transient() {
            let transient_reason =
                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
                    Cow::Owned(format!(
                        ". The collectible is transient because it depends on:\n{}",
                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
                    ))
                } else {
                    Cow::Borrowed("")
                };
            panic!(
                "Collectible is transient, transient collectibles cannot be emitted from \
                 persistent tasks{transient_reason}",
            )
        }
    }
3425}
3426
3427impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    // Delegates to the inner backend implementation (`self.0`).
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }
3431
    // Delegates to the inner backend implementation; the API handle is unused here.
    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }
3435
    // Delegates to the inner backend implementation.
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }
3439
    // Delegates to the inner backend implementation.
    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }
3443
    // Delegates to the inner backend implementation; the API handle is unused here.
    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }
3447
    // Delegates to the inner backend implementation.
    fn get_or_create_task(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        parent_task: Option<TaskId>,
        persistence: TaskPersistence,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_task(native_fn, this, arg, parent_task, persistence, turbo_tasks)
    }
3460
    // Delegates to the inner backend implementation.
    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }
3464
    // Delegates to the inner backend implementation.
    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }
3468
    // Delegates to the inner backend implementation.
    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }
3476
    // Delegates to the inner backend implementation.
    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }
3484
    // Delegates to the inner backend implementation.
    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }
3488
    // Delegates to the inner backend implementation.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0
            .try_start_task_execution(task_id, priority, turbo_tasks)
    }
3498
    // Delegates to the inner backend implementation, forwarding the
    // `verify_determinism`-only `stateful` flag when that feature is enabled.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
            turbo_tasks,
        )
    }
3518
3519 type BackendJob = TurboTasksBackendJob;
3520
    // Delegates background-job execution to the inner backend, which returns
    // the boxed future directly.
    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(job, turbo_tasks)
    }
3528
    // Delegates output reads to the inner backend; the inner `Err` variant of
    // the nested result is an `EventListener` to await before retrying.
    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, reader, options, turbo_tasks)
    }
3539
    // Delegates cell reads to the inner backend. NOTE: the inner method takes
    // `reader` before `cell`, so the forwarded argument order intentionally
    // differs from this trait signature's (cell, reader) order.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }
3551
    // Delegates untracked reads of a task's own cell to the inner backend.
    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0
            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
    }
3562
    // Delegates collectible reads to the inner backend; the returned map
    // carries per-collectible counts.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }
3573
    // Delegates collectible emission to the inner backend.
    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }
3584
    // Delegates collectible retraction (by `count`) to the inner backend.
    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }
3596
    // Delegates cell updates to the inner backend, forwarding serializability,
    // optional key hashes / content hash, and the verification mode unchanged.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(
            task_id,
            cell,
            is_serializable_cell_content,
            content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            turbo_tasks,
        );
    }
3619
    // Delegates the "own task finished" marker to the inner backend.
    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }
3627
    // Delegates the "session dependent" marker to the inner backend.
    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }
3635
    // Delegates parent/child task connection to the inner backend.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }
3644
    // Delegates transient-task creation; the inner method does not need the
    // `turbo_tasks` handle, hence the unused parameter.
    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }
3652
    // Delegates root-task disposal to the inner backend.
    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }
3656
    // Exposes the inner backend's task-statistics API by reference.
    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }
3660
    // Reads the dependency-tracking flag from the inner backend's options.
    fn is_tracking_dependencies(&self) -> bool {
        self.0.options.dependency_tracking
    }
3664
    // Delegates task-name lookup to the inner backend.
    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
        self.0.get_task_name(task, turbo_tasks)
    }
3668}
3669
/// Node in a cause tree describing transient tasks, rendered by
/// `fmt_indented` / `Display` below.
enum DebugTraceTransientTask {
    /// A transient task with a known task type, including the causes that made
    /// it transient.
    Cached {
        task_name: &'static str,
        // If set, the read was a cell read of this value type.
        cell_type_id: Option<ValueTypeId>,
        // Cause stemming from the task's `self` argument, if any.
        cause_self: Option<Box<DebugTraceTransientTask>>,
        // Causes stemming from the task's arguments.
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// Like `Cached` but without cause details; rendered with a
    /// " (collapsed)" suffix and no recursion.
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    /// A transient task whose name is not available; rendered as
    /// "unknown transient task".
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}
3686
3687impl DebugTraceTransientTask {
3688 fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3689 let indent = " ".repeat(level);
3690 f.write_str(&indent)?;
3691
3692 fn fmt_cell_type_id(
3693 f: &mut fmt::Formatter<'_>,
3694 cell_type_id: Option<ValueTypeId>,
3695 ) -> fmt::Result {
3696 if let Some(ty) = cell_type_id {
3697 write!(
3698 f,
3699 " (read cell of type {})",
3700 get_value_type(ty).ty.global_name
3701 )
3702 } else {
3703 Ok(())
3704 }
3705 }
3706
3707 match self {
3709 Self::Cached {
3710 task_name,
3711 cell_type_id,
3712 ..
3713 }
3714 | Self::Collapsed {
3715 task_name,
3716 cell_type_id,
3717 ..
3718 } => {
3719 f.write_str(task_name)?;
3720 fmt_cell_type_id(f, *cell_type_id)?;
3721 if matches!(self, Self::Collapsed { .. }) {
3722 f.write_str(" (collapsed)")?;
3723 }
3724 }
3725 Self::Uncached { cell_type_id } => {
3726 f.write_str("unknown transient task")?;
3727 fmt_cell_type_id(f, *cell_type_id)?;
3728 }
3729 }
3730 f.write_char('\n')?;
3731
3732 if let Self::Cached {
3734 cause_self,
3735 cause_args,
3736 ..
3737 } = self
3738 {
3739 if let Some(c) = cause_self {
3740 writeln!(f, "{indent} self:")?;
3741 c.fmt_indented(f, level + 1)?;
3742 }
3743 if !cause_args.is_empty() {
3744 writeln!(f, "{indent} args:")?;
3745 for c in cause_args {
3746 c.fmt_indented(f, level + 1)?;
3747 }
3748 }
3749 }
3750 Ok(())
3751 }
3752}
3753
impl fmt::Display for DebugTraceTransientTask {
    // Renders the whole cause tree starting at indentation level 0.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.fmt_indented(f, 0)
    }
}
3759
/// Returns an `Instant` roughly 30 years in the future, used as a practical
/// "never" deadline.
fn far_future() -> Instant {
    // 30 years of 365-day years, expressed in seconds.
    const FAR_FUTURE_OFFSET_SECS: u64 = 60 * 60 * 24 * 365 * 30;
    Instant::now() + Duration::from_secs(FAR_FUTURE_OFFSET_SECS)
}
3768
3769fn encode_task_data(
3781 task: TaskId,
3782 data: &TaskStorage,
3783 category: SpecificTaskDataCategory,
3784 scratch_buffer: &mut TurboBincodeBuffer,
3785) -> Result<TurboBincodeBuffer> {
3786 scratch_buffer.clear();
3787 let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3788 data.encode(category, &mut encoder)?;
3789
3790 if cfg!(feature = "verify_serialization") {
3791 TaskStorage::new()
3792 .decode(
3793 category,
3794 &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3795 )
3796 .with_context(|| {
3797 format!(
3798 "expected to be able to decode serialized data for '{category:?}' information \
3799 for {task}"
3800 )
3801 })?;
3802 }
3803 Ok(SmallVec::from_slice(scratch_buffer))
3804}